alien11689 commented on code in PR #58:
URL: https://github.com/apache/aries-rsa/pull/58#discussion_r3046835556


##########
provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java:
##########
@@ -46,22 +49,105 @@
  * which sends the details of the method invocations
  * over a TCP connection, to be executed by the remote service.
  */
-public class TcpInvocationHandler implements InvocationHandler {
+public class TcpInvocationHandler implements InvocationHandler, Closeable {
+
+    private static class Connection {
+        Socket socket;
+        BasicObjectOutputStream out;
+        BasicObjectInputStream in;
+
+        public Connection(Socket socket) throws IOException {
+            this.socket = socket;
+            out = new BasicObjectOutputStream(socket.getOutputStream());
+            in = new BasicObjectInputStream(socket.getInputStream());
+        }
+    }
+
     private String host;
     private int port;
     private String endpointId;
     private ClassLoader cl;
     private int timeoutMillis;
 
+    private final Deque<Connection> pool = new ArrayDeque<>();
+    private int acquired; // counts connections currently in use (not in pool)
+    private boolean closed;
+
     public TcpInvocationHandler(ClassLoader cl, String host, int port, String 
endpointId, int timeoutMillis)
-        throws UnknownHostException, IOException {
+            throws UnknownHostException, IOException {
         this.cl = cl;
         this.host = host;
         this.port = port;
         this.endpointId = endpointId;
         this.timeoutMillis = timeoutMillis;
     }
 
+    private Connection acquireConnection() throws IOException {
+        Connection conn;
+        synchronized (pool) {
+            acquired++; // must be first
+            if (closed) {

Review Comment:
   Could this `closed` check come first, before `acquired` is incremented?



##########
provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java:
##########
@@ -46,22 +49,105 @@
  * which sends the details of the method invocations
  * over a TCP connection, to be executed by the remote service.
  */
-public class TcpInvocationHandler implements InvocationHandler {
+public class TcpInvocationHandler implements InvocationHandler, Closeable {
+
+    private static class Connection {
+        Socket socket;
+        BasicObjectOutputStream out;
+        BasicObjectInputStream in;
+
+        public Connection(Socket socket) throws IOException {
+            this.socket = socket;
+            out = new BasicObjectOutputStream(socket.getOutputStream());
+            in = new BasicObjectInputStream(socket.getInputStream());
+        }
+    }
+
     private String host;
     private int port;
     private String endpointId;
     private ClassLoader cl;
     private int timeoutMillis;
 
+    private final Deque<Connection> pool = new ArrayDeque<>();
+    private int acquired; // counts connections currently in use (not in pool)
+    private boolean closed;
+
     public TcpInvocationHandler(ClassLoader cl, String host, int port, String 
endpointId, int timeoutMillis)
-        throws UnknownHostException, IOException {
+            throws UnknownHostException, IOException {
         this.cl = cl;
         this.host = host;
         this.port = port;
         this.endpointId = endpointId;
         this.timeoutMillis = timeoutMillis;
     }
 
+    private Connection acquireConnection() throws IOException {
+        Connection conn;
+        synchronized (pool) {
+            acquired++; // must be first
+            if (closed) {
+                throw new IOException("Connection pool is closed");
+            }
+            conn = pool.pollFirst(); // reuse most recently used connection
+        }
+        // if the pool is empty, create a new connection
+        if (conn == null) {
+            conn = new Connection(openSocket());
+            conn.socket.setSoTimeout(timeoutMillis);
+            conn.socket.setTcpNoDelay(true);
+            conn.in.addClassLoader(cl);
+            conn.out.writeUTF(endpointId); // select endpoint for this 
connection
+        }
+        return conn;
+    }
+
+    // must be called exactly once for each call to acquireConnection,
+    // regardless of the outcome - if there was an error, pass null
+    private void releaseConnection(Connection conn) {
+        synchronized (pool) {
+            acquired--; // must be first

Review Comment:
   Should we add a check that `acquired` does not become negative?



##########
provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java:
##########
@@ -46,22 +49,105 @@
  * which sends the details of the method invocations
  * over a TCP connection, to be executed by the remote service.
  */
-public class TcpInvocationHandler implements InvocationHandler {
+public class TcpInvocationHandler implements InvocationHandler, Closeable {
+
+    private static class Connection {
+        Socket socket;
+        BasicObjectOutputStream out;
+        BasicObjectInputStream in;
+
+        public Connection(Socket socket) throws IOException {

Review Comment:
   nitpicking: `public` is not necessary



##########
provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java:
##########
@@ -46,22 +49,105 @@
  * which sends the details of the method invocations
  * over a TCP connection, to be executed by the remote service.
  */
-public class TcpInvocationHandler implements InvocationHandler {
+public class TcpInvocationHandler implements InvocationHandler, Closeable {
+
+    private static class Connection {
+        Socket socket;
+        BasicObjectOutputStream out;
+        BasicObjectInputStream in;
+
+        public Connection(Socket socket) throws IOException {
+            this.socket = socket;
+            out = new BasicObjectOutputStream(socket.getOutputStream());
+            in = new BasicObjectInputStream(socket.getInputStream());
+        }
+    }
+
     private String host;
     private int port;
     private String endpointId;
     private ClassLoader cl;
     private int timeoutMillis;
 
+    private final Deque<Connection> pool = new ArrayDeque<>();
+    private int acquired; // counts connections currently in use (not in pool)

Review Comment:
   We could turn this comment into a Javadoc comment.



##########
provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java:
##########
@@ -46,22 +49,105 @@
  * which sends the details of the method invocations
  * over a TCP connection, to be executed by the remote service.
  */
-public class TcpInvocationHandler implements InvocationHandler {
+public class TcpInvocationHandler implements InvocationHandler, Closeable {
+
+    private static class Connection {
+        Socket socket;
+        BasicObjectOutputStream out;
+        BasicObjectInputStream in;
+
+        public Connection(Socket socket) throws IOException {
+            this.socket = socket;
+            out = new BasicObjectOutputStream(socket.getOutputStream());
+            in = new BasicObjectInputStream(socket.getInputStream());
+        }
+    }
+
     private String host;
     private int port;
     private String endpointId;
     private ClassLoader cl;
     private int timeoutMillis;
 
+    private final Deque<Connection> pool = new ArrayDeque<>();
+    private int acquired; // counts connections currently in use (not in pool)
+    private boolean closed;
+
     public TcpInvocationHandler(ClassLoader cl, String host, int port, String 
endpointId, int timeoutMillis)
-        throws UnknownHostException, IOException {
+            throws UnknownHostException, IOException {
         this.cl = cl;
         this.host = host;
         this.port = port;
         this.endpointId = endpointId;
         this.timeoutMillis = timeoutMillis;
     }
 
+    private Connection acquireConnection() throws IOException {
+        Connection conn;
+        synchronized (pool) {
+            acquired++; // must be first
+            if (closed) {
+                throw new IOException("Connection pool is closed");
+            }
+            conn = pool.pollFirst(); // reuse most recently used connection
+        }
+        // if the pool is empty, create a new connection
+        if (conn == null) {
+            conn = new Connection(openSocket());
+            conn.socket.setSoTimeout(timeoutMillis);
+            conn.socket.setTcpNoDelay(true);
+            conn.in.addClassLoader(cl);
+            conn.out.writeUTF(endpointId); // select endpoint for this 
connection
+        }
+        return conn;
+    }
+
+    // must be called exactly once for each call to acquireConnection,

Review Comment:
   The comments above the methods could be written as Javadoc.



##########
provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java:
##########
@@ -46,22 +49,105 @@
  * which sends the details of the method invocations
  * over a TCP connection, to be executed by the remote service.
  */
-public class TcpInvocationHandler implements InvocationHandler {
+public class TcpInvocationHandler implements InvocationHandler, Closeable {

Review Comment:
   Can we test the new changes? There are not many tests in this PR.



##########
provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpServer.java:
##########
@@ -113,20 +115,24 @@ public void run() {
     }
 
     private void handleConnection(Socket socket) {
-        try (Socket sock = socket;
-             BasicObjectInputStream in = new 
BasicObjectInputStream(socket.getInputStream());
-             ObjectOutputStream out = new 
BasicObjectOutputStream(socket.getOutputStream())) {
+        try (Socket sock = socket; // socket will be closed when done
+             ObjectOutputStream out = new 
BasicObjectOutputStream(socket.getOutputStream());
+             BasicObjectInputStream in = new 
BasicObjectInputStream(socket.getInputStream())) {
+            socket.setTcpNoDelay(true);
             String endpointId = in.readUTF();
             MethodInvoker invoker = invokers.get(endpointId);
             if (invoker == null)
                 throw new IllegalArgumentException("invalid endpoint: " + 
endpointId);
             
in.addClassLoader(invoker.getService().getClass().getClassLoader());
-            handleCall(invoker, in, out);
-        } catch (SocketException se) {
-            return; // e.g. connection closed by client
-        } catch (Exception e) {
-            LOG.warn("Error processing service call", e);
+            while (running) {
+                handleCall(invoker, in, out);
+            }
+        } catch (SocketException | SocketTimeoutException | EOFException se) {
+            return; // e.g. connection closed by client or read timeout due to 
inactivity

Review Comment:
   Should we log this at debug level?



##########
provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java:
##########
@@ -46,22 +49,105 @@
  * which sends the details of the method invocations
  * over a TCP connection, to be executed by the remote service.
  */
-public class TcpInvocationHandler implements InvocationHandler {
+public class TcpInvocationHandler implements InvocationHandler, Closeable {
+
+    private static class Connection {
+        Socket socket;
+        BasicObjectOutputStream out;
+        BasicObjectInputStream in;
+
+        public Connection(Socket socket) throws IOException {
+            this.socket = socket;
+            out = new BasicObjectOutputStream(socket.getOutputStream());
+            in = new BasicObjectInputStream(socket.getInputStream());
+        }
+    }
+
     private String host;
     private int port;
     private String endpointId;
     private ClassLoader cl;
     private int timeoutMillis;
 
+    private final Deque<Connection> pool = new ArrayDeque<>();
+    private int acquired; // counts connections currently in use (not in pool)
+    private boolean closed;
+
     public TcpInvocationHandler(ClassLoader cl, String host, int port, String 
endpointId, int timeoutMillis)
-        throws UnknownHostException, IOException {
+            throws UnknownHostException, IOException {
         this.cl = cl;
         this.host = host;
         this.port = port;
         this.endpointId = endpointId;
         this.timeoutMillis = timeoutMillis;
     }
 
+    private Connection acquireConnection() throws IOException {
+        Connection conn;
+        synchronized (pool) {
+            acquired++; // must be first
+            if (closed) {
+                throw new IOException("Connection pool is closed");
+            }
+            conn = pool.pollFirst(); // reuse most recently used connection
+        }
+        // if the pool is empty, create a new connection
+        if (conn == null) {
+            conn = new Connection(openSocket());
+            conn.socket.setSoTimeout(timeoutMillis);
+            conn.socket.setTcpNoDelay(true);
+            conn.in.addClassLoader(cl);
+            conn.out.writeUTF(endpointId); // select endpoint for this 
connection
+        }
+        return conn;
+    }
+
+    // must be called exactly once for each call to acquireConnection,
+    // regardless of the outcome - if there was an error, pass null
+    private void releaseConnection(Connection conn) {
+        synchronized (pool) {
+            acquired--; // must be first
+            if (conn != null) {
+                pool.offerFirst(conn); // add to front of queue so old idle 
ones can expire
+            }
+            pool.notifyAll();
+        }
+    }
+
+    private void closeConnection(Connection conn) throws IOException {
+        if (conn != null) {
+            conn.socket.close();

Review Comment:
   1. Can we have a `close()` method on the `Connection` class?
   2. Do we not need to close `in`/`out` as well?



##########
provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java:
##########
@@ -46,22 +49,105 @@
  * which sends the details of the method invocations
  * over a TCP connection, to be executed by the remote service.
  */
-public class TcpInvocationHandler implements InvocationHandler {
+public class TcpInvocationHandler implements InvocationHandler, Closeable {
+
+    private static class Connection {
+        Socket socket;
+        BasicObjectOutputStream out;
+        BasicObjectInputStream in;
+
+        public Connection(Socket socket) throws IOException {
+            this.socket = socket;
+            out = new BasicObjectOutputStream(socket.getOutputStream());
+            in = new BasicObjectInputStream(socket.getInputStream());
+        }
+    }
+
     private String host;
     private int port;
     private String endpointId;
     private ClassLoader cl;
     private int timeoutMillis;
 
+    private final Deque<Connection> pool = new ArrayDeque<>();
+    private int acquired; // counts connections currently in use (not in pool)
+    private boolean closed;
+
     public TcpInvocationHandler(ClassLoader cl, String host, int port, String 
endpointId, int timeoutMillis)
-        throws UnknownHostException, IOException {
+            throws UnknownHostException, IOException {
         this.cl = cl;
         this.host = host;
         this.port = port;
         this.endpointId = endpointId;
         this.timeoutMillis = timeoutMillis;
     }
 
+    private Connection acquireConnection() throws IOException {
+        Connection conn;
+        synchronized (pool) {
+            acquired++; // must be first

Review Comment:
   Is there no limit on the number of connections that can be acquired?



##########
provider/tcp/src/main/java/org/apache/aries/rsa/provider/tcp/TcpInvocationHandler.java:
##########
@@ -46,22 +49,105 @@
  * which sends the details of the method invocations
  * over a TCP connection, to be executed by the remote service.
  */
-public class TcpInvocationHandler implements InvocationHandler {
+public class TcpInvocationHandler implements InvocationHandler, Closeable {
+
+    private static class Connection {
+        Socket socket;
+        BasicObjectOutputStream out;
+        BasicObjectInputStream in;
+
+        public Connection(Socket socket) throws IOException {
+            this.socket = socket;
+            out = new BasicObjectOutputStream(socket.getOutputStream());
+            in = new BasicObjectInputStream(socket.getInputStream());
+        }
+    }
+
     private String host;
     private int port;
     private String endpointId;
     private ClassLoader cl;
     private int timeoutMillis;
 
+    private final Deque<Connection> pool = new ArrayDeque<>();
+    private int acquired; // counts connections currently in use (not in pool)
+    private boolean closed;
+
     public TcpInvocationHandler(ClassLoader cl, String host, int port, String 
endpointId, int timeoutMillis)
-        throws UnknownHostException, IOException {
+            throws UnknownHostException, IOException {
         this.cl = cl;
         this.host = host;
         this.port = port;
         this.endpointId = endpointId;
         this.timeoutMillis = timeoutMillis;
     }
 
+    private Connection acquireConnection() throws IOException {
+        Connection conn;
+        synchronized (pool) {
+            acquired++; // must be first
+            if (closed) {
+                throw new IOException("Connection pool is closed");
+            }
+            conn = pool.pollFirst(); // reuse most recently used connection
+        }
+        // if the pool is empty, create a new connection
+        if (conn == null) {
+            conn = new Connection(openSocket());
+            conn.socket.setSoTimeout(timeoutMillis);
+            conn.socket.setTcpNoDelay(true);
+            conn.in.addClassLoader(cl);
+            conn.out.writeUTF(endpointId); // select endpoint for this 
connection
+        }
+        return conn;
+    }
+
+    // must be called exactly once for each call to acquireConnection,
+    // regardless of the outcome - if there was an error, pass null
+    private void releaseConnection(Connection conn) {
+        synchronized (pool) {
+            acquired--; // must be first
+            if (conn != null) {
+                pool.offerFirst(conn); // add to front of queue so old idle 
ones can expire
+            }
+            pool.notifyAll();
+        }
+    }
+
+    private void closeConnection(Connection conn) throws IOException {
+        if (conn != null) {
+            conn.socket.close();
+        }
+    }
+
+    private void closeConnections() throws IOException {
+        synchronized (pool) {
+            closed = true; // first prevent acquiring new connections
+            while (true) {
+                // close all idle connections
+                for (Iterator<Connection> it = pool.iterator(); it.hasNext(); 
) {

Review Comment:
   1. Can we use a regular for-each loop here, e.g. `for (Connection connection : pool)`?
   2. Should closing each connection be wrapped in its own try/catch?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to