This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2607c18  fix bug involving consumer producer deadlock in functions (#3237)
2607c18 is described below

commit 2607c18d1b27c1df667d62d8343acd1b306c3c8f
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Fri Dec 21 16:04:36 2018 -0800

    fix bug involving consumer producer deadlock in functions (#3237)
    
    * fix bug involving consumer producer deadlock in functions
    
    * fix unit test
---
 .../src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java    | 3 +++
 pulsar-functions/instance/src/main/python/contextimpl.py              | 4 ++--
 pulsar-functions/instance/src/main/python/python_instance.py          | 4 ++++
 .../test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java    | 4 +++-
 4 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 4b634d2..1c61b45 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -88,6 +88,9 @@ public class PulsarSink<T> implements Sink<T> {
                     .hashingScheme(HashingScheme.Murmur3_32Hash) //
                     .messageRoutingMode(MessageRoutingMode.CustomPartition)
                     .messageRouter(FunctionResultRouter.of())
+                    // set send timeout to be infinity to prevent potential deadlock with consumer
+                    // that might happen when consumer is blocked due to unacked messages
+                    .sendTimeout(0, TimeUnit.SECONDS)
                     .topic(topic);
             if (producerName != null) {
                 builder.producerName(producerName);
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index 2797658..63332d0 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -133,7 +133,7 @@ class ContextImpl(pulsar.Context):
   def get_output_serde_class_name(self):
     return self.instance_config.function_details.outputSerdeClassName
 
-  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None):
+  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None):
     # Just make sure that user supplied values are properly typed
     topic_name = str(topic_name)
     serde_class_name = str(serde_class_name)
@@ -155,7 +155,7 @@ class ContextImpl(pulsar.Context):
       self.publish_serializers[serde_class_name] = serde_klass()
 
     output_bytes = bytes(self.publish_serializers[serde_class_name].serialize(message))
-    self.publish_producers[topic_name].send_async(output_bytes, None, properties=properties)
+    self.publish_producers[topic_name].send_async(output_bytes, callback, properties=properties)
 
   def ack(self, msgid, topic):
     if topic not in self.consumers:
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 815865b..d4c3da5 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -262,11 +262,15 @@ class PythonInstance(object):
     if self.instance_config.function_details.sink.topic != None and \
             len(self.instance_config.function_details.sink.topic) > 0:
       Log.debug("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic)
+
       self.producer = self.pulsar_client.create_producer(
         str(self.instance_config.function_details.sink.topic),
         block_if_queue_full=True,
         batching_enabled=True,
         batching_max_publish_delay_ms=1,
+        # set send timeout to be infinity to prevent potential deadlock with consumer
+        # that might happen when consumer is blocked due to unacked messages
+        send_timeout_millis=0,
         max_pending_messages=100000)
 
   def message_listener(self, serde, consumer, message):
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 51fa452..fb3d2c2 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.functions.sink;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
@@ -99,7 +100,8 @@ public class PulsarSinkTest {
         doReturn(producerBuilder).when(producerBuilder).topic(anyString());
         doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
         doReturn(producerBuilder).when(producerBuilder).property(anyString(), anyString());
-
+        doReturn(producerBuilder).when(producerBuilder).sendTimeout(anyInt(), any());
+        
         CompletableFuture completableFuture = new CompletableFuture<>();
         completableFuture.complete(mock(MessageId.class));
         TypedMessageBuilder typedMessageBuilder = mock(TypedMessageBuilder.class);

Reply via email to