/*
Copyright 2011-2014 AGH-UST, http://www.agh.edu.pl
Faculty of Computer Science, Electronics and Telecommunications
Department of Computer Science

See the NOTICE file distributed with this work for additional
information regarding copyright ownership

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.universAAL.ri.gateway.eimanager.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.universAAL.middleware.bus.member.BusMember;
import org.universAAL.middleware.context.ContextEvent;
import org.universAAL.middleware.context.ContextEventPattern;
import org.universAAL.middleware.context.owl.ContextProvider;
import org.universAAL.middleware.context.owl.ContextProviderType;
import org.universAAL.middleware.service.owls.profile.ServiceProfile;
import org.universAAL.middleware.ui.UIResponse;
import org.universAAL.ri.gateway.communicator.Activator;
import org.universAAL.ri.gateway.communicator.service.GatewayCommunicator;
import org.universAAL.ri.gateway.communicator.service.Message;
import org.universAAL.ri.gateway.communicator.service.RemoteSpacesManager;
import org.universAAL.ri.gateway.communicator.service.impl.Serializer;
import org.universAAL.ri.gateway.eimanager.ImportEntry;
import org.universAAL.ri.gateway.eimanager.ImportExecutor;
import org.universAAL.ri.gateway.eimanager.ImportManager;
import org.universAAL.ri.gateway.eimanager.exception.InterruptExecutionException;
import org.universAAL.ri.gateway.eimanager.impl.exporting.ProxyRegistration;
import org.universAAL.ri.gateway.eimanager.impl.importing.ImportProcessExecutor;
import org.universAAL.ri.gateway.eimanager.impl.importing.ImportRequest;
import org.universAAL.ri.gateway.eimanager.impl.importing.ImportedProxyManager;
import org.universAAL.ri.gateway.eimanager.impl.importing.InternalImportOperation;
import org.universAAL.ri.gateway.eimanager.impl.registry.EIRepoAccessManager;
import org.universAAL.ri.gateway.eimanager.impl.registry.IRegistryListener;
import org.universAAL.ri.gateway.eimanager.impl.registry.RegistryEntry;
import org.universAAL.ri.gateway.eimanager.impl.registry.RepoOperation;
57  
58  public class ImportManagerImpl implements ImportManager, ImportExecutor,
59  	IRegistryListener, RemoteSpacesManager {
60  
61      private ImportProcessExecutor importExecutor;
62      private Thread importThread;
63  
64      private ImportedProxyManager manager;
65  
66      private Map<String, BlockingQueue<ImportEntry>> importingMock;
67  
68      private BlockingQueue<InternalImportOperation> busMembersToImport;
69      private GatewayCommunicator communicator;
70  
71      public ImportManagerImpl(final GatewayCommunicator communicator) {
72  	this.communicator = communicator;
73  	busMembersToImport = new ArrayBlockingQueue<InternalImportOperation>(5);
74  
75  	importingMock = new HashMap<String, BlockingQueue<ImportEntry>>();
76  
77  	importExecutor = new ImportProcessExecutor(busMembersToImport);
78  	importThread = new Thread(importExecutor);
79  	importThread.start();
80  
81  	manager = new ImportedProxyManager(communicator);
82  
83  	EIRepoAccessManager.Instance.addListener(this);
84  
85  	Activator.mc.getContainer().shareObject(Activator.mc, this,
86  		new Object[] { RemoteSpacesManager.class.getName() });
87      }
88  
89      public void shutdown() {
90  	importThread.interrupt();
91      }
92  
93      /*
94       * public void addExportPremise(ImportPremise premise) {
95       * importPremises.add(premise); } public void
96       * removeExportPremise(ImportPremise premise) {
97       * importPremises.remove(premise); }
98       */
99  
100     /*
101      * delegate methods for manager of lower size
102      */
103     public void sendContextEvent(final String sourceId, final ContextEvent event) {
104 	manager.realizeLocalContextEventPublishment(sourceId, event);
105     }
106 
107     public void sendUIResponse(final String sourceId, final UIResponse response) {
108 	manager.realizeLocalUIResponsePublishment(sourceId, response);
109     }
110 
111     public void removeRemoteBusMember(final BusMember sourceMember,
112 	    final String targetMemberIdRegex) {
113 	throw new RuntimeException("Not yet implemented");
114 
115 	/*
116 	 * busMembersToImport.add(new InternalImportOperation(sourceMember,
117 	 * RepoOperation.Purge, targetMemberIdRegex));
118 	 * 
119 	 * communicator.sendImportRemoval(null, new URL[]{});
120 	 */
121     }
122 
123     /*
124      * Methods for integrating with bus tracking capabilities
125      */
126     public void memberAdded(final BusMember member) {
127 	// TODO implement in next release
128     }
129 
130     public void memberRemoved(final BusMember member) {
131 	// TODO implement in next release
132     }
133 
134     public void registryEntryAdded(final RegistryEntry entry) {
135 	if (entry instanceof ImportEntry) {
136 	    InternalImportOperation op = (InternalImportOperation) ((ImportEntry) entry)
137 		    .getOperation();
138 	    System.out.println("Registering proxy for : "
139 		    + op.getType().toString());
140 	    manager.registerProxies(op);
141 	    importingMock.get(op.getUuid()).add((ImportEntry) entry);
142 	    System.out.println("Proxy registered");
143 	}
144     }
145 
146     public void registryEntryRemoved(final RegistryEntry entry) {
147 	if (entry instanceof ImportEntry) {
148 	    ImportEntry importEntry = (ImportEntry) entry;
149 	    InternalImportOperation op = (InternalImportOperation) importEntry
150 		    .getOperation();
151 	    manager.unregisterProxies(op);
152 	    importingMock.get(op.getUuid()).add(importEntry);
153 	}
154     }
155 
156     public ImportEntry importRemoteService(final BusMember sourceMember,
157 	    final String serviceType, final String serverNamespace)
158 	    throws IOException, ClassNotFoundException {
159 	String uuid = UUID.randomUUID().toString();
160 	importingMock.put(uuid, new ArrayBlockingQueue<ImportEntry>(1));
161 	System.out.println("Importing RemoteService");
162 	internalImportRemoteService(uuid, sourceMember, serviceType,
163 		serverNamespace);
164 	try {
165 	    System.out.println("Waiting for proxy registration");
166 	    ImportEntry entry = importingMock.get(uuid).take();
167 	    System.out.println("Continuing");
168 	    return entry;
169 	} catch (InterruptedException e) {
170 	    e.printStackTrace();
171 	    return null;
172 	}
173     }
174 
175     private boolean performInterceptorChainExecution(String uuid, ImportRequest request, EIOperationManager.Type type){
176     		try {
177 				EIOperationManager.Instance.executeImportOperationChain(request, type);
178 			} catch (InterruptExecutionException e) {
179 				importingMock.get(uuid).add(new ImportEntry("", null, "", null, false, e.getMessage()));
180 				return false;
181 			}
182     		return true;
183     }
184     
185     public void internalImportRemoteService(final String uuid,
186 	    final BusMember sourceMember, final String targetServiceType,
187 	    final String serverNamespace) throws IOException,
188 	    ClassNotFoundException {
189 	InternalImportOperation internal = new InternalImportOperation(
190 		sourceMember, RepoOperation.Publish, uuid);
191 
192 	ImportRequest importRequest = new ImportRequest(internal.getType(),
193 		uuid);
194 	importRequest.setServiceType(targetServiceType);
195 	importRequest.setServerNamespace(serverNamespace);
196 
197 	if (!performInterceptorChainExecution(uuid,importRequest, EIOperationManager.Type.Service)){
198 		return;
199 	}
200 	
201 	System.out.println("Sending ImportRequest internalImportRemoteService");
202 	Message m = communicator.sendImportRequest(Serializer.Instance
203 		.marshall(importRequest));
204 	String registeredRemoteProxyId = ((ProxyRegistration) m.getContent())
205 		.getId();
206 	Map<String, List<String>> serializedProfilesMap = (Map<String, List<String>> ) ((ProxyRegistration) m
207 			.getContent()).getReturnedValues();
208 	//String[] serializedProfiles = (String[]) ((ProxyRegistration) m
209 	//	.getContent()).getReturnedValues();
210 
211 	System.out.println("Got ImportResponse. ServiceProfiles count: "
212 		+ serializedProfilesMap.values().size());
213 	
214 	Map<String, List<ServiceProfile>> profilesMap = new HashMap<String, List<ServiceProfile>>();
215 	
216 	for(String key : serializedProfilesMap.keySet()){
217 		if (profilesMap.get(key) == null){
218 			profilesMap.put(key, new ArrayList<ServiceProfile>());
219 		}
220 		for(String serializedP : serializedProfilesMap.get(key)){
221 			profilesMap.get(key).add( Serializer.Instance.unmarshallObject(
222 		    ServiceProfile.class, serializedP,
223 		    Activator.class.getClassLoader()));
224 		}
225 	}
226 	/*
227 	for (int i = 0; i < serializedProfiles.length; i++) {
228 	    profiles[i] = Serializer.Instance.unmarshallObject(
229 		    ServiceProfile.class, serializedProfiles[i],
230 		    Activator.class.getClassLoader());
231 	}
232 	*/
233 	internal.setRealizedServices(profilesMap);
234 	internal.setRemoteRegisteredProxyId(registeredRemoteProxyId);
235 	busMembersToImport.add(internal);
236     }
237 
238     public ImportEntry importRemoteContextEvents(final BusMember sourceMember,
239 	    final ContextEventPattern[] cpe) throws IOException,
240 	    ClassNotFoundException {
241 	String uuid = UUID.randomUUID().toString();
242 	importingMock.put(uuid, new ArrayBlockingQueue<ImportEntry>(1));
243 	System.out.println("Importing RemoteContextEvents");
244 	internalImportRemoteContextEvents(uuid, sourceMember, cpe);
245 	try {
246 	    System.out.println("Waiting for proxy registration");
247 	    importingMock.get(uuid).take();
248 	    ImportEntry entry = importingMock.get(uuid).take();
249 	    System.out.println("Continuing");
250 	    return entry;
251 	} catch (InterruptedException e) {
252 		e.printStackTrace();
253 	    return null;
254 	}
255     }
256 
257     private void internalImportRemoteContextEvents(final String uuid,
258 	    final BusMember sourceMember, final ContextEventPattern[] cpe)
259 	    throws IOException, ClassNotFoundException {
260 	InternalImportOperation internal = new InternalImportOperation(
261 		sourceMember, RepoOperation.Publish, uuid);
262 	
263 	ImportRequest importRequest = new ImportRequest(internal.getType(),
264 		uuid);
265 	String[] serializedCpe = new String[cpe.length];
266 	String[] subjectURIS = new String[cpe.length];
267 	System.out.println("Import sent:");
268 	for (int i = 0; i < cpe.length; i++) {
269 	    serializedCpe[i] = (String) Serializer.Instance.marshallObject(
270 		    cpe[i]).getContent();
271 	    subjectURIS[i] = cpe[i].getIndices().getSubjectTypes()[i];
272 	    System.out.println(serializedCpe[i]);
273 	}
274 	importRequest.setCpe(serializedCpe);
275 	importRequest.setSubjectURIs(subjectURIS);
276 	if (!performInterceptorChainExecution(uuid,importRequest, EIOperationManager.Type.Context)){
277 		return;
278 	}
279 	
280 	System.out
281 		.println("Sending ImportRequest internalImportRemoteContextEvents");
282 	Message m = communicator.sendImportRequest(Serializer.Instance
283 		.marshall(importRequest));
284 	String registeredRemoteProxyId = ((ProxyRegistration) m.getContent())
285 		.getId();
286 	String[] serializedCEP = (String[]) ((ProxyRegistration) m.getContent())
287 		.getReturnedValues();
288 	ContextEventPattern[] patterns = new ContextEventPattern[serializedCEP.length];
289 	System.out.println("Import received:");
290 	for (int i = 0; i < serializedCEP.length; i++) {
291 	    System.out.println(serializedCEP[i]);
292 	    patterns[i] = Serializer.Instance.unmarshallObject(
293 		    ContextEventPattern.class, serializedCEP[i],
294 		    Activator.class.getClassLoader());
295 	}
296 
297 	ContextProvider info = new ContextProvider(ContextProvider.MY_URI);
298 	info.setType(ContextProviderType.controller);
299 	info.setProvidedEvents(patterns);
300 
301 	System.out.println("Got ImportResponse. ContextProvider events count: "
302 		+ ((info.getProvidedEvents() == null) ? " is null " : info
303 			.getProvidedEvents().length));
304 
305 	internal.setContextProvider(info);
306 	internal.setRemoteRegisteredProxyId(registeredRemoteProxyId);
307 	busMembersToImport.add(internal);
308     }
309 
310     public void refreshProxy(final ProxyRegistration proxyRegistration) throws IOException, ClassNotFoundException {
311     	manager.refreshProxy(proxyRegistration);
312     }
313 
314     public boolean unimportRemoteService(final ImportEntry importEntry)
315 	    throws IOException, ClassNotFoundException {
316 	InternalImportOperation operation = (InternalImportOperation) importEntry
317 		.getOperation();
318 	String uuid = operation.getUuid();
319 	importingMock.put(uuid, new ArrayBlockingQueue<ImportEntry>(1));
320 	System.out.println("Unimporting RemoteService");
321 
322 	operation.setOp(RepoOperation.Purge);
323 
324 	ImportRequest importRequest = new ImportRequest(operation.getType(),
325 		uuid);
326 	importRequest.setId(operation.getRemoteRegisteredProxyId());
327 
328 	System.out.println("Sending ImportRequest internalImportRemoteService");
329 	communicator.sendImportRemoval(Serializer.Instance
330 		.marshall(importRequest));
331 	busMembersToImport.add(operation);
332 	try {
333 	    System.out.println("Waiting for proxy unregistration");
334 	    importingMock.get(uuid).take();
335 	    System.out.println("Continuing");
336 	} catch (InterruptedException e) {
337 	    e.printStackTrace();
338 	    return false;
339 	}
340 	return true;
341     }
342 
343     public ImportEntry importRemoteUI(final BusMember sourceMember,
344     	    final String uiType)
345     	    throws IOException, ClassNotFoundException {
346     	String uuid = UUID.randomUUID().toString();
347     	importingMock.put(uuid, new ArrayBlockingQueue<ImportEntry>(1));
348     	System.out.println("Importing RemoteUI");
349     	internalImportRemoteUI(uuid, sourceMember, uiType);
350     	try {
351     	    System.out.println("Waiting for proxy registration");
352     	    ImportEntry entry = importingMock.get(uuid).take();
353     	    System.out.println("Continuing");
354     	    return entry;
355     	} catch (InterruptedException e) {
356     	    e.printStackTrace();
357     	    return null;
358     	}
359         }
360     
361     public void internalImportRemoteUI(final String uuid,
362     	    final BusMember sourceMember, final String targetUIType) throws IOException,
363     	    ClassNotFoundException {
364     	InternalImportOperation internal = new InternalImportOperation(
365     		sourceMember, RepoOperation.Publish, uuid);
366 
367     	ImportRequest importRequest = new ImportRequest(internal.getType(),
368     		uuid);
369     	importRequest.setModalityRegex(targetUIType);
370 
371     	if (!performInterceptorChainExecution(uuid,importRequest, EIOperationManager.Type.Service)){
372     		return;
373     	}
374     	//TODO
375     	/*
376     	System.out.println("Sending ImportRequest internalImportRemoteService");
377     	Message m = communicator.sendImportRequest(Serializer.Instance
378     		.marshall(importRequest));
379     	String registeredRemoteProxyId = ((ProxyRegistration) m.getContent())
380     		.getId();
381     	String[] serializedProfiles = (String[]) ((ProxyRegistration) m
382     		.getContent()).getReturnedValues();
383 
384     	System.out.println("Got ImportResponse. ServiceProfiles count: "
385     		+ serializedProfiles.length);
386     	ServiceProfile[] profiles = new ServiceProfile[serializedProfiles.length];
387     	for (int i = 0; i < serializedProfiles.length; i++) {
388     	    profiles[i] = Serializer.Instance.unmarshallObject(
389     		    ServiceProfile.class, serializedProfiles[i],
390     		    Activator.class.getClassLoader());
391     	}
392 
393     	internal.setRealizedServices(profiles);
394     	internal.setRemoteRegisteredProxyId(registeredRemoteProxyId);
395     	busMembersToImport.add(internal);
396     */
397         }
398     
399 }