jerrypeng closed pull request #3237: fix bug involving consumer/producer deadlock in functions
URL: https://github.com/apache/pulsar/pull/3237
 
 
   

This PR was merged from a forked repository.
Because GitHub does not show the original diff when a pull request from a
fork is merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 4b634d28cf..1c61b45af9 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -88,6 +88,9 @@ protected PulsarSinkProcessorBase(Schema schema) {
                     .hashingScheme(HashingScheme.Murmur3_32Hash) //
                     .messageRoutingMode(MessageRoutingMode.CustomPartition)
                     .messageRouter(FunctionResultRouter.of())
+                    // set send timeout to be infinity to prevent potential 
deadlock with consumer
+                    // that might happen when consumer is blocked due to 
unacked messages
+                    .sendTimeout(0, TimeUnit.SECONDS)
                     .topic(topic);
             if (producerName != null) {
                 builder.producerName(producerName);
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py 
b/pulsar-functions/instance/src/main/python/contextimpl.py
index 2797658c14..63332d0171 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -133,7 +133,7 @@ def get_output_topic(self):
   def get_output_serde_class_name(self):
     return self.instance_config.function_details.outputSerdeClassName
 
-  def publish(self, topic_name, message, 
serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None):
+  def publish(self, topic_name, message, 
serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, 
callback=None):
     # Just make sure that user supplied values are properly typed
     topic_name = str(topic_name)
     serde_class_name = str(serde_class_name)
@@ -155,7 +155,7 @@ def publish(self, topic_name, message, 
serde_class_name="serde.IdentitySerDe", p
       self.publish_serializers[serde_class_name] = serde_klass()
 
     output_bytes = 
bytes(self.publish_serializers[serde_class_name].serialize(message))
-    self.publish_producers[topic_name].send_async(output_bytes, None, 
properties=properties)
+    self.publish_producers[topic_name].send_async(output_bytes, callback, 
properties=properties)
 
   def ack(self, msgid, topic):
     if topic not in self.consumers:
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py 
b/pulsar-functions/instance/src/main/python/python_instance.py
index 815865b3a1..d4c3da5a95 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -262,11 +262,15 @@ def setup_producer(self):
     if self.instance_config.function_details.sink.topic != None and \
             len(self.instance_config.function_details.sink.topic) > 0:
       Log.debug("Setting up producer for topic %s" % 
self.instance_config.function_details.sink.topic)
+
       self.producer = self.pulsar_client.create_producer(
         str(self.instance_config.function_details.sink.topic),
         block_if_queue_full=True,
         batching_enabled=True,
         batching_max_publish_delay_ms=1,
+        # set send timeout to be infinity to prevent potential deadlock with 
consumer
+        # that might happen when consumer is blocked due to unacked messages
+        send_timeout_millis=0,
         max_pending_messages=100000)
 
   def message_listener(self, serde, consumer, message):
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 51fa4523aa..fb3d2c2af6 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -20,6 +20,7 @@
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
@@ -99,7 +100,8 @@ private static PulsarClientImpl getPulsarClient() throws 
PulsarClientException {
         doReturn(producerBuilder).when(producerBuilder).topic(anyString());
         
doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
         doReturn(producerBuilder).when(producerBuilder).property(anyString(), 
anyString());
-
+        doReturn(producerBuilder).when(producerBuilder).sendTimeout(anyInt(), 
any());
+        
         CompletableFuture completableFuture = new CompletableFuture<>();
         completableFuture.complete(mock(MessageId.class));
         TypedMessageBuilder typedMessageBuilder = 
mock(TypedMessageBuilder.class);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to