This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
commit 712f6475bb564399202c9b6c3f5d54e5a6a4c755 Author: Yufan Sheng <yu...@streamnative.io> AuthorDate: Tue Sep 27 18:04:24 2022 +0800 [FLINK-26182][Connector/pulsar] Extract common logic from Pulsar source testing tools. --- flink-connector-pulsar-e2e-tests/pom.xml | 24 +++- .../util/pulsar/PulsarSourceOrderedE2ECase.java | 5 +- .../util/pulsar/PulsarSourceUnorderedE2ECase.java | 13 +- .../pulsar/cases/KeySharedSubscriptionContext.java | 144 --------------------- .../pulsar/cases/SharedSubscriptionContext.java | 116 ----------------- .../FlinkContainerWithPulsarEnvironment.java | 3 +- .../common/KeyedPulsarPartitionDataWriter.java | 19 +-- .../common/UnorderedSourceTestSuiteBase.java | 86 ------------ .../ExclusiveSubscriptionContext.java | 19 +-- .../FailoverSubscriptionContext.java | 19 +-- .../source/KeySharedSubscriptionContext.java | 87 +++++++++++++ .../SharedSubscriptionContext.java} | 30 ++--- 12 files changed, 151 insertions(+), 414 deletions(-) diff --git a/flink-connector-pulsar-e2e-tests/pom.xml b/flink-connector-pulsar-e2e-tests/pom.xml index c105a2b..6e6f7d3 100644 --- a/flink-connector-pulsar-e2e-tests/pom.xml +++ b/flink-connector-pulsar-e2e-tests/pom.xml @@ -33,6 +33,7 @@ under the License. <properties> <pulsar.version>2.10.0</pulsar.version> <bouncycastle.version>1.69</bouncycastle.version> + <jaxb-api.version>2.3.1</jaxb-api.version> </properties> <dependencies> @@ -41,6 +42,17 @@ under the License. <artifactId>flink-end-to-end-tests-common</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-pulsar</artifactId> @@ -70,9 +82,6 @@ under the License. <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> @@ -88,6 +97,15 @@ under the License. </executions> <configuration> <artifactItems> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-test-utils</artifactId> + <version>${project.version}</version> + <destFileName>flink-connector-testing.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies + </outputDirectory> + </artifactItem> <artifactItem> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-pulsar</artifactId> diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java index 234c1a0..ea6a982 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java @@ -25,10 +25,10 @@ import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase; import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext; -import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext; import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment; +import org.apache.flink.tests.util.pulsar.source.ExclusiveSubscriptionContext; +import org.apache.flink.tests.util.pulsar.source.FailoverSubscriptionContext; import org.apache.flink.testutils.junit.FailsOnJava11; import org.junit.experimental.categories.Category; @@ -37,6 +37,7 @@ import org.junit.experimental.categories.Category; * Pulsar E2E test based on connector testing framework. It's used for Failover & Exclusive * subscription. */ +@SuppressWarnings("unused") @Category(value = {FailsOnJava11.class}) public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> { diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java index 5039048..15333a1 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java @@ -19,21 +19,26 @@ package org.apache.flink.tests.util.pulsar; import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; +import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase; import org.apache.flink.connector.testframe.junit.annotations.TestContext; import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext; -import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext; import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment; -import org.apache.flink.tests.util.pulsar.common.UnorderedSourceTestSuiteBase; +import org.apache.flink.tests.util.pulsar.source.KeySharedSubscriptionContext; +import org.apache.flink.tests.util.pulsar.source.SharedSubscriptionContext; +import org.apache.flink.testutils.junit.FailsOnJava11; + +import org.junit.experimental.categories.Category; /** * Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared * subscription. */ +@SuppressWarnings("unused") +@Category(value = {FailsOnJava11.class}) public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<String> { // Defines the Semantic. @@ -49,12 +54,10 @@ public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<S PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink); // Defines a set of external context Factories for different test cases. - @SuppressWarnings("unused") @TestContext PulsarTestContextFactory<String, SharedSubscriptionContext> shared = new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new); - @SuppressWarnings("unused") @TestContext PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared = new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new); diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java deleted file mode 100644 index 5ad369b..0000000 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tests.util.pulsar.cases; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.connector.source.Boundedness; -import org.apache.flink.api.connector.source.Source; -import org.apache.flink.connector.pulsar.source.PulsarSource; -import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder; -import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; -import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator; -import org.apache.flink.connector.pulsar.testutils.PulsarTestContext; -import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; -import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter; -import org.apache.flink.connector.testframe.external.source.TestingSourceSettings; -import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter; - -import org.apache.pulsar.client.api.RegexSubscriptionMode; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.common.util.Murmur3_32Hash; - -import java.net.URL; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static java.util.Collections.singletonList; -import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; -import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE; -import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema; -import static org.apache.pulsar.client.api.Schema.STRING; - -/** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */ -public class KeySharedSubscriptionContext extends PulsarTestContext<String> { - - private int index = 0; - - private final List<KeyedPulsarPartitionDataWriter> writers = new ArrayList<>(); - - // Message keys. - private final String key1; - private final String key2; - - public KeySharedSubscriptionContext(PulsarTestEnvironment environment) { - this(environment, Collections.emptyList()); - } - - public KeySharedSubscriptionContext( - PulsarTestEnvironment environment, List<URL> connectorJarPaths) { - super(environment, connectorJarPaths); - - // Init message keys. - this.key1 = randomAlphabetic(8); - String newKey2; - do { - newKey2 = randomAlphabetic(8); - } while (keyHash(key1) == keyHash(newKey2)); - this.key2 = newKey2; - } - - @Override - protected String displayName() { - return "consuming message by Key_Shared"; - } - - @Override - public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) { - int keyHash = keyHash(key1); - TopicRange range = new TopicRange(keyHash, keyHash); - - PulsarSourceBuilder<String> builder = - PulsarSource.builder() - .setDeserializationSchema(pulsarSchema(STRING)) - .setServiceUrl(operator.serviceUrl()) - .setAdminUrl(operator.adminUrl()) - .setTopicPattern( - "pulsar-[0-9]+-key-shared", RegexSubscriptionMode.AllTopics) - .setSubscriptionType(SubscriptionType.Key_Shared) - .setSubscriptionName("pulsar-key-shared") - .setRangeGenerator(new FixedRangeGenerator(singletonList(range))); - if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) { - // Using latest stop cursor for making sure the source could be stopped. - builder.setBoundedStopCursor(StopCursor.latest()); - } - - return builder.build(); - } - - @Override - public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter( - TestingSourceSettings sourceSettings) { - String topicName = "pulsar-" + index + "-key-shared"; - operator.createTopic(topicName, 1); - index++; - - String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0); - KeyedPulsarPartitionDataWriter writer = - new KeyedPulsarPartitionDataWriter(operator, partitionName, key1, key2); - writers.add(writer); - - return writer; - } - - @Override - public List<String> generateTestData( - TestingSourceSettings sourceSettings, int splitIndex, long seed) { - return generateStringTestData(splitIndex, seed); - } - - @Override - public TypeInformation<String> getProducedType() { - return TypeInformation.of(String.class); - } - - @Override - public void close() { - for (KeyedPulsarPartitionDataWriter writer : writers) { - writer.close(); - } - writers.clear(); - } - - private int keyHash(String key) { - return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE; - } -} diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java deleted file mode 100644 index 1a2db66..0000000 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tests.util.pulsar.cases; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.connector.source.Boundedness; -import org.apache.flink.api.connector.source.Source; -import org.apache.flink.connector.pulsar.source.PulsarSource; -import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder; -import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; -import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter; -import org.apache.flink.connector.pulsar.testutils.PulsarTestContext; -import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; -import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter; -import org.apache.flink.connector.testframe.external.source.TestingSourceSettings; - -import org.apache.pulsar.client.api.RegexSubscriptionMode; -import org.apache.pulsar.client.api.SubscriptionType; - -import java.net.URL; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema; -import static org.apache.pulsar.client.api.Schema.STRING; - -/** We would consuming from test splits by using {@link SubscriptionType#Shared} subscription. */ -public class SharedSubscriptionContext extends PulsarTestContext<String> { - - private int index = 0; - - private final List<PulsarPartitionDataWriter> writers = new ArrayList<>(); - - public SharedSubscriptionContext(PulsarTestEnvironment environment) { - this(environment, Collections.emptyList()); - } - - public SharedSubscriptionContext( - PulsarTestEnvironment environment, List<URL> connectorJarPaths) { - super(environment, connectorJarPaths); - } - - @Override - protected String displayName() { - return "consuming message by Shared"; - } - - @Override - public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings) { - PulsarSourceBuilder<String> builder = - PulsarSource.builder() - .setDeserializationSchema(pulsarSchema(STRING)) - .setServiceUrl(operator.serviceUrl()) - .setAdminUrl(operator.adminUrl()) - .setTopicPattern("pulsar-[0-9]+-shared", RegexSubscriptionMode.AllTopics) - .setSubscriptionType(SubscriptionType.Shared) - .setSubscriptionName("pulsar-shared"); - if (sourceSettings.getBoundedness() == Boundedness.BOUNDED) { - // Using latest stop cursor for making sure the source could be stopped. - builder.setBoundedStopCursor(StopCursor.latest()); - } - - return builder.build(); - } - - @Override - public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter( - TestingSourceSettings sourceSettings) { - String topicName = "pulsar-" + index + "-shared"; - operator.createTopic(topicName, 1); - index++; - - String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0); - PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName); - writers.add(writer); - - return writer; - } - - @Override - public List<String> generateTestData( - TestingSourceSettings sourceSettings, int splitIndex, long seed) { - return generateStringTestData(splitIndex, seed); - } - - @Override - public TypeInformation<String> getProducedType() { - return TypeInformation.of(String.class); - } - - @Override - public void close() { - for (PulsarPartitionDataWriter writer : writers) { - writer.close(); - } - writers.clear(); - } -} diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java index 9f34554..f5e862f 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java @@ -43,7 +43,8 @@ public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvir resourcePath("bcutil-jdk15on.jar"), resourcePath("bcprov-ext-jdk15on.jar"), resourcePath("jaxb-api.jar"), - resourcePath("jul-to-slf4j.jar")); + resourcePath("jul-to-slf4j.jar"), + resourcePath("flink-connector-testing.jar")); } private static String resourcePath(String jarName) { diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java index e431e4c..d5f6e11 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java @@ -35,23 +35,26 @@ public class KeyedPulsarPartitionDataWriter implements ExternalSystemSplitDataWr private final PulsarRuntimeOperator operator; private final String fullTopicName; - private final String key1; - private final String key2; + private final String keyToRead; + private final String keyToExclude; public KeyedPulsarPartitionDataWriter( - PulsarRuntimeOperator operator, String fullTopicName, String key1, String key2) { + PulsarRuntimeOperator operator, + String fullTopicName, + String keyToRead, + String keyToExclude) { this.operator = operator; this.fullTopicName = fullTopicName; - this.key1 = key1; - this.key2 = key2; + this.keyToRead = keyToRead; + this.keyToExclude = keyToExclude; } @Override public void writeRecords(List<String> records) { - operator.sendMessages(fullTopicName, Schema.STRING, key1, records); + List<String> newRecords = records.stream().map(a -> a + keyToRead).collect(toList()); - List<String> newRecords = records.stream().map(a -> a + key1).collect(toList()); - operator.sendMessages(fullTopicName, Schema.STRING, key2, newRecords); + operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords); + operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records); } @Override diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java deleted file mode 100644 index 01527ea..0000000 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tests.util.pulsar.common; - -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.connector.source.Boundedness; -import org.apache.flink.api.connector.source.Source; -import org.apache.flink.connector.testframe.environment.TestEnvironment; -import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings; -import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter; -import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext; -import org.apache.flink.connector.testframe.external.source.TestingSourceSettings; -import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension; -import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider; -import org.apache.flink.streaming.api.CheckpointingMode; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.util.TestLoggerExtension; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; - -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.containsInAnyOrder; - -/** A source test template for testing the messages which could be consumed in a unordered way. */ -@ExtendWith({ - ConnectorTestingExtension.class, - TestLoggerExtension.class, - TestCaseInvocationContextProvider.class -}) -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -public abstract class UnorderedSourceTestSuiteBase<T> { - - @TestTemplate - @DisplayName("Test source with one split and four consumers") - public void testOneSplitWithMultipleConsumers( - TestEnvironment testEnv, DataStreamSourceExternalContext<T> externalContext) - throws Exception { - TestingSourceSettings sourceSettings = - TestingSourceSettings.builder() - .setBoundedness(Boundedness.BOUNDED) - .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) - .build(); - TestEnvironmentSettings envOptions = - TestEnvironmentSettings.builder() - .setConnectorJarPaths(externalContext.getConnectorJarPaths()) - .build(); - List<T> testData = - externalContext.generateTestData( - sourceSettings, 0, ThreadLocalRandom.current().nextLong()); - ExternalSystemSplitDataWriter<T> writer = - externalContext.createSourceSplitDataWriter(sourceSettings); - writer.writeRecords(testData); - - Source<T, ?, ?> source = externalContext.createSource(sourceSettings); - StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(envOptions); - List<T> results = - execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar source") - .setParallelism(4) - .executeAndCollect( - "Source single split with four readers.", testData.size()); - - assertThat(results, containsInAnyOrder(testData.toArray())); - } -} diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java similarity index 71% rename from flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java rename to flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java index 6fea0c9..4906ad6 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/ExclusiveSubscriptionContext.java @@ -16,32 +16,23 @@ * limitations under the License. */ -package org.apache.flink.tests.util.pulsar.cases; +package org.apache.flink.tests.util.pulsar.source; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; -import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext; +import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext; import org.apache.pulsar.client.api.SubscriptionType; -import java.net.URL; -import java.util.Collections; -import java.util.List; - /** We would consume from test splits by using {@link SubscriptionType#Exclusive} subscription. */ -public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext { +public class ExclusiveSubscriptionContext extends MultipleTopicConsumingContext { public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) { - this(environment, Collections.emptyList()); - } - - public ExclusiveSubscriptionContext( - PulsarTestEnvironment environment, List<URL> connectorJarPaths) { - super(environment, connectorJarPaths); + super(environment); } @Override protected String displayName() { - return "consuming message by Exclusive"; + return "consume message by Exclusive"; } @Override diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java similarity index 71% copy from flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java copy to flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java index c473488..3134db4 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/FailoverSubscriptionContext.java @@ -16,32 +16,23 @@ * limitations under the License. */ -package org.apache.flink.tests.util.pulsar.cases; +package org.apache.flink.tests.util.pulsar.source; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; -import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext; +import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext; import org.apache.pulsar.client.api.SubscriptionType; -import java.net.URL; -import java.util.Collections; -import java.util.List; - /** We would consume from test splits by using {@link SubscriptionType#Failover} subscription. */ -public class FailoverSubscriptionContext extends MultipleTopicTemplateContext { +public class FailoverSubscriptionContext extends MultipleTopicConsumingContext { public FailoverSubscriptionContext(PulsarTestEnvironment environment) { - this(environment, Collections.emptyList()); - } - - public FailoverSubscriptionContext( - PulsarTestEnvironment environment, List<URL> connectorJarPaths) { - super(environment, connectorJarPaths); + super(environment); } @Override protected String displayName() { - return "consuming message by Failover"; + return "consume message by Failover"; } @Override diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java new file mode 100644 index 0000000..0cae6e5 --- /dev/null +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.pulsar.source; + +import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator; +import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; +import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext; +import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter; +import org.apache.flink.connector.testframe.external.source.TestingSourceSettings; +import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter; + +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.util.Murmur3_32Hash; + +import static java.util.Collections.singletonList; +import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE; +import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared; + +/** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */ +public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext { + + private final String keyToRead; + private final String keyToExclude; + + public KeySharedSubscriptionContext(PulsarTestEnvironment environment) { + super(environment, Key_Shared); + + this.keyToRead = randomAlphabetic(8); + + // Make sure they have different hash code. + int readHash = keyHash(keyToRead); + String randomKey; + do { + randomKey = randomAlphabetic(8); + } while (keyHash(randomKey) == readHash); + this.keyToExclude = randomKey; + } + + @Override + public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter( + TestingSourceSettings sourceSettings) { + String partitionName = generatePartitionName(); + return new KeyedPulsarPartitionDataWriter(operator, partitionName, keyToRead, keyToExclude); + } + + @Override + protected String displayName() { + return "consume message by Key_Shared"; + } + + @Override + protected void setSourceBuilder(PulsarSourceBuilder<String> builder) { + int keyHash = keyHash(keyToRead); + TopicRange range = new TopicRange(keyHash, keyHash); + + builder.setRangeGenerator(new FixedRangeGenerator(singletonList(range))); + } + + @Override + protected String subscriptionName() { + return "pulsar-key-shared-subscription"; + } + + // This method is copied from Pulsar for calculating message key hash. + private int keyHash(String key) { + return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE; + } +} diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java similarity index 57% rename from flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java rename to flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java index c473488..fe9f078 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java @@ -16,41 +16,29 @@ * limitations under the License. */ -package org.apache.flink.tests.util.pulsar.cases; +package org.apache.flink.tests.util.pulsar.source; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; -import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext; +import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext; import org.apache.pulsar.client.api.SubscriptionType; -import java.net.URL; -import java.util.Collections; -import java.util.List; +import static org.apache.pulsar.client.api.SubscriptionType.Shared; -/** We would consume from test splits by using {@link SubscriptionType#Failover} subscription. */ -public class FailoverSubscriptionContext extends MultipleTopicTemplateContext { +/** We would consume from test splits by using {@link SubscriptionType#Shared} subscription. */ +public class SharedSubscriptionContext extends MultipleTopicConsumingContext { - public FailoverSubscriptionContext(PulsarTestEnvironment environment) { - this(environment, Collections.emptyList()); - } - - public FailoverSubscriptionContext( - PulsarTestEnvironment environment, List<URL> connectorJarPaths) { - super(environment, connectorJarPaths); + public SharedSubscriptionContext(PulsarTestEnvironment environment) { + super(environment, Shared); } @Override protected String displayName() { - return "consuming message by Failover"; + return "consume message by Shared"; } @Override protected String subscriptionName() { - return "pulsar-failover-subscription"; - } - - @Override - protected SubscriptionType subscriptionType() { - return SubscriptionType.Failover; + return "pulsar-shared-subscription"; } }