This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 7bda5b6 Modify producer handler of WebSocket to send ack to client asynchronously (#3172) 7bda5b6 is described below commit 7bda5b60f8bf27af155b2a490dcf7ed926e86029 Author: massakam <massa...@yahoo-corp.jp> AuthorDate: Thu Dec 13 04:15:25 2018 +0900 Modify producer handler of WebSocket to send ack to client asynchronously (#3172) --- .../org/apache/pulsar/websocket/ProducerHandler.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java index bca9762..338c0f1 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ProducerHandler.java @@ -52,6 +52,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.websocket.data.ProducerAck; import org.apache.pulsar.websocket.data.ProducerMessage; import org.apache.pulsar.websocket.stats.StatsBuckets; +import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -243,7 +244,20 @@ public class ProducerHandler extends AbstractWebSocketHandler { private void sendAckResponse(ProducerAck response) { try { String msg = ObjectMapperFactory.getThreadLocal().writeValueAsString(response); - getSession().getRemote().sendString(msg); + getSession().getRemote().sendString(msg, new WriteCallback() { + @Override + public void writeFailed(Throwable th) { + log.warn("[{}] Failed to send ack {}", producer.getTopic(), th.getMessage(), th); + } + + @Override + public void writeSuccess() { + if (log.isDebugEnabled()) { + log.debug("[{}] Ack was sent successfully to {}", producer.getTopic(), + getRemote().getInetSocketAddress().toString()); + } + } + }); } catch (JsonProcessingException e) { log.warn("[{}] Failed to generate ack json-response {}", producer.getTopic(), e.getMessage(), e); } catch (Exception e) {