This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 59d9a3742651e48c5150993be67fe10a63750c08 Author: Yufan Sheng <[email protected]> AuthorDate: Tue Sep 6 00:01:17 2022 +0800 [FLINK-27400][Connector/pulsar] Filter system topics for Pulsar connector. --- .../sink/writer/topic/TopicMetadataListener.java | 4 +- .../subscriber/impl/TopicPatternSubscriber.java | 3 ++ .../source/enumerator/topic/TopicNameUtils.java | 52 +++++++++++++++++++--- .../writer/topic/TopicMetadataListenerTest.java | 6 +-- .../enumerator/topic/TopicNameUtilsTest.java | 9 ++++ 5 files changed, 63 insertions(+), 11 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java index 19a47c775cf..7db42e6e4b5 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListener.java @@ -43,8 +43,8 @@ import static java.util.Collections.emptyList; import static org.apache.flink.connector.pulsar.common.config.PulsarClientFactory.createAdmin; import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.isPartition; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; -import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithoutPartition; import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED; /** @@ -120,7 +120,7 @@ public class TopicMetadataListener implements 
Serializable, Closeable { int partitionNums = entry.getValue(); // Get all topics from partitioned and non-partitioned topic names if (partitionNums == NON_PARTITIONED) { - results.add(topicNameWithoutPartition(entry.getKey())); + results.add(topicName(entry.getKey())); } else { for (int i = 0; i < partitionNums; i++) { results.add(topicNameWithPartition(entry.getKey(), i)); diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java index 472dbde3e35..fae2bac7b3e 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; @@ -35,6 +36,7 @@ import java.util.Set; import java.util.regex.Pattern; import static java.util.stream.Collectors.toSet; +import static org.apache.flink.shaded.guava30.com.google.common.base.Predicates.not; /** Subscribe to matching topics based on topic pattern. 
*/ public class TopicPatternSubscriber extends BasePulsarSubscriber { @@ -64,6 +66,7 @@ public class TopicPatternSubscriber extends BasePulsarSubscriber { .getTopics(namespace) .parallelStream() .filter(this::matchesSubscriptionMode) + .filter(not(TopicNameUtils::isInternal)) .filter(topic -> topicPattern.matcher(topic).find()) .map(topic -> queryTopicMetadata(pulsarAdmin, topic)) .filter(Objects::nonNull) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java index 41c54892838..efa31227499 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java @@ -21,6 +21,7 @@ package org.apache.flink.connector.pulsar.source.enumerator.topic; import org.apache.flink.annotation.Internal; import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet; import org.apache.pulsar.common.naming.TopicName; @@ -30,13 +31,32 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; +import static org.apache.pulsar.common.naming.TopicDomain.persistent; /** util for topic name. 
*/ @Internal public final class TopicNameUtils { + private static final Pattern HEARTBEAT_NAMESPACE_PATTERN = + Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)"); + private static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = + Pattern.compile("pulsar/([^:]+:\\d+)"); + private static final Pattern SLA_NAMESPACE_PATTERN = + Pattern.compile("sla-monitor" + "/[^/]+/([^:]+:\\d+)"); + private static final Set<String> EVENTS_TOPIC_NAMES = + ImmutableSet.of("__change_events", "__transaction_buffer_snapshot"); + private static final String TRANSACTION_COORDINATOR_ASSIGN_PREFIX = + TopicName.get(persistent.value(), SYSTEM_NAMESPACE, "transaction_coordinator_assign") + .toString(); + private static final String TRANSACTION_COORDINATOR_LOG_PREFIX = + TopicName.get(persistent.value(), SYSTEM_NAMESPACE, "__transaction_log_").toString(); + private static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack"; + private static final String PENDING_ACK_STORE_CURSOR_SUFFIX = "__pending_ack_state"; + private TopicNameUtils() { // No public constructor. } @@ -52,11 +72,6 @@ public final class TopicNameUtils { return TopicName.get(topic).getPartition(partitionId).toString(); } - /** Get a non-partitioned topic name that does not belong to any partitioned topic. */ - public static String topicNameWithoutPartition(String topic) { - return TopicName.get(topic).toString(); - } - public static boolean isPartition(String topic) { return TopicName.get(topic).isPartitioned(); } @@ -92,4 +107,31 @@ public final class TopicNameUtils { return builder.build(); } + + /** + * This method is refactored from {@code BrokerService} in pulsar-broker, which is not available + * in the Pulsar client. We have to put it here and maintain it ourselves. Since these topic names + * would never be changed, for backward compatibility, we only need to add new topic names after + * a version bump.
+ * + * @see <a + * href="https://github.com/apache/pulsar/blob/7075a5ce0d4a70f52625ac8c3d0c48894442b72a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L3024">BrokerService#isSystemTopic</a> + */ + public static boolean isInternal(String topic) { + // A topic name instance without partition information. + String topicName = topicName(topic); + TopicName topicInstance = TopicName.get(topicName); + String localName = topicInstance.getLocalName(); + String namespace = topicInstance.getNamespace(); + + return namespace.equals(SYSTEM_NAMESPACE.toString()) + || SLA_NAMESPACE_PATTERN.matcher(namespace).matches() + || HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches() + || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches() + || EVENTS_TOPIC_NAMES.contains(localName) + || topicName.startsWith(TRANSACTION_COORDINATOR_ASSIGN_PREFIX) + || topicName.startsWith(TRANSACTION_COORDINATOR_LOG_PREFIX) + || localName.endsWith(PENDING_ACK_STORE_SUFFIX) + || localName.endsWith(PENDING_ACK_STORE_CURSOR_SUFFIX); + } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java index 7e2db66f253..8af27e81fba 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicMetadataListenerTest.java @@ -26,7 +26,6 @@ import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.junit.jupiter.api.Test; import java.time.Duration; -import java.util.Collections; import java.util.List; import java.util.stream.IntStream; @@ -34,8 +33,8 @@ import static java.util.Collections.singletonList; import static 
java.util.stream.Collectors.toList; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; -import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithoutPartition; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -130,8 +129,7 @@ class TopicMetadataListenerTest extends PulsarTestSuiteBase { void fetchNonPartitionTopic() { String topic = randomAlphabetic(10); operator().createTopic(topic, 0); - List<String> nonPartitionTopic = - Collections.singletonList(topicNameWithoutPartition(topic)); + List<String> nonPartitionTopic = singletonList(topicName(topic)); TopicMetadataListener listener = new TopicMetadataListener(nonPartitionTopic); long interval = Duration.ofMinutes(15).toMillis(); diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java index 0abacc4d9ed..8f6b3ff7615 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java @@ -26,6 +26,7 @@ import java.util.List; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static 
org.junit.jupiter.api.Assertions.assertTrue; /** Unit tests for {@link TopicNameUtils}. */ class TopicNameUtilsTest { @@ -84,4 +85,12 @@ class TopicNameUtilsTest { "persistent://public/default/short-topic", "persistent://public/default/long-topic-partition-1"); } + + @Test + void internalTopicAssertion() { + boolean internal = + TopicNameUtils.isInternal( + "persistent://public/default/topic__transaction_pending_ack"); + assertTrue(internal); + } }
