This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push: new 5de8016a5dc [FLINK-26645][Connector/pulsar] Support subscribe only one topic partition. 5de8016a5dc is described below commit 5de8016a5dcee8a38d21f54e685633809f800c48 Author: Yufan Sheng <yu...@streamnative.io> AuthorDate: Wed Mar 30 20:41:55 2022 +0800 [FLINK-26645][Connector/pulsar] Support subscribe only one topic partition. --- .../sink/writer/topic/TopicMetadataListener.java | 8 +- .../subscriber/impl/BasePulsarSubscriber.java | 23 +++-- .../subscriber/impl/TopicListSubscriber.java | 56 +++++++++--- .../source/enumerator/topic/TopicNameUtils.java | 6 +- .../writer/topic/TopicMetadataListenerTest.java | 4 +- .../subscriber/PulsarSubscriberTest.java | 99 +++++++++++++++++----- .../testutils/runtime/PulsarRuntimeOperator.java | 2 +- 7 files changed, 139 insertions(+), 59 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java index fb51ec824a2..19a47c775cf 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java @@ -42,9 +42,9 @@ import java.util.Map; import static java.util.Collections.emptyList; import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin; import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin; -import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartitioned; -import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithNonPartition; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartition; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithoutPartition; import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED; /** @@ -75,7 +75,7 @@ public class TopicMetadataListener implements Serializable, Closeable { List<String> partitions = new ArrayList<>(topics.size()); Map<String, Integer> metadata = new HashMap<>(topics.size()); for (String topic : topics) { - if (isPartitioned(topic)) { + if (isPartition(topic)) { partitions.add(topic); } else { // This would be updated when open writing. @@ -120,7 +120,7 @@ public class TopicMetadataListener implements Serializable, Closeable { int partitionNums = entry.getValue(); // Get all topics from partitioned and non-partitioned topic names if (partitionNums == NON_PARTITIONED) { - results.add(topicNameWithNonPartition(entry.getKey())); + results.add(topicNameWithoutPartition(entry.getKey())); } else { for (int i = 0; i < partitionNums; i++) { results.add(topicNameWithPartition(entry.getKey(), i)); diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java index d266c98df42..d29f7a0d548 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java @@ -28,8 +28,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import java.util.ArrayList; import java.util.List; -import java.util.stream.IntStream; import static java.util.stream.Collectors.toList; @@ -63,18 +63,15 @@ public abstract class BasePulsarSubscriber implements PulsarSubscriber { .map(range -> new TopicPartition(metadata.getName(), -1, range)) .collect(toList()); } else { - return IntStream.range(0, metadata.getPartitionSize()) - .boxed() - .flatMap( - partitionId -> - ranges.stream() - .map( - range -> - new TopicPartition( - metadata.getName(), - partitionId, - range))) - .collect(toList()); + List<TopicPartition> partitions = new ArrayList<>(); + for (int i = 0; i < metadata.getPartitionSize(); i++) { + for (TopicRange range : ranges) { + TopicPartition partition = new TopicPartition(metadata.getName(), i, range); + partitions.add(partition); + } + } + + return partitions; } } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java index 5f5617ebe50..28f525b2d07 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicListSubscriber.java @@ -18,40 +18,68 @@ package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.common.naming.TopicName; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; -import java.util.Objects; import java.util.Set; -import static java.util.stream.Collectors.toSet; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartition; /** the implements of consuming multiple topics. */ public class TopicListSubscriber extends BasePulsarSubscriber { private static final long serialVersionUID = 6473918213832993116L; - private final List<String> topics; + private final List<String> partitions; + private final List<String> fullTopicNames; - public TopicListSubscriber(List<String> topics) { - this.topics = topics; + public TopicListSubscriber(List<String> fullTopicNameOrPartitions) { + this.partitions = new ArrayList<>(); + this.fullTopicNames = new ArrayList<>(); + + for (String fullTopicNameOrPartition : fullTopicNameOrPartitions) { + if (isPartition(fullTopicNameOrPartition)) { + this.partitions.add(fullTopicNameOrPartition); + } else { + this.fullTopicNames.add(fullTopicNameOrPartition); + } + } } @Override public Set<TopicPartition> getSubscribedTopicPartitions( PulsarAdmin pulsarAdmin, RangeGenerator rangeGenerator, int parallelism) { + Set<TopicPartition> results = new HashSet<>(); + + // Query topics from Pulsar. + for (String topic : fullTopicNames) { + TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, topic); + List<TopicRange> ranges = rangeGenerator.range(metadata, parallelism); + List<TopicPartition> list = toTopicPartitions(metadata, ranges); + + results.addAll(list); + } + + for (String partition : partitions) { + TopicName topicName = TopicName.get(partition); + String name = topicName.getPartitionedTopicName(); + int index = topicName.getPartitionIndex(); + + TopicMetadata metadata = queryTopicMetadata(pulsarAdmin, name); + List<TopicRange> ranges = rangeGenerator.range(metadata, parallelism); + + for (TopicRange range : ranges) { + results.add(new TopicPartition(name, index, range)); + } + } - return topics.parallelStream() - .map(topic -> queryTopicMetadata(pulsarAdmin, topic)) - .filter(Objects::nonNull) - .flatMap( - metadata -> { - List<TopicRange> ranges = rangeGenerator.range(metadata, parallelism); - return toTopicPartitions(metadata, ranges).stream(); - }) - .collect(toSet()); + return results; } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java index f5c71159deb..41c54892838 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java @@ -53,15 +53,15 @@ public final class TopicNameUtils { } /** Get a non-partitioned topic name that does not belong to any partitioned topic. */ - public static String topicNameWithNonPartition(String topic) { + public static String topicNameWithoutPartition(String topic) { return TopicName.get(topic).toString(); } - public static boolean isPartitioned(String topic) { + public static boolean isPartition(String topic) { return TopicName.get(topic).isPartitioned(); } - /** Merge the same topics into one topics. */ + /** Merge the same topics into one topic. */ public static List<String> distinctTopics(List<String> topics) { Set<String> fullTopics = new HashSet<>(); Map<String, List<Integer>> partitionedTopics = new HashMap<>(); diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java index b4aba9617c9..7e2db66f253 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java @@ -34,8 +34,8 @@ import static java.util.Collections.singletonList; import static java.util.stream.Collectors.toList; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL; -import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithNonPartition; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithoutPartition; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -131,7 +131,7 @@ class TopicMetadataListenerTest extends PulsarTestSuiteBase { String topic = randomAlphabetic(10); operator().createTopic(topic, 0); List<String> nonPartitionTopic = - Collections.singletonList(topicNameWithNonPartition(topic)); + Collections.singletonList(topicNameWithoutPartition(topic)); TopicMetadataListener listener = new TopicMetadataListener(nonPartitionTopic); long interval = Duration.ofMinutes(15).toMillis(); diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java index 8409f63f2b8..f57980e3481 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/PulsarSubscriberTest.java @@ -18,11 +18,12 @@ package org.apache.flink.connector.pulsar.source.enumerator.subscriber; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator; import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -30,50 +31,108 @@ import java.util.HashSet; import java.util.Set; import java.util.regex.Pattern; +import static java.util.Collections.singletonList; +import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric; import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicListSubscriber; import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicPatternSubscriber; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; import static org.apache.pulsar.client.api.RegexSubscriptionMode.AllTopics; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; /** Unit tests for {@link PulsarSubscriber}. */ class PulsarSubscriberTest extends PulsarTestSuiteBase { - private static final String TOPIC1 = TopicNameUtils.topicName("topic1"); - private static final String TOPIC2 = TopicNameUtils.topicName("pattern-topic"); - private static final String TOPIC3 = TopicNameUtils.topicName("topic2"); + private final String topic1 = topicName("topic-" + randomAlphanumeric(4)); + private final String topic2 = topicName("pattern-topic-" + randomAlphanumeric(4)); + private final String topic3 = topicName("topic2-" + randomAlphanumeric(4)); + private final String topic4 = topicName("non-partitioned-topic-" + randomAlphanumeric(4)); + private final String topic5 = topicName("non-partitioned-topic2-" + randomAlphanumeric(4)); private static final int NUM_PARTITIONS_PER_TOPIC = 5; private static final int NUM_PARALLELISM = 10; + @BeforeAll + void setUp() { + operator().createTopic(topic1, NUM_PARTITIONS_PER_TOPIC); + operator().createTopic(topic2, NUM_PARTITIONS_PER_TOPIC); + operator().createTopic(topic3, NUM_PARTITIONS_PER_TOPIC); + operator().createTopic(topic4, 0); + operator().createTopic(topic5, 0); + } + + @AfterAll + void tearDown() { + operator().deleteTopic(topic1); + operator().deleteTopic(topic2); + operator().deleteTopic(topic3); + operator().deleteTopic(topic4); + operator().deleteTopic(topic5); + } + @Test void topicListSubscriber() { - operator().createTopic(TOPIC1, NUM_PARTITIONS_PER_TOPIC); - operator().createTopic(TOPIC2, NUM_PARTITIONS_PER_TOPIC); - - PulsarSubscriber subscriber = getTopicListSubscriber(Arrays.asList(TOPIC1, TOPIC2)); + PulsarSubscriber subscriber = getTopicListSubscriber(Arrays.asList(topic1, topic2)); Set<TopicPartition> topicPartitions = subscriber.getSubscribedTopicPartitions( operator().admin(), new FullRangeGenerator(), NUM_PARALLELISM); Set<TopicPartition> expectedPartitions = new HashSet<>(); for (int i = 0; i < NUM_PARTITIONS_PER_TOPIC; i++) { - expectedPartitions.add(new TopicPartition(TOPIC1, i, createFullRange())); - expectedPartitions.add(new TopicPartition(TOPIC2, i, createFullRange())); + expectedPartitions.add(new TopicPartition(topic1, i, createFullRange())); + expectedPartitions.add(new TopicPartition(topic2, i, createFullRange())); } assertEquals(expectedPartitions, topicPartitions); + } + + @Test + void subscribeOnePartitionOfMultiplePartitionTopic() { + String partition = topicNameWithPartition(topic1, 2); - operator().deleteTopic(TOPIC1); - operator().deleteTopic(TOPIC2); + PulsarSubscriber subscriber = getTopicListSubscriber(singletonList(partition)); + Set<TopicPartition> partitions = + subscriber.getSubscribedTopicPartitions( + operator().admin(), new FullRangeGenerator(), NUM_PARALLELISM); + + TopicPartition desiredPartition = new TopicPartition(topic1, 2, createFullRange()); + assertThat(partitions).hasSize(1).containsExactly(desiredPartition); } @Test - void topicPatternSubscriber() { - operator().createTopic(TOPIC1, NUM_PARTITIONS_PER_TOPIC); - operator().createTopic(TOPIC2, NUM_PARTITIONS_PER_TOPIC); - operator().createTopic(TOPIC3, NUM_PARTITIONS_PER_TOPIC); + void subscribeNonPartitionedTopicList() { + PulsarSubscriber subscriber = getTopicListSubscriber(singletonList(topic4)); + Set<TopicPartition> partitions = + subscriber.getSubscribedTopicPartitions( + operator().admin(), new FullRangeGenerator(), NUM_PARALLELISM); + + TopicPartition desiredPartition = new TopicPartition(topic4, -1, createFullRange()); + assertThat(partitions).hasSize(1).containsExactly(desiredPartition); + } + @Test + void subscribeNonPartitionedTopicPattern() { + PulsarSubscriber subscriber = + getTopicPatternSubscriber( + Pattern.compile("persistent://public/default/non-partitioned-topic*?"), + AllTopics); + + Set<TopicPartition> topicPartitions = + subscriber.getSubscribedTopicPartitions( + operator().admin(), new FullRangeGenerator(), NUM_PARALLELISM); + + Set<TopicPartition> expectedPartitions = new HashSet<>(); + + expectedPartitions.add(new TopicPartition(topic4, -1, createFullRange())); + expectedPartitions.add(new TopicPartition(topic5, -1, createFullRange())); + + assertEquals(expectedPartitions, topicPartitions); + } + + @Test + void topicPatternSubscriber() { PulsarSubscriber subscriber = getTopicPatternSubscriber( Pattern.compile("persistent://public/default/topic*?"), AllTopics); @@ -85,14 +144,10 @@ class PulsarSubscriberTest extends PulsarTestSuiteBase { Set<TopicPartition> expectedPartitions = new HashSet<>(); for (int i = 0; i < NUM_PARTITIONS_PER_TOPIC; i++) { - expectedPartitions.add(new TopicPartition(TOPIC1, i, createFullRange())); - expectedPartitions.add(new TopicPartition(TOPIC3, i, createFullRange())); + expectedPartitions.add(new TopicPartition(topic1, i, createFullRange())); + expectedPartitions.add(new TopicPartition(topic3, i, createFullRange())); } assertEquals(expectedPartitions, topicPartitions); - - operator().deleteTopic(TOPIC1); - operator().deleteTopic(TOPIC2); - operator().deleteTopic(TOPIC3); } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java index a78ea992d4b..b8c49581f75 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java @@ -174,7 +174,7 @@ public class PulsarRuntimeOperator implements Closeable { * * @param topic The name of the topic. * @param numberOfPartitions The number of partitions. We would create a non-partitioned topic - * if this number if zero. + * if this number is zero. */ public void createTopic(String topic, int numberOfPartitions) { checkArgument(numberOfPartitions >= 0);