jerrypeng closed pull request #3237 ("fix bug involving consumer producer deadlock in functions"). URL: https://github.com/apache/pulsar/pull/3237
This PR was merged from a forked repository. Because GitHub hides the original diff of a foreign pull request (one from a fork) on merge, the diff is reproduced below for the sake of provenance: diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 4b634d28cf..1c61b45af9 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -88,6 +88,9 @@ protected PulsarSinkProcessorBase(Schema schema) { .hashingScheme(HashingScheme.Murmur3_32Hash) // .messageRoutingMode(MessageRoutingMode.CustomPartition) .messageRouter(FunctionResultRouter.of()) + // set send timeout to be infinity to prevent potential deadlock with consumer + // that might happen when consumer is blocked due to unacked messages + .sendTimeout(0, TimeUnit.SECONDS) .topic(topic); if (producerName != null) { builder.producerName(producerName); diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py index 2797658c14..63332d0171 100644 --- a/pulsar-functions/instance/src/main/python/contextimpl.py +++ b/pulsar-functions/instance/src/main/python/contextimpl.py @@ -133,7 +133,7 @@ def get_output_topic(self): def get_output_serde_class_name(self): return self.instance_config.function_details.outputSerdeClassName - def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None): + def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None): # Just make sure that user supplied values are properly typed topic_name = str(topic_name) serde_class_name = 
str(serde_class_name) @@ -155,7 +155,7 @@ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", p self.publish_serializers[serde_class_name] = serde_klass() output_bytes = bytes(self.publish_serializers[serde_class_name].serialize(message)) - self.publish_producers[topic_name].send_async(output_bytes, None, properties=properties) + self.publish_producers[topic_name].send_async(output_bytes, callback, properties=properties) def ack(self, msgid, topic): if topic not in self.consumers: diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py index 815865b3a1..d4c3da5a95 100644 --- a/pulsar-functions/instance/src/main/python/python_instance.py +++ b/pulsar-functions/instance/src/main/python/python_instance.py @@ -262,11 +262,15 @@ def setup_producer(self): if self.instance_config.function_details.sink.topic != None and \ len(self.instance_config.function_details.sink.topic) > 0: Log.debug("Setting up producer for topic %s" % self.instance_config.function_details.sink.topic) + self.producer = self.pulsar_client.create_producer( str(self.instance_config.function_details.sink.topic), block_if_queue_full=True, batching_enabled=True, batching_max_publish_delay_ms=1, + # set send timeout to be infinity to prevent potential deadlock with consumer + # that might happen when consumer is blocked due to unacked messages + send_timeout_millis=0, max_pending_messages=100000) def message_listener(self, serde, consumer, message): diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java index 51fa4523aa..fb3d2c2af6 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java @@ -20,6 +20,7 @@ import 
static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyList; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyString; @@ -99,7 +100,8 @@ private static PulsarClientImpl getPulsarClient() throws PulsarClientException { doReturn(producerBuilder).when(producerBuilder).topic(anyString()); doReturn(producerBuilder).when(producerBuilder).producerName(anyString()); doReturn(producerBuilder).when(producerBuilder).property(anyString(), anyString()); - + doReturn(producerBuilder).when(producerBuilder).sendTimeout(anyInt(), any()); + CompletableFuture completableFuture = new CompletableFuture<>(); completableFuture.complete(mock(MessageId.class)); TypedMessageBuilder typedMessageBuilder = mock(TypedMessageBuilder.class); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services