This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f4300bd Log warn message if exception occurs while WebSocket proxy is sending/receiving message (#1537) f4300bd is described below commit f4300bdfed8403d17707f8a796c044a024c178f6 Author: massakam <massa...@yahoo-corp.jp> AuthorDate: Wed Apr 11 02:31:52 2018 +0900 Log warn message if exception occurs while WebSocket proxy is sending/receiving message (#1537) --- .../main/java/org/apache/pulsar/websocket/ConsumerHandler.java | 9 +++++++++ .../main/java/org/apache/pulsar/websocket/ProducerHandler.java | 2 ++ .../src/main/java/org/apache/pulsar/websocket/ReaderHandler.java | 9 +++++++-- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index aea88f3..406a2c8 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -35,6 +35,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException; import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionType; @@ -181,6 +182,14 @@ public class ConsumerHandler extends AbstractWebSocketHandler { service.getExecutor().execute(() -> receiveMessage()); } }).exceptionally(exception -> { + if (exception.getCause() instanceof AlreadyClosedException) { + log.info("[{}/{}] Consumer was closed while receiving msg from broker", consumer.getTopic(), + subscription); + } else { + log.warn("[{}/{}] Error occurred 
while consumer handler was delivering msg to {}: {}", + consumer.getTopic(), subscription, getRemote().getInetSocketAddress().toString(), + exception.getMessage()); + } return null; }); } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java index 25fed37..c2c3ee9 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java @@ -187,6 +187,8 @@ public class ProducerHandler extends AbstractWebSocketHandler { sendAckResponse(new ProducerAck(messageId, sendRequest.context)); } }).exceptionally(exception -> { + log.warn("[{}] Error occurred while producer handler was sending msg from {}: {}", producer.getTopic(), + getRemote().getInetSocketAddress().toString(), exception.getMessage()); numMsgsFailed.increment(); sendAckResponse( new ProducerAck(UnknownError, exception.getMessage(), null, sendRequest.context)); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java index 2efb585..54037fa 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java @@ -31,6 +31,7 @@ import javax.servlet.http.HttpServletResponse; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.SubscriptionType; @@ -166,8 +167,12 @@ public class ReaderHandler extends AbstractWebSocketHandler { 
service.getExecutor().execute(() -> receiveMessage()); } }).exceptionally(exception -> { - log.warn("[{}/{}] Failed to deliver msg to {} {}", reader.getTopic(), - subscription, getRemote().getInetSocketAddress().toString(), exception); + if (exception.getCause() instanceof AlreadyClosedException) { + log.info("[{}/{}] Reader was closed while receiving msg from broker", reader.getTopic(), subscription); + } else { + log.warn("[{}/{}] Error occurred while reader handler was delivering msg to {}: {}", reader.getTopic(), + subscription, getRemote().getInetSocketAddress().toString(), exception.getMessage()); + } return null; }); } -- To stop receiving notification emails like this one, please contact mme...@apache.org.