This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f4300bd  Log warn message if exception occurs while WebSocket proxy is sending/receiving message (#1537)
f4300bd is described below

commit f4300bdfed8403d17707f8a796c044a024c178f6
Author: massakam <massa...@yahoo-corp.jp>
AuthorDate: Wed Apr 11 02:31:52 2018 +0900

    Log warn message if exception occurs while WebSocket proxy is sending/receiving message (#1537)
---
 .../main/java/org/apache/pulsar/websocket/ConsumerHandler.java   | 9 +++++++++
 .../main/java/org/apache/pulsar/websocket/ProducerHandler.java   | 2 ++
 .../src/main/java/org/apache/pulsar/websocket/ReaderHandler.java | 9 +++++++--
 3 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index aea88f3..406a2c8 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -35,6 +35,7 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
+import 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
 import 
org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -181,6 +182,14 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
                 service.getExecutor().execute(() -> receiveMessage());
             }
         }).exceptionally(exception -> {
+            if (exception.getCause() instanceof AlreadyClosedException) {
+                log.info("[{}/{}] Consumer was closed while receiving msg from 
broker", consumer.getTopic(),
+                        subscription);
+            } else {
+                log.warn("[{}/{}] Error occurred while consumer handler was 
delivering msg to {}: {}",
+                        consumer.getTopic(), subscription, 
getRemote().getInetSocketAddress().toString(),
+                        exception.getMessage());
+            }
             return null;
         });
     }
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index 25fed37..c2c3ee9 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -187,6 +187,8 @@ public class ProducerHandler extends 
AbstractWebSocketHandler {
                 sendAckResponse(new ProducerAck(messageId, 
sendRequest.context));
             }
         }).exceptionally(exception -> {
+            log.warn("[{}] Error occurred while producer handler was sending 
msg from {}: {}", producer.getTopic(),
+                    getRemote().getInetSocketAddress().toString(), 
exception.getMessage());
             numMsgsFailed.increment();
             sendAckResponse(
                     new ProducerAck(UnknownError, exception.getMessage(), 
null, sendRequest.context));
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
index 2efb585..54037fa 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java
@@ -31,6 +31,7 @@ import javax.servlet.http.HttpServletResponse;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
+import 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -166,8 +167,12 @@ public class ReaderHandler extends 
AbstractWebSocketHandler {
                 service.getExecutor().execute(() -> receiveMessage());
             }
         }).exceptionally(exception -> {
-            log.warn("[{}/{}] Failed to deliver msg to {} {}", 
reader.getTopic(),
-                    subscription, 
getRemote().getInetSocketAddress().toString(), exception);
+            if (exception.getCause() instanceof AlreadyClosedException) {
+                log.info("[{}/{}] Reader was closed while receiving msg from 
broker", reader.getTopic(), subscription);
+            } else {
+                log.warn("[{}/{}] Error occurred while reader handler was 
delivering msg to {}: {}", reader.getTopic(),
+                        subscription, 
getRemote().getInetSocketAddress().toString(), exception.getMessage());
+            }
             return null;
         });
     }

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to