1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.universAAL.ri.gateway.communicator.service.impl;
22
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

import org.bouncycastle.crypto.CryptoException;
import org.universAAL.middleware.container.utils.LogUtils;
import org.universAAL.ri.gateway.communicator.Activator;
import org.universAAL.ri.gateway.communicator.service.CommunicationHandler;
import org.universAAL.ri.gateway.communicator.service.GatewayCommunicator;
38
39 public class SocketCommunicationHandler implements CommunicationHandler {
40
41 private static final int NUM_THREADS = 1;
42 private static int PORT;
43 private GatewayCommunicator communicator;
44 private Executor executor;
45 private ServerSocket server;
46 private Thread serverThread;
47
48 public SocketCommunicationHandler(final GatewayCommunicator communicator) {
49 this.communicator = communicator;
50
51 final String localPort = CommunicatorStarter.properties
52 .getProperty(GatewayCommunicator.LOCAL_SOCKET_PORT);
53 if (localPort == null) {
54 throw new RuntimeException("Local socket port is not "
55 + "specified during middleware startup in '"
56 + GatewayCommunicator.LOCAL_SOCKET_PORT + "' property.");
57 }
58
59 final String hashKey = CommunicatorStarter.properties
60 .getProperty(GatewayCommunicator.HASH_KEY);
61
62 SecurityUtils.Instance.initialize(hashKey);
63
64 PORT = Integer.valueOf(localPort);
65
66 this.executor = Executors.newFixedThreadPool(NUM_THREADS);
67 log("Created " + SocketCommunicationHandler.class.getName());
68 }
69
70 private void log(String msg) {
71 LogUtils.logInfo(Activator.mc, SocketCommunicationHandler.class, "log", msg);
72 }
73
74 public void start() throws IOException {
75 log("Starting TCP server on port " + PORT);
76 server = new ServerSocket(PORT);
77 serverThread = new Thread(new Runnable() {
78 public void run() {
79 log("TCP server started on port " + PORT);
80 Thread.currentThread().setName("Space Gateway :: Server");
81 while (!(Thread.currentThread().isInterrupted())) {
82 try {
83 final Socket socket = server.accept();
84 log("Got request ... processing ...");
85 executor.execute(new Handler(socket));
86 } catch (IOException e) {
87
88 e.printStackTrace();
89 }
90 }
91
92 }
93 });
94 serverThread.start();
95 }
96
97 private class Handler implements Runnable {
98
99 private Socket socket;
100
101 public Handler(final Socket socket) {
102 this.socket = socket;
103 }
104
105 public void run() {
106 Thread.currentThread().setName("Space Gateway :: ClientHandler");
107 try {
108 communicator.handleMessage(socket.getInputStream(),
109 socket.getOutputStream());
110 } catch (Exception e) {
111 e.printStackTrace();
112 } finally {
113 if (socket != null) {
114 try {
115 socket.close();
116 } catch (IOException e) {
117
118 e.printStackTrace();
119 }
120 }
121 }
122 }
123 }
124
125 public MessageWrapper sendMessage(final MessageWrapper toSend, final URL target)
126 throws IOException, ClassNotFoundException, CryptoException {
127 MessageWrapper resp = null;
128 Socket socket = new Socket(target.getHost(), target.getPort());
129 try {
130 InputStream is = socket.getInputStream();
131 OutputStream os = socket.getOutputStream();
132 Serializer.sendMessageToStream(toSend, os);
133
134 if (!toSend.getType().equals(MessageType.Context) && !toSend.getType().equals(MessageType.UIResponse)){
135 resp = Serializer.unmarshalMessage(is);
136 }
137 } catch (EOFException ex) {
138
139 return null;
140 } finally {
141 if (socket != null && !socket.isClosed()) {
142 socket.close();
143 }
144 }
145 return resp;
146 }
147
148 public void stop() {
149 try {
150 server.close();
151 } catch (IOException e) {
152
153 e.printStackTrace();
154 }
155 serverThread.interrupt();
156 }
157 }