1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.universAAL.ri.gateway.communicator.service.impl;
22
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.net.URL;
27 import java.util.ArrayList;
28 import java.util.Collection;
29 import java.util.Collections;
30 import java.util.HashMap;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.UUID;
34 import java.util.concurrent.Callable;
35 import java.util.concurrent.ExecutionException;
36 import java.util.concurrent.Executor;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.FutureTask;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.TimeoutException;
41
42 import org.bouncycastle.crypto.CryptoException;
43 import org.universAAL.middleware.context.ContextEvent;
44 import org.universAAL.middleware.context.ContextEventPattern;
45 import org.universAAL.middleware.service.ServiceCall;
46 import org.universAAL.middleware.service.ServiceResponse;
47 import org.universAAL.middleware.service.owls.profile.ServiceProfile;
48 import org.universAAL.middleware.ui.UIRequest;
49 import org.universAAL.middleware.ui.UIResponse;
50 import org.universAAL.ri.gateway.communicator.service.CommunicationException;
51 import org.universAAL.ri.gateway.communicator.service.CommunicationHandler;
52 import org.universAAL.ri.gateway.communicator.service.GatewayCommunicator;
53 import org.universAAL.ri.gateway.communicator.service.Message;
54 import org.universAAL.ri.gateway.communicator.service.ResponseCallback;
55 import org.universAAL.ri.gateway.eimanager.ExportManager;
56 import org.universAAL.ri.gateway.eimanager.ImportManager;
57 import org.universAAL.ri.gateway.eimanager.exception.InterruptExecutionException;
58 import org.universAAL.ri.gateway.eimanager.impl.BusMemberType;
59 import org.universAAL.ri.gateway.eimanager.impl.EIOperationManager;
60 import org.universAAL.ri.gateway.eimanager.impl.exporting.ProxyRegistration;
61 import org.universAAL.ri.gateway.eimanager.impl.importing.ImportRequest;
62
63
64
65
66
67
68
69
70
71
72 public class GatewayCommunicatorImpl implements GatewayCommunicator {
73
74
75
76
77 private static final long serialVersionUID = 7119632127833531787L;
78
79
80
81
82 private final Map<UUID, ResponseCallback> callbacks;
83
84
85
86
87
88 private volatile ImportManager importManager;
89
90
91
92
93
94 private volatile ExportManager exportManager;
95
96
97
98
99 private final Executor executor = Executors.newCachedThreadPool();
100
101 private List<GatewayAddress> remoteGateways;
102
103 private CommunicationHandler commHandler;
104
105
106
107
108
109
110
111
112 public GatewayCommunicatorImpl() throws Exception {
113 callbacks = Collections
114 .synchronizedMap(new HashMap<UUID, ResponseCallback>());
115 remoteGateways = Collections
116 .synchronizedList(new ArrayList<GatewayAddress>());
117 commHandler = new SocketCommunicationHandler(this);
118 }
119
120 public void addRemoteGateway(final GatewayAddress gwAddrToAdd) {
121 remoteGateways.add(gwAddrToAdd);
122 }
123
124 public void removeRemoteGateway(final GatewayAddress gwAddrToDelete) {
125 remoteGateways.remove(gwAddrToDelete);
126 }
127
128 public void addRemoteGateways(
129 final Collection<GatewayAddress> gwAddressesToAdd) {
130 remoteGateways.addAll(gwAddressesToAdd);
131 }
132
133 public void setManagers(final ImportManager importManager,
134 final ExportManager exportManager) {
135 this.importManager = importManager;
136 this.exportManager = exportManager;
137 }
138
139 private URL[] getRemoteURLs() {
140 URL[] uris = new URL[remoteGateways.size()];
141 for (int i = 0; i < remoteGateways.size(); i++) {
142 uris[i] = remoteGateways.get(i).getUrl();
143 }
144 return uris;
145 }
146
147
148
149
150
151
152
153
154
155
156
157 public void sendContextEvent(final Message message, final URL[] to) {
158 if (to == null || message == null) {
159 throw new IllegalArgumentException();
160 }
161 MessageWrapper wrap = new MessageWrapper(MessageType.Context, message,
162 "");
163 for (URL url : to) {
164 sendMessage(wrap, url);
165 }
166 }
167
168
169
170
171
172
173
174
175
176
177
178 public void sendUIRequest(final Message message, final URL[] to) {
179 if (to == null || message == null) {
180 throw new IllegalArgumentException();
181 }
182 MessageWrapper wrap = new MessageWrapper(MessageType.UI, message, "");
183 for (URL url : to) {
184 sendMessage(wrap, url);
185 }
186 }
187
188
189
190
191
192
193
194
195
196
197
198 public void sendUIResponse(final Message message, final URL[] to) {
199 if (to == null || message == null) {
200 throw new IllegalArgumentException();
201 }
202 MessageWrapper wrap = new MessageWrapper(MessageType.UIResponse,
203 message, "");
204 for (URL url : to) {
205 sendMessage(wrap, url);
206 }
207 }
208
209
210
211
212
213
214
215
216
217
218
219
220
221 public Message[] sendServiceRequest(final Message message, final URL[] to) {
222 if (to == null || message == null) {
223 throw new IllegalArgumentException();
224 }
225 Message[] returnedValues = new Message[to.length];
226 for (int i = 0; i < to.length; i++) {
227 MessageWrapper wrapReq = new MessageWrapper(
228 MessageType.ServiceRequest, message, "");
229 MessageWrapper wrapResp = sendMessage(wrapReq, to[i]);
230 returnedValues[i] = wrapResp.getMessage();
231 }
232 return returnedValues;
233 }
234
235 public Message sendServiceRequest(final Message message, final URL to) {
236 if (to == null || message == null) {
237 throw new IllegalArgumentException();
238 }
239 MessageWrapper wrapReq = new MessageWrapper(MessageType.ServiceRequest,
240 message, "");
241 MessageWrapper wrapResp = sendMessage(wrapReq, to);
242 return wrapResp.getMessage();
243 }
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260 public Message[] sendServiceRequest(final Message message, final URL[] to,
261 final long timeout) throws TimeoutException {
262 if (to == null || message == null || timeout < 0) {
263 throw new IllegalArgumentException();
264 }
265 if (timeout == 0) {
266 return sendServiceRequest(message, to);
267 }
268
269 Message[] returnedValues = new Message[to.length];
270 for (int i = 0; i < to.length; i++) {
271 final int index = i;
272 FutureTask<Message> task = new FutureTask<Message>(
273 new Callable<Message>() {
274 public Message call() throws Exception {
275 return sendServiceRequest(message, to[index]);
276 }
277 });
278 executor.execute(task);
279 try {
280 while (true) {
281 try {
282 returnedValues[i] = task.get(timeout,
283 TimeUnit.MILLISECONDS);
284 break;
285 } catch (InterruptedException e) {
286
287 continue;
288 }
289 }
290 } catch (ExecutionException ex) {
291 throw new CommunicationException(ex);
292 }
293 }
294 return returnedValues;
295 }
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311 public void sendServiceRequestAsync(final Message message,
312 final URL returnTo, final URL to, final ResponseCallback callback) {
313 if (to == null || message == null || returnTo == null
314 || callback == null) {
315 throw new IllegalArgumentException();
316 }
317
318
319 MessageWrapper wrapReq = new MessageWrapper(
320 MessageType.ServiceRequestAsync, message, returnTo, "");
321 callbacks.put(wrapReq.getId(), callback);
322 MessageWrapper wrapResp = sendMessage(wrapReq, to);
323 if (wrapResp != null) {
324
325 throw new CommunicationException("protocol failure");
326 }
327 }
328
329
330
331
332
333
334
335
336
337
338
339
340
341 private MessageWrapper sendMessage(final MessageWrapper w, final URL to) {
342 if (w == null || to == null) {
343 throw new IllegalArgumentException();
344 }
345
346 logInfo("sending message: %s to %s", w, to);
347 MessageWrapper resp = null;
348 try {
349 resp = commHandler.sendMessage(w, to);
350 } catch (IOException e) {
351 throw new CommunicationException(e);
352 } catch (ClassNotFoundException e) {
353 throw new CommunicationException(e);
354 } catch (CryptoException e) {
355 throw new CommunicationException(e);
356 }
357 return resp;
358 }
359
360
361
362
363
364
365
366
367
368 private static void logInfo(final String format, final Object... args) {
369 String callingMethod = Thread.currentThread().getStackTrace()[2]
370 .getMethodName();
371 System.out.format("[%s] %s%n", callingMethod,
372 String.format(format, args));
373
374
375
376 }
377
378 public Message[] sendImportRequest(final Message message, final URL[] to) {
379 if (to == null || message == null) {
380 throw new IllegalArgumentException();
381 }
382 Message[] resultsValue = new Message[to.length];
383 MessageWrapper wrap = new MessageWrapper(MessageType.ImportRequest,
384 message, "");
385 for (int i = 0; i < to.length; i++) {
386 MessageWrapper wrapper = sendMessage(wrap, to[i]);
387 if (wrapper == null){
388 throw new CommunicationException("Import from " + to[i].toString() + " resulted in null value MessageWrapper object. Is hash-key property the same in both spaces???");
389 }
390 resultsValue[i] = wrapper.getMessage();
391 }
392 return resultsValue;
393 }
394
395 public void sendImportRefresh(final Message message, final URL[] to) {
396 if (to == null || message == null) {
397 throw new IllegalArgumentException();
398 }
399 MessageWrapper wrap = new MessageWrapper(MessageType.ImportRefresh,
400 message, "");
401 for (URL url : to) {
402 sendMessage(wrap, url);
403 }
404 }
405
406 public void sendImportRemoval(final Message message, final URL[] to) {
407 if (to == null || message == null) {
408 throw new IllegalArgumentException();
409 }
410 MessageWrapper wrap = new MessageWrapper(MessageType.ImportRemoval,
411 message, "");
412 for (URL url : to) {
413 sendMessage(wrap, url);
414 }
415 }
416
417 public Message[] sendServiceRequest(final Message message) {
418 return this.sendServiceRequest(message, getRemoteURLs());
419 }
420
421 public Message[] sendServiceRequest(final Message message,
422 final long timeout) throws TimeoutException {
423 return this.sendServiceRequest(message, getRemoteURLs(), timeout);
424 }
425
426 public void sendContextEvent(final Message message) {
427 this.sendContextEvent(message, getRemoteURLs());
428 }
429
430 public void sendUIResponse(final Message message) {
431 this.sendUIResponse(message, getRemoteURLs());
432 }
433
434 public void sendUIRequest(final Message message) {
435 this.sendUIRequest(message, getRemoteURLs());
436 }
437
438 public Message sendImportRequest(final Message message) {
439 return this.sendImportRequest(message, getRemoteURLs())[0];
440 }
441
442 public void sendImportRefresh(final Message message) {
443 this.sendImportRefresh(message, getRemoteURLs());
444 }
445
446 public void sendImportRemoval(final Message message) {
447 this.sendImportRemoval(message, getRemoteURLs());
448 }
449
450 public void handleMessage(final InputStream in, final OutputStream out) {
451 MessageWrapper wrapOut = null;
452 MessageWrapper wrapIn = null;
453 try{
454 logInfo("handleMessage");
455 wrapIn = Serializer.unmarshalMessage(in);
456 logInfo("handleMessage: type: %s", wrapIn.getType());
457
458 switch (wrapIn.getType()) {
459 case ImportRequest:
460 ImportRequest request = Serializer.Instance.unmarshall(
461 ImportRequest.class, wrapIn.getMessage());
462
463 EIOperationManager.Type type = null;
464 switch (BusMemberType.valueOf(request.getMember())) {
465 case ServiceCaller:
466 type = EIOperationManager.Type.Service;
467 break;
468 case ContextSubscriber:
469 type = EIOperationManager.Type.Context;
470 break;
471 case UICaller:
472 type = EIOperationManager.Type.UI;
473 break;
474 }
475
476 try {
477 EIOperationManager.Instance.executeExportOperationChain(request, type);
478 } catch (InterruptExecutionException e) {
479 ProxyRegistration errorRegistration = new ProxyRegistration(e.getMessage());
480 wrapOut = new MessageWrapper(MessageType.ImportResponse,
481 Serializer.Instance.marshall(errorRegistration),
482 wrapIn.getId(), "");
483 logInfo("Sending ImportResponse with failed interceptor execution: %s", errorRegistration);
484 Serializer.sendMessageToStream(wrapOut, out);
485 return;
486 }
487
488 logInfo("Got ImportRequest: %s", request);
489 ProxyRegistration registration = this.exportManager
490 .registerProxies(request);
491
492
493 String[] serialized = null;
494
495 switch (BusMemberType.valueOf(request.getMember())) {
496 case ServiceCaller:
497 Map<String, List<ServiceProfile>> profiles = (Map<String, List<ServiceProfile>>) registration
498 .getReturnedValues();
499 if (profiles != null) {
500 logInfo("Profiles count: %d", profiles.values().size());
501 }
502
503 Map<String, List<String>> serializedMap = new HashMap<String, List<String>>();
504
505 for(String key: profiles.keySet()){
506 if (serializedMap.get(key) == null){
507 serializedMap.put(key, new ArrayList<String>());
508 }
509 for(ServiceProfile p : profiles.get(key)){
510 serializedMap.get(key).add((String) Serializer.Instance
511 .marshallObject(p).getContent());
512 }
513 }
514
515
516
517
518
519
520
521
522 registration.setReturnedValues(serializedMap);
523 wrapOut = new MessageWrapper(MessageType.ImportResponse,
524 Serializer.Instance.marshall(registration),
525 wrapIn.getId(), "");
526 logInfo("Sending ImportResponse: %s", registration);
527 Serializer.sendMessageToStream(wrapOut, out);
528 break;
529 case ContextSubscriber:
530 ContextEventPattern[] cpe = (ContextEventPattern[]) registration
531 .getReturnedValues();
532 serialized = new String[cpe.length];
533 System.out.println("Export sent:");
534 for (int i = 0; i < cpe.length; i++) {
535 serialized[i] = (String) Serializer.Instance
536 .marshallObject(cpe[i]).getContent();
537
538
539
540
541
542 }
543 if (cpe != null) {
544 logInfo("ContextEventPattern count: %d", cpe.length);
545 }
546
547
548
549 registration.setReturnedValues(new String[] {});
550
551
552 wrapOut = new MessageWrapper(MessageType.ImportResponse,
553 Serializer.Instance.marshall(registration),
554 wrapIn.getId(), "");
555 logInfo("Sending ImportResponse: %s", "");
556 Serializer.sendMessageToStream(wrapOut, out);
557 break;
558
559 case UICaller:
560 throw new Exception("Not yet implemented");
561 }
562 break;
563 case ImportRefresh:
564 this.importManager.refreshProxy(Serializer.Instance.unmarshall(
565 ProxyRegistration.class, wrapIn.getMessage()));
566 break;
567 case ImportRemoval:
568 this.exportManager.unregisterProxies(Serializer.Instance
569 .unmarshall(ImportRequest.class, wrapIn.getMessage()));
570 break;
571 case ServiceRequest:
572
573 ServiceResponse response = this.exportManager
574 .sendServiceRequest(wrapIn.getMessage()
575 .getRemoteProxyRegistrationId(),
576 Serializer.Instance.unmarshallObject(
577 ServiceCall.class,
578 wrapIn.getMessage()),wrapIn.getMessage()
579 .getRemoteMemberId());
580
581 wrapOut = new MessageWrapper(MessageType.ServiceResponseAsync,
582 Serializer.Instance.marshallObject(response),
583 wrapIn.getId(), "");
584
585 logInfo("sending back the response: %s", wrapOut);
586 Serializer.sendMessageToStream(wrapOut, out);
587 break;
588 case ServiceRequestAsync:
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616 break;
617 case UI:
618 this.exportManager.sendUIRequest(wrapIn.getSourceId(),
619 Serializer.Instance.unmarshallObject(UIRequest.class,
620 wrapIn.getMessage()));
621 logInfo("published ui request to the bus: %s", wrapIn);
622 break;
623 case ServiceResponseAsync:
624 ResponseCallback call = callbacks.get(wrapIn.getId());
625 if (call == null) {
626 throw new Exception("couldn't find callback");
627 }
628 call.collectResponse(wrapIn.getMessage());
629 break;
630 case Context:
631 ContextEvent remoteContextEvent = Serializer.Instance
632 .unmarshallObject(ContextEvent.class,
633 wrapIn.getMessage());
634 String targetId = wrapIn.getMessage()
635 .getRemoteProxyRegistrationId();
636 this.importManager.sendContextEvent(targetId,
637 remoteContextEvent);
638 break;
639
640 case UIResponse:
641
642 this.importManager.sendUIResponse(wrapIn.getSourceId(),
643 Serializer.Instance.unmarshallObject(UIResponse.class,
644 wrapIn.getMessage()));
645 logInfo("published ui request to the bus: %s", wrapIn);
646 break;
647 default:
648 throw new UnsupportedOperationException();
649 }
650 } catch (Exception ex) {
651 logInfo("ERROR: %s", ex);
652 ex.printStackTrace();
653 try{
654 wrapOut = new MessageWrapper(MessageType.Error,
655 Serializer.Instance.marshallObject(ex.getMessage()),
656 UUID.randomUUID(), "");
657 if (wrapIn != null){
658 sendMessage(wrapOut, wrapIn.getReturnTo());
659 }else{
660 Serializer.sendMessageToStream(wrapOut, out);
661 }
662 }catch(Exception e){
663
664 }
665
666 }
667 }
668
669 public void stop() {
670 commHandler.stop();
671 }
672
673 public void start() throws Exception {
674 commHandler.start();
675 }
676
677 }