1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.universAAL.ri.gateway.eimanager.impl;
22
23 import java.io.IOException;
24 import java.util.ArrayList;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.UUID;
29 import java.util.concurrent.ArrayBlockingQueue;
30 import java.util.concurrent.BlockingQueue;
31
32 import org.universAAL.middleware.context.ContextEvent;
33 import org.universAAL.middleware.context.ContextEventPattern;
34 import org.universAAL.middleware.context.owl.ContextProvider;
35 import org.universAAL.middleware.context.owl.ContextProviderType;
36 import org.universAAL.middleware.service.owls.profile.ServiceProfile;
37 import org.universAAL.middleware.bus.member.BusMember;
38 import org.universAAL.middleware.ui.UIResponse;
39 import org.universAAL.ri.gateway.communicator.Activator;
40 import org.universAAL.ri.gateway.communicator.service.GatewayCommunicator;
41 import org.universAAL.ri.gateway.communicator.service.Message;
42 import org.universAAL.ri.gateway.communicator.service.RemoteSpacesManager;
43 import org.universAAL.ri.gateway.communicator.service.impl.Serializer;
44 import org.universAAL.ri.gateway.eimanager.ImportEntry;
45 import org.universAAL.ri.gateway.eimanager.ImportExecutor;
46 import org.universAAL.ri.gateway.eimanager.ImportManager;
47 import org.universAAL.ri.gateway.eimanager.exception.InterruptExecutionException;
48 import org.universAAL.ri.gateway.eimanager.impl.exporting.ProxyRegistration;
49 import org.universAAL.ri.gateway.eimanager.impl.importing.ImportProcessExecutor;
50 import org.universAAL.ri.gateway.eimanager.impl.importing.ImportRequest;
51 import org.universAAL.ri.gateway.eimanager.impl.importing.ImportedProxyManager;
52 import org.universAAL.ri.gateway.eimanager.impl.importing.InternalImportOperation;
53 import org.universAAL.ri.gateway.eimanager.impl.registry.EIRepoAccessManager;
54 import org.universAAL.ri.gateway.eimanager.impl.registry.IRegistryListener;
55 import org.universAAL.ri.gateway.eimanager.impl.registry.RegistryEntry;
56 import org.universAAL.ri.gateway.eimanager.impl.registry.RepoOperation;
57
58 public class ImportManagerImpl implements ImportManager, ImportExecutor,
59 IRegistryListener, RemoteSpacesManager {
60
61 private ImportProcessExecutor importExecutor;
62 private Thread importThread;
63
64 private ImportedProxyManager manager;
65
66 private Map<String, BlockingQueue<ImportEntry>> importingMock;
67
68 private BlockingQueue<InternalImportOperation> busMembersToImport;
69 private GatewayCommunicator communicator;
70
71 public ImportManagerImpl(final GatewayCommunicator communicator) {
72 this.communicator = communicator;
73 busMembersToImport = new ArrayBlockingQueue<InternalImportOperation>(5);
74
75 importingMock = new HashMap<String, BlockingQueue<ImportEntry>>();
76
77 importExecutor = new ImportProcessExecutor(busMembersToImport);
78 importThread = new Thread(importExecutor);
79 importThread.start();
80
81 manager = new ImportedProxyManager(communicator);
82
83 EIRepoAccessManager.Instance.addListener(this);
84
85 Activator.mc.getContainer().shareObject(Activator.mc, this,
86 new Object[] { RemoteSpacesManager.class.getName() });
87 }
88
89 public void shutdown() {
90 importThread.interrupt();
91 }
92
93
94
95
96
97
98
99
100
101
102
103 public void sendContextEvent(final String sourceId, final ContextEvent event) {
104 manager.realizeLocalContextEventPublishment(sourceId, event);
105 }
106
107 public void sendUIResponse(final String sourceId, final UIResponse response) {
108 manager.realizeLocalUIResponsePublishment(sourceId, response);
109 }
110
111 public void removeRemoteBusMember(final BusMember sourceMember,
112 final String targetMemberIdRegex) {
113 throw new RuntimeException("Not yet implemented");
114
115
116
117
118
119
120
121 }
122
123
124
125
126 public void memberAdded(final BusMember member) {
127
128 }
129
130 public void memberRemoved(final BusMember member) {
131
132 }
133
134 public void registryEntryAdded(final RegistryEntry entry) {
135 if (entry instanceof ImportEntry) {
136 InternalImportOperation op = (InternalImportOperation) ((ImportEntry) entry)
137 .getOperation();
138 System.out.println("Registering proxy for : "
139 + op.getType().toString());
140 manager.registerProxies(op);
141 importingMock.get(op.getUuid()).add((ImportEntry) entry);
142 System.out.println("Proxy registered");
143 }
144 }
145
146 public void registryEntryRemoved(final RegistryEntry entry) {
147 if (entry instanceof ImportEntry) {
148 ImportEntry importEntry = (ImportEntry) entry;
149 InternalImportOperation op = (InternalImportOperation) importEntry
150 .getOperation();
151 manager.unregisterProxies(op);
152 importingMock.get(op.getUuid()).add(importEntry);
153 }
154 }
155
156 public ImportEntry importRemoteService(final BusMember sourceMember,
157 final String serviceType, final String serverNamespace)
158 throws IOException, ClassNotFoundException {
159 String uuid = UUID.randomUUID().toString();
160 importingMock.put(uuid, new ArrayBlockingQueue<ImportEntry>(1));
161 System.out.println("Importing RemoteService");
162 internalImportRemoteService(uuid, sourceMember, serviceType,
163 serverNamespace);
164 try {
165 System.out.println("Waiting for proxy registration");
166 ImportEntry entry = importingMock.get(uuid).take();
167 System.out.println("Continuing");
168 return entry;
169 } catch (InterruptedException e) {
170 e.printStackTrace();
171 return null;
172 }
173 }
174
175 private boolean performInterceptorChainExecution(String uuid, ImportRequest request, EIOperationManager.Type type){
176 try {
177 EIOperationManager.Instance.executeImportOperationChain(request, type);
178 } catch (InterruptExecutionException e) {
179 importingMock.get(uuid).add(new ImportEntry("", null, "", null, false, e.getMessage()));
180 return false;
181 }
182 return true;
183 }
184
185 public void internalImportRemoteService(final String uuid,
186 final BusMember sourceMember, final String targetServiceType,
187 final String serverNamespace) throws IOException,
188 ClassNotFoundException {
189 InternalImportOperation internal = new InternalImportOperation(
190 sourceMember, RepoOperation.Publish, uuid);
191
192 ImportRequest importRequest = new ImportRequest(internal.getType(),
193 uuid);
194 importRequest.setServiceType(targetServiceType);
195 importRequest.setServerNamespace(serverNamespace);
196
197 if (!performInterceptorChainExecution(uuid,importRequest, EIOperationManager.Type.Service)){
198 return;
199 }
200
201 System.out.println("Sending ImportRequest internalImportRemoteService");
202 Message m = communicator.sendImportRequest(Serializer.Instance
203 .marshall(importRequest));
204 String registeredRemoteProxyId = ((ProxyRegistration) m.getContent())
205 .getId();
206 Map<String, List<String>> serializedProfilesMap = (Map<String, List<String>> ) ((ProxyRegistration) m
207 .getContent()).getReturnedValues();
208
209
210
211 System.out.println("Got ImportResponse. ServiceProfiles count: "
212 + serializedProfilesMap.values().size());
213
214 Map<String, List<ServiceProfile>> profilesMap = new HashMap<String, List<ServiceProfile>>();
215
216 for(String key : serializedProfilesMap.keySet()){
217 if (profilesMap.get(key) == null){
218 profilesMap.put(key, new ArrayList<ServiceProfile>());
219 }
220 for(String serializedP : serializedProfilesMap.get(key)){
221 profilesMap.get(key).add( Serializer.Instance.unmarshallObject(
222 ServiceProfile.class, serializedP,
223 Activator.class.getClassLoader()));
224 }
225 }
226
227
228
229
230
231
232
233 internal.setRealizedServices(profilesMap);
234 internal.setRemoteRegisteredProxyId(registeredRemoteProxyId);
235 busMembersToImport.add(internal);
236 }
237
238 public ImportEntry importRemoteContextEvents(final BusMember sourceMember,
239 final ContextEventPattern[] cpe) throws IOException,
240 ClassNotFoundException {
241 String uuid = UUID.randomUUID().toString();
242 importingMock.put(uuid, new ArrayBlockingQueue<ImportEntry>(1));
243 System.out.println("Importing RemoteContextEvents");
244 internalImportRemoteContextEvents(uuid, sourceMember, cpe);
245 try {
246 System.out.println("Waiting for proxy registration");
247 importingMock.get(uuid).take();
248 ImportEntry entry = importingMock.get(uuid).take();
249 System.out.println("Continuing");
250 return entry;
251 } catch (InterruptedException e) {
252 e.printStackTrace();
253 return null;
254 }
255 }
256
257 private void internalImportRemoteContextEvents(final String uuid,
258 final BusMember sourceMember, final ContextEventPattern[] cpe)
259 throws IOException, ClassNotFoundException {
260 InternalImportOperation internal = new InternalImportOperation(
261 sourceMember, RepoOperation.Publish, uuid);
262
263 ImportRequest importRequest = new ImportRequest(internal.getType(),
264 uuid);
265 String[] serializedCpe = new String[cpe.length];
266 String[] subjectURIS = new String[cpe.length];
267 System.out.println("Import sent:");
268 for (int i = 0; i < cpe.length; i++) {
269 serializedCpe[i] = (String) Serializer.Instance.marshallObject(
270 cpe[i]).getContent();
271 subjectURIS[i] = cpe[i].getIndices().getSubjectTypes()[i];
272 System.out.println(serializedCpe[i]);
273 }
274 importRequest.setCpe(serializedCpe);
275 importRequest.setSubjectURIs(subjectURIS);
276 if (!performInterceptorChainExecution(uuid,importRequest, EIOperationManager.Type.Context)){
277 return;
278 }
279
280 System.out
281 .println("Sending ImportRequest internalImportRemoteContextEvents");
282 Message m = communicator.sendImportRequest(Serializer.Instance
283 .marshall(importRequest));
284 String registeredRemoteProxyId = ((ProxyRegistration) m.getContent())
285 .getId();
286 String[] serializedCEP = (String[]) ((ProxyRegistration) m.getContent())
287 .getReturnedValues();
288 ContextEventPattern[] patterns = new ContextEventPattern[serializedCEP.length];
289 System.out.println("Import received:");
290 for (int i = 0; i < serializedCEP.length; i++) {
291 System.out.println(serializedCEP[i]);
292 patterns[i] = Serializer.Instance.unmarshallObject(
293 ContextEventPattern.class, serializedCEP[i],
294 Activator.class.getClassLoader());
295 }
296
297 ContextProvider info = new ContextProvider(ContextProvider.MY_URI);
298 info.setType(ContextProviderType.controller);
299 info.setProvidedEvents(patterns);
300
301 System.out.println("Got ImportResponse. ContextProvider events count: "
302 + ((info.getProvidedEvents() == null) ? " is null " : info
303 .getProvidedEvents().length));
304
305 internal.setContextProvider(info);
306 internal.setRemoteRegisteredProxyId(registeredRemoteProxyId);
307 busMembersToImport.add(internal);
308 }
309
310 public void refreshProxy(final ProxyRegistration proxyRegistration) throws IOException, ClassNotFoundException {
311 manager.refreshProxy(proxyRegistration);
312 }
313
314 public boolean unimportRemoteService(final ImportEntry importEntry)
315 throws IOException, ClassNotFoundException {
316 InternalImportOperation operation = (InternalImportOperation) importEntry
317 .getOperation();
318 String uuid = operation.getUuid();
319 importingMock.put(uuid, new ArrayBlockingQueue<ImportEntry>(1));
320 System.out.println("Unimporting RemoteService");
321
322 operation.setOp(RepoOperation.Purge);
323
324 ImportRequest importRequest = new ImportRequest(operation.getType(),
325 uuid);
326 importRequest.setId(operation.getRemoteRegisteredProxyId());
327
328 System.out.println("Sending ImportRequest internalImportRemoteService");
329 communicator.sendImportRemoval(Serializer.Instance
330 .marshall(importRequest));
331 busMembersToImport.add(operation);
332 try {
333 System.out.println("Waiting for proxy unregistration");
334 importingMock.get(uuid).take();
335 System.out.println("Continuing");
336 } catch (InterruptedException e) {
337 e.printStackTrace();
338 return false;
339 }
340 return true;
341 }
342
343 public ImportEntry importRemoteUI(final BusMember sourceMember,
344 final String uiType)
345 throws IOException, ClassNotFoundException {
346 String uuid = UUID.randomUUID().toString();
347 importingMock.put(uuid, new ArrayBlockingQueue<ImportEntry>(1));
348 System.out.println("Importing RemoteUI");
349 internalImportRemoteUI(uuid, sourceMember, uiType);
350 try {
351 System.out.println("Waiting for proxy registration");
352 ImportEntry entry = importingMock.get(uuid).take();
353 System.out.println("Continuing");
354 return entry;
355 } catch (InterruptedException e) {
356 e.printStackTrace();
357 return null;
358 }
359 }
360
361 public void internalImportRemoteUI(final String uuid,
362 final BusMember sourceMember, final String targetUIType) throws IOException,
363 ClassNotFoundException {
364 InternalImportOperation internal = new InternalImportOperation(
365 sourceMember, RepoOperation.Publish, uuid);
366
367 ImportRequest importRequest = new ImportRequest(internal.getType(),
368 uuid);
369 importRequest.setModalityRegex(targetUIType);
370
371 if (!performInterceptorChainExecution(uuid,importRequest, EIOperationManager.Type.Service)){
372 return;
373 }
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397 }
398
399 }