This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push: new 5f2ec45 [FLINK-24283][connector/pulsar] Use sticky key consumer in Key_Shared subscription. This would make sure Pulsar won't treat the Flink reader as a shared consumer. 5f2ec45 is described below commit 5f2ec45efc319e8ae713415e7c443b76c1bdf6a3 Author: syhily <syh...@gmail.com> AuthorDate: Tue Sep 14 23:03:40 2021 +0800 [FLINK-24283][connector/pulsar] Use sticky key consumer in Key_Shared subscription. This would make sure Pulsar won't treat the Flink reader as a shared consumer. This fix https://github.com/apache/pulsar/pull/12035 --- .../source/enumerator/PulsarSourceEnumerator.java | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java index 4e92b2d..67cc3c7 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java @@ -26,6 +26,7 @@ import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import org.apache.flink.util.FlinkRuntimeException; @@ -33,9 +34,13 @@ 
import org.apache.flink.util.FlinkRuntimeException; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.KeySharedPolicy; +import org.apache.pulsar.client.api.KeySharedPolicy.KeySharedPolicySticky; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Range; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -173,8 +178,7 @@ public class PulsarSourceEnumerator } private void seekStartPosition(Set<TopicPartition> partitions) { - ConsumerBuilder<byte[]> consumerBuilder = - createConsumerBuilder(pulsarClient, Schema.BYTES, configuration); + ConsumerBuilder<byte[]> consumerBuilder = consumerBuilder(); Set<String> seekedTopics = new HashSet<>(); for (TopicPartition partition : partitions) { @@ -200,6 +204,20 @@ public class PulsarSourceEnumerator } } + private ConsumerBuilder<byte[]> consumerBuilder() { + ConsumerBuilder<byte[]> builder = + createConsumerBuilder(pulsarClient, Schema.BYTES, configuration); + if (sourceConfiguration.getSubscriptionType() == SubscriptionType.Key_Shared) { + Range range = TopicRange.createFullRange().toPulsarRange(); + KeySharedPolicySticky keySharedPolicy = KeySharedPolicy.stickyHashRange().ranges(range); + // Force this consume use sticky hash range in Key_Shared subscription. + // Pulsar won't remove old message dispatcher before 2.8.2 release. + builder.keySharedPolicy(keySharedPolicy); + } + + return builder; + } + /** * Check if there's any partition changes within subscribed topic partitions fetched by worker * thread, and convert them to splits the assign them to pulsar readers.