This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bda5b6  Modify producer handler of WebSocket to send ack to client asynchronously (#3172)
7bda5b6 is described below

commit 7bda5b60f8bf27af155b2a490dcf7ed926e86029
Author: massakam <massa...@yahoo-corp.jp>
AuthorDate: Thu Dec 13 04:15:25 2018 +0900

    Modify producer handler of WebSocket to send ack to client asynchronously (#3172)
---
 .../org/apache/pulsar/websocket/ProducerHandler.java     | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
index bca9762..338c0f1 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ProducerAck;
 import org.apache.pulsar.websocket.data.ProducerMessage;
 import org.apache.pulsar.websocket.stats.StatsBuckets;
+import org.eclipse.jetty.websocket.api.WriteCallback;
 import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -243,7 +244,20 @@ public class ProducerHandler extends AbstractWebSocketHandler {
     private void sendAckResponse(ProducerAck response) {
         try {
             String msg = 
ObjectMapperFactory.getThreadLocal().writeValueAsString(response);
-            getSession().getRemote().sendString(msg);
+            getSession().getRemote().sendString(msg, new WriteCallback() {
+                @Override
+                public void writeFailed(Throwable th) {
+                    log.warn("[{}] Failed to send ack {}", 
producer.getTopic(), th.getMessage(), th);
+                }
+
+                @Override
+                public void writeSuccess() {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Ack was sent successfully to {}", 
producer.getTopic(),
+                                getRemote().getInetSocketAddress().toString());
+                    }
+                }
+            });
         } catch (JsonProcessingException e) {
             log.warn("[{}] Failed to generate ack json-response {}", 
producer.getTopic(), e.getMessage(), e);
         } catch (Exception e) {

Reply via email to