This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 5f2ec45  [FLINK-24283][connector/pulsar] Use stick key consumer in 
Key_Shared subscription. This would make sure Pulsar won't treat the flink 
reader as a shared consumer.
5f2ec45 is described below

commit 5f2ec45efc319e8ae713415e7c443b76c1bdf6a3
Author: syhily <syh...@gmail.com>
AuthorDate: Tue Sep 14 23:03:40 2021 +0800

    [FLINK-24283][connector/pulsar] Use stick key consumer in Key_Shared 
subscription. This would make sure Pulsar won't treat the flink reader as a 
shared consumer.
    
    This fix https://github.com/apache/pulsar/pull/12035
---
 .../source/enumerator/PulsarSourceEnumerator.java  | 22 ++++++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
index 4e92b2d..67cc3c7 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
 import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -33,9 +34,13 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.KeySharedPolicy;
+import org.apache.pulsar.client.api.KeySharedPolicy.KeySharedPolicySticky;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -173,8 +178,7 @@ public class PulsarSourceEnumerator
     }
 
     private void seekStartPosition(Set<TopicPartition> partitions) {
-        ConsumerBuilder<byte[]> consumerBuilder =
-                createConsumerBuilder(pulsarClient, Schema.BYTES, 
configuration);
+        ConsumerBuilder<byte[]> consumerBuilder = consumerBuilder();
         Set<String> seekedTopics = new HashSet<>();
 
         for (TopicPartition partition : partitions) {
@@ -200,6 +204,20 @@ public class PulsarSourceEnumerator
         }
     }
 
+    private ConsumerBuilder<byte[]> consumerBuilder() {
+        ConsumerBuilder<byte[]> builder =
+                createConsumerBuilder(pulsarClient, Schema.BYTES, 
configuration);
+        if (sourceConfiguration.getSubscriptionType() == 
SubscriptionType.Key_Shared) {
+            Range range = TopicRange.createFullRange().toPulsarRange();
+            KeySharedPolicySticky keySharedPolicy = 
KeySharedPolicy.stickyHashRange().ranges(range);
+            // Force this consume use sticky hash range in Key_Shared 
subscription.
+            // Pulsar won't remove old message dispatcher before 2.8.2 release.
+            builder.keySharedPolicy(keySharedPolicy);
+        }
+
+        return builder;
+    }
+
     /**
      * Check if there's any partition changes within subscribed topic 
partitions fetched by worker
      * thread, and convert them to splits the assign them to pulsar readers.

Reply via email to