Repository: kafka Updated Branches: refs/heads/trunk 30f08d158 -> b69093398
KAFKA-6300; SelectorTest may fail with ConcurrentModificationException Synchronization is added w.r.t. sockets ArrayList to avoid ConcurrentModificationException Author: tedyu <[email protected]> Reviewers: Viktor Somogyi <[email protected]>, Rajini Sivaram <[email protected]> Closes #4299 from tedyu/trunk Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b6909339 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b6909339 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b6909339 Branch: refs/heads/trunk Commit: b6909339876ca1cbcce28043c4b42f2c7e48307b Parents: 30f08d1 Author: tedyu <[email protected]> Authored: Wed Dec 6 19:43:29 2017 +0000 Committer: Rajini Sivaram <[email protected]> Committed: Wed Dec 6 19:43:29 2017 +0000 ---------------------------------------------------------------------- .../apache/kafka/common/network/EchoServer.java | 69 +++++++++++--------- 1 file changed, 39 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b6909339/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java index aa7a15e..e986598 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java +++ b/clients/src/test/java/org/apache/kafka/common/network/EchoServer.java @@ -41,6 +41,7 @@ class EchoServer extends Thread { private final ServerSocket serverSocket; private final List<Thread> threads; private final List<Socket> sockets; + private volatile boolean closing = false; private final SslFactory sslFactory; private final AtomicBoolean renegotiate = new AtomicBoolean(); @@ -71,40 +72,45 @@ class EchoServer extends Thread { @Override public void run() { try { - while (true) { + while (!closing) { final Socket socket = serverSocket.accept(); - sockets.add(socket); - Thread thread = new Thread() { - @Override - public void run() { - try { - DataInputStream input = new DataInputStream(socket.getInputStream()); - DataOutputStream output = new DataOutputStream(socket.getOutputStream()); - while (socket.isConnected() && !socket.isClosed()) { - int size = input.readInt(); - if (renegotiate.get()) { - renegotiate.set(false); - ((SSLSocket) socket).startHandshake(); - } - byte[] bytes = new byte[size]; - input.readFully(bytes); - output.writeInt(size); - output.write(bytes); - output.flush(); - } - } catch (IOException e) { - // ignore - } finally { + synchronized (sockets) { + if (closing) { + break; + } + sockets.add(socket); + Thread thread = new Thread() { + @Override + public void run() { try { - socket.close(); + DataInputStream input = new DataInputStream(socket.getInputStream()); + DataOutputStream output = new DataOutputStream(socket.getOutputStream()); + while (socket.isConnected() && !socket.isClosed()) { + int size = input.readInt(); + if (renegotiate.get()) { + renegotiate.set(false); + ((SSLSocket) socket).startHandshake(); + } + byte[] bytes = new byte[size]; + input.readFully(bytes); + output.writeInt(size); + output.write(bytes); + output.flush(); + } } catch (IOException e) { // ignore + } finally { + try { + socket.close(); + } catch (IOException e) { + // ignore + } } } - } - }; - thread.start(); - threads.add(thread); + }; + thread.start(); + threads.add(thread); + } } } catch (IOException e) { // ignore @@ -112,11 +118,14 @@ class EchoServer extends Thread { } public void closeConnections() throws IOException { - for (Socket socket : sockets) - socket.close(); + synchronized (sockets) { + for (Socket socket : sockets) + socket.close(); + } } public void close() throws IOException, InterruptedException { + closing = true; this.serverSocket.close(); closeConnections(); for (Thread t : threads)
