This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 5de8016a5dc [FLINK-26645][Connector/pulsar] Support subscribe only one 
topic partition.
5de8016a5dc is described below

commit 5de8016a5dcee8a38d21f54e685633809f800c48
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Wed Mar 30 20:41:55 2022 +0800

    [FLINK-26645][Connector/pulsar] Support subscribe only one topic partition.
---
 .../sink/writer/topic/TopicMetadataListener.java   |  8 +-
 .../subscriber/impl/BasePulsarSubscriber.java      | 23 +++--
 .../subscriber/impl/TopicListSubscriber.java       | 56 +++++++++---
 .../source/enumerator/topic/TopicNameUtils.java    |  6 +-
 .../writer/topic/TopicMetadataListenerTest.java    |  4 +-
 .../subscriber/PulsarSubscriberTest.java           | 99 +++++++++++++++++-----
 .../testutils/runtime/PulsarRuntimeOperator.java   |  2 +-
 7 files changed, 139 insertions(+), 59 deletions(-)

diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java
index fb51ec824a2..19a47c775cf 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java
@@ -42,9 +42,9 @@ import java.util.Map;
 import static java.util.Collections.emptyList;
 import static 
org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin;
 import static 
org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin;
-import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartitioned;
-import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithNonPartition;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartition;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithoutPartition;
 import static 
