Repository: kafka Updated Branches: refs/heads/0.9.0 555fab913 -> 8c370093d
KAFKA-2813; selector doesn't close socket connection on non-IOExceptions Patched Selector.poll() to close the connection on any exception. Author: Jun Rao <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Gwen Shapira <[email protected]> Closes #501 from junrao/KAFKA-2813 (cherry picked from commit 3fd168d9522d1ad25f5582fbe838cea15bdb525f) Signed-off-by: Jun Rao <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8c370093 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8c370093 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8c370093 Branch: refs/heads/0.9.0 Commit: 8c370093d1ee3b9ba97f85970ec30fbb26ff48ef Parents: 555fab9 Author: Jun Rao <[email protected]> Authored: Wed Nov 11 22:18:19 2015 -0800 Committer: Jun Rao <[email protected]> Committed: Wed Nov 11 22:18:32 2015 -0800 ---------------------------------------------------------------------- .../apache/kafka/common/network/Selector.java | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8c370093/clients/src/main/java/org/apache/kafka/common/network/Selector.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 34de616..639a2be 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -241,7 +241,6 @@ public class Selector implements Selectable { * @throws IllegalArgumentException If `timeout` is negative * @throws IllegalStateException If a send is given for which we have no existing connection or for which there is * already an in-progress send - * @throws InvalidReceiveException If invalid data is received */ @Override public void poll(long timeout) throws IOException { @@ -284,16 +283,8 @@ public class Selector implements Selectable { /* if channel is ready read from any connections that have readable data */ if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { NetworkReceive networkReceive; - try { - while ((networkReceive = channel.read()) != null) { - addToStagedReceives(channel, networkReceive); - } - } catch (InvalidReceiveException e) { - log.error("Invalid data received from " + channel.id() + " closing connection", e); - close(channel); - this.disconnected.add(channel.id()); - throw e; - } + while ((networkReceive = channel.read()) != null) + addToStagedReceives(channel, networkReceive); } /* if channel is ready write to any sockets that have space in their buffer and for which we have data */ @@ -310,9 +301,12 @@ public class Selector implements Selectable { close(channel); this.disconnected.add(channel.id()); } - } catch (IOException e) { + } catch (Exception e) { String desc = channel.socketDescription(); - log.debug("Connection with {} disconnected", desc, e); + if (e instanceof IOException) + log.debug("Connection with {} disconnected", desc, e); + else + log.warn("Unexpected error from {}; closing connection", desc, e); close(channel); this.disconnected.add(channel.id()); }