org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
 
 /**
@@ -75,7 +75,7 @@ public class TopicMetadataListener implements Serializable, 
Closeable {
         List<String> partitions = new ArrayList<>(topics.size());
         Map<String, Integer> metadata = new HashMap<>(topics.size());
         for (String topic : topics) {
-            if (isPartitioned(topic)) {
+            if (isPartition(topic)) {
                 partitions.add(topic);
             } else {
                 // This would be updated when open writing.
@@ -120,7 +120,7 @@ public class TopicMetadataListener implements Serializable, 
Closeable {
                 int partitionNums = entry.getValue();
                 // Get all topics from partitioned and non-partitioned topic 
names
                 if (partitionNums == NON_PARTITIONED) {
-                    results.add(topicNameWithNonPartition(entry.getKey()));
+                    results.add(topicNameWithoutPartition(entry.getKey()));
                 } else {
                     for (int i = 0; i < partitionNums; i++) {
                         results.add(topicNameWithPartition(entry.getKey(), i));
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
index d266c98df42..d29f7a0d548 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
@@ -28,8 +28,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.IntStream;
 
 import static java.util.stream.Collectors.toList;
 
@@ -63,18 +63,15 @@ public abstract class BasePulsarSubscriber implements 
PulsarSubscriber {
                     .map(range -> new TopicPartition(metadata.getName(), -1, 
range))
                     .collect(toList());
         } else {
-            return IntStream.range(0, metadata.getPartitionSize())
-                    .boxed()
-                    .flatMap(
-                            partitionId ->
-                                    ranges.stream()
-                                            .map(
-                                                    range ->
-                                                            new TopicPartition(
-                                                                    
metadata.getName(),
-                                                                    
partitionId,
-                                                                    range)))
-                    .collect(toList());
+            List<TopicPartition> partitions = new ArrayList<>();
+            for (int i = 0; i < metadata.getPartitionSize(); i++) {
+                for (TopicRange range : ranges) {
+                    TopicPartition partition = new 
TopicPartition(metadata.getName(), i, range);
+                    partitions.add(partition);
+                }
+            }
+
+            return partitions;
         }
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
index 5f5617ebe50..28f525b2d07 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java
@@ -18,40 +18,68 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl;
 
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.naming.TopicName;
 
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 
-import static java.util.stream.Collectors.toSet;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartition;
 
 /** the implements of consuming multiple topics. */
 public class TopicListSubscriber extends BasePulsarSubscriber {
     private static final long serialVersionUID = 6473918213832993116L;
 
-    private final List<String> topics;
+    private final List<String> partitions;
+    private final List<String> fullTopicNames;
 
-    public TopicListSubscriber(List<String> topics) {
-        this.topics = topics;
+    public TopicListSubscriber(List<String> fullTopicNameOrPartitions) {
+        this.partitions = new ArrayList<>();
+        this.fullTopicNames = new ArrayList<>();
+
+        for (String fullTopicNameOrPartition : fullTopicNameOrPartitions) {
+            if (isPartition(fullTopicNameOrPartition)) {
+                this.partitions.add(fullTopicNameOrPartition);
+            } else {
+                this.fullTopicNames.add(fullTopicNameOrPartition);
+            }
+        }
     }
 
     @Override
     public Set<TopicPartition> getSubscribedTopicPartitions(
             PulsarAdmin pulsarAdmin, RangeGenerator rangeGenerator, int 
parallelism) {
+        Set<TopicPartition> results = new HashSet<>();
+
+        // Query topics from Pulsar.
+        for (String topic : fullTopicNames) {
+            TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, topic);
+            List<TopicRange> ranges = rangeGenerator.range(metadata, 
parallelism);
+            List<TopicPartition> list = toTopicPartitions(metadata, ranges);
+
+            results.addAll(list);
+        }
+
+        for (String partition : partitions) {
+            TopicName topicName = TopicName.get(partition);
+            String name = topicName.getPartitionedTopicName();
+            int index = topicName.getPartitionIndex();
+
+            TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, name);
+            List<TopicRange> ranges = rangeGenerator.range(metadata, 
parallelism);
+
+            for (TopicRange range : ranges) {
+                results.add(new TopicPartition(name, index, range));
+            }
+        }
 
-        return topics.parallelStream()
-                .map(topic -> queryTopicMetadata(pulsarAdmin, topic))
-                .filter(Objects::nonNull)
-                .flatMap(
-                        metadata -> {
-                            List<TopicRange> ranges = 
rangeGenerator.range(metadata, parallelism);
-                            return toTopicPartitions(metadata, 
ranges).stream();
-                        })
-                .collect(toSet());
+        return results;
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java
index f5c71159deb..41c54892838 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java
@@ -53,15 +53,15 @@ public final class TopicNameUtils {
     }
 
     /** Get a non-partitioned topic name that does not belong to any 
partitioned topic. */
-    public static String topicNameWithNonPartition(String topic) {
+    public static String topicNameWithoutPartition(String topic) {
         return TopicName.get(topic).toString();
     }
 
-    public static boolean isPartitioned(String topic) {
+    public static boolean isPartition(String topic) {
         return TopicName.get(topic).isPartitioned();
     }
 
-    /** Merge the same topics into one topics. */
+    /** Merge the same topics into one topic. */
     public static List<String> distinctTopics(List<String> topics) {
         Set<String> fullTopics = new HashSet<>();
         Map<String, List<Integer>> partitionedTopics = new HashMap<>();
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java
index b4aba9617c9..7e2db66f253 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java
@@ -34,8 +34,8 @@ import static java.util.Collections.singletonList;
 import static java.util.stream.Collectors.toList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static 
org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
-import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithNonPartition;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithoutPartition;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -131,7 +131,7 @@ class TopicMetadataListenerTest extends PulsarTestSuiteBase 
{
         String topic = randomAlphabetic(10);
         operator().createTopic(topic, 0);
         List<String> nonPartitionTopic =
-                Collections.singletonList(topicNameWithNonPartition(topic));
+                Collections.singletonList(topicNameWithoutPartition(topic));
 
         TopicMetadataListener listener = new 
TopicMetadataListener(nonPartitionTopic);
         long interval = Duration.ofMinutes(15).toMillis();
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
index 8409f63f2b8..f57980e3481 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java
@@ -18,11 +18,12 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator.subscriber;
 
-import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
 
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
@@ -30,50 +31,108 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.regex.Pattern;
 
+import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicListSubscriber;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicPatternSubscriber;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
 import static org.apache.pulsar.client.api.RegexSubscriptionMode.AllTopics;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Unit tests for {@link PulsarSubscriber}. */
 class PulsarSubscriberTest extends PulsarTestSuiteBase {
 
-    private static final String TOPIC1 = TopicNameUtils.topicName("topic1");
-    private static final String TOPIC2 = 
TopicNameUtils.topicName("pattern-topic");
-    private static final String TOPIC3 = TopicNameUtils.topicName("topic2");
+    private final String topic1 = topicName("topic-" + randomAlphanumeric(4));
+    private final String topic2 = topicName("pattern-topic-" + 
randomAlphanumeric(4));
+    private final String topic3 = topicName("topic2-" + randomAlphanumeric(4));
+    private final String topic4 = topicName("non-partitioned-topic-" + 
randomAlphanumeric(4));
+    private final String topic5 = topicName("non-partitioned-topic2-" + 
randomAlphanumeric(4));
 
     private static final int NUM_PARTITIONS_PER_TOPIC = 5;
     private static final int NUM_PARALLELISM = 10;
 
+    @BeforeAll
+    void setUp() {
+        operator().createTopic(topic1, NUM_PARTITIONS_PER_TOPIC);
+        operator().createTopic(topic2, NUM_PARTITIONS_PER_TOPIC);
+        operator().createTopic(topic3, NUM_PARTITIONS_PER_TOPIC);
+        operator().createTopic(topic4, 0);
+        operator().createTopic(topic5, 0);
+    }
+
+    @AfterAll
+    void tearDown() {
+        operator().deleteTopic(topic1);
+        operator().deleteTopic(topic2);
+        operator().deleteTopic(topic3);
+        operator().deleteTopic(topic4);
+        operator().deleteTopic(topic5);
+    }
+
     @Test
     void topicListSubscriber() {
-        operator().createTopic(TOPIC1, NUM_PARTITIONS_PER_TOPIC);
-        operator().createTopic(TOPIC2, NUM_PARTITIONS_PER_TOPIC);
-
-        PulsarSubscriber subscriber = 
getTopicListSubscriber(Arrays.asList(TOPIC1, TOPIC2));
+        PulsarSubscriber subscriber = 
getTopicListSubscriber(Arrays.asList(topic1, topic2));
         Set<TopicPartition> topicPartitions =
                 subscriber.getSubscribedTopicPartitions(
                         operator().admin(), new FullRangeGenerator(), 
NUM_PARALLELISM);
         Set<TopicPartition> expectedPartitions = new HashSet<>();
 
         for (int i = 0; i < NUM_PARTITIONS_PER_TOPIC; i++) {
-            expectedPartitions.add(new TopicPartition(TOPIC1, i, 
createFullRange()));
-            expectedPartitions.add(new TopicPartition(TOPIC2, i, 
createFullRange()));
+            expectedPartitions.add(new TopicPartition(topic1, i, 
createFullRange()));
+            expectedPartitions.add(new TopicPartition(topic2, i, 
createFullRange()));
         }
 
         assertEquals(expectedPartitions, topicPartitions);
+    }
+
+    @Test
+    void subscribeOnePartitionOfMultiplePartitionTopic() {
+        String partition = topicNameWithPartition(topic1, 2);
 
-        operator().deleteTopic(TOPIC1);
-        operator().deleteTopic(TOPIC2);
+        PulsarSubscriber subscriber = 
getTopicListSubscriber(singletonList(partition));
+        Set<TopicPartition> partitions =
+                subscriber.getSubscribedTopicPartitions(
+                        operator().admin(), new FullRangeGenerator(), 
NUM_PARALLELISM);
+
+        TopicPartition desiredPartition = new TopicPartition(topic1, 2, 
createFullRange());
+        assertThat(partitions).hasSize(1).containsExactly(desiredPartition);
     }
 
     @Test
-    void topicPatternSubscriber() {
-        operator().createTopic(TOPIC1, NUM_PARTITIONS_PER_TOPIC);
-        operator().createTopic(TOPIC2, NUM_PARTITIONS_PER_TOPIC);
-        operator().createTopic(TOPIC3, NUM_PARTITIONS_PER_TOPIC);
+    void subscribeNonPartitionedTopicList() {
+        PulsarSubscriber subscriber = 
getTopicListSubscriber(singletonList(topic4));
+        Set<TopicPartition> partitions =
+                subscriber.getSubscribedTopicPartitions(
+                        operator().admin(), new FullRangeGenerator(), 
NUM_PARALLELISM);
+
+        TopicPartition desiredPartition = new TopicPartition(topic4, -1, 
createFullRange());
+        assertThat(partitions).hasSize(1).containsExactly(desiredPartition);
+    }
 
+    @Test
+    void subscribeNonPartitionedTopicPattern() {
+        PulsarSubscriber subscriber =
+                getTopicPatternSubscriber(
+                        
Pattern.compile("persistent://public/default/non-partitioned-topic*?"),
+                        AllTopics);
+
+        Set<TopicPartition> topicPartitions =
+                subscriber.getSubscribedTopicPartitions(
+                        operator().admin(), new FullRangeGenerator(), 
NUM_PARALLELISM);
+
+        Set<TopicPartition> expectedPartitions = new HashSet<>();
+
+        expectedPartitions.add(new TopicPartition(topic4, -1, 
createFullRange()));
+        expectedPartitions.add(new TopicPartition(topic5, -1, 
createFullRange()));
+
+        assertEquals(expectedPartitions, topicPartitions);
+    }
+
+    @Test
+    void topicPatternSubscriber() {
         PulsarSubscriber subscriber =
                 getTopicPatternSubscriber(
                         
Pattern.compile("persistent://public/default/topic*?"), AllTopics);
@@ -85,14 +144,10 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase {
         Set<TopicPartition> expectedPartitions = new HashSet<>();
 
         for (int i = 0; i < NUM_PARTITIONS_PER_TOPIC; i++) {
-            expectedPartitions.add(new TopicPartition(TOPIC1, i, 
createFullRange()));
-            expectedPartitions.add(new TopicPartition(TOPIC3, i, 
createFullRange()));
+            expectedPartitions.add(new TopicPartition(topic1, i, 
createFullRange()));
+            expectedPartitions.add(new TopicPartition(topic3, i, 
createFullRange()));
         }
 
         assertEquals(expectedPartitions, topicPartitions);
-
-        operator().deleteTopic(TOPIC1);
-        operator().deleteTopic(TOPIC2);
-        operator().deleteTopic(TOPIC3);
     }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index a78ea992d4b..b8c49581f75 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -174,7 +174,7 @@ public class PulsarRuntimeOperator implements Closeable {
      *
      * @param topic The name of the topic.
      * @param numberOfPartitions The number of partitions. We would create a 
non-partitioned topic
-     *     if this number if zero.
+     *     if this number is zero.
      */
     public void createTopic(String topic, int numberOfPartitions) {
         checkArgument(numberOfPartitions >= 0);

Reply via email to