This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b608620 [FLINK-23969][connector/pulsar] Create e2e tests for pulsar connector. b608620 is described below commit b6086203d5fc0a08a330dd0069fbe1359ceac97a Author: syhily <syh...@gmail.com> AuthorDate: Mon Sep 13 21:52:31 2021 +0800 [FLINK-23969][connector/pulsar] Create e2e tests for pulsar connector. --- flink-connectors/flink-connector-pulsar/pom.xml | 47 ++++++++ .../pulsar/source/enumerator/topic/TopicRange.java | 2 + .../topic/range/FixedRangeGenerator.java} | 29 ++--- .../pulsar/source/PulsarSourceITCase.java | 2 +- .../testutils/PulsarPartitionDataWriter.java | 28 ++--- .../pulsar/testutils/PulsarTestContext.java | 9 +- .../pulsar/testutils/PulsarTestEnvironment.java | 19 ++-- .../pulsar/testutils/PulsarTestSuiteBase.java | 2 +- .../cases/MultipleTopicConsumingContext.java | 83 ++------------ ...text.java => MultipleTopicTemplateContext.java} | 46 ++++---- .../cases/SingleTopicConsumingContext.java | 16 ++- .../pulsar/testutils/runtime/PulsarRuntime.java | 39 ++++--- .../testutils/runtime/PulsarRuntimeOperator.java | 21 +++- ...erProvider.java => PulsarContainerRuntime.java} | 28 ++++- ...sarMockProvider.java => PulsarMockRuntime.java} | 41 +++++-- .../util/flink/FlinkContainerTestEnvironment.java | 14 ++- .../flink-end-to-end-tests-pulsar/pom.xml | 121 +++++++++++++++++++++ .../util/pulsar/PulsarSourceOrderedE2ECase.java | 40 ++++--- .../util/pulsar/PulsarSourceUnorderedE2ECase.java | 55 ++++++++++ .../pulsar/cases/ExclusiveSubscriptionContext.java | 61 +++++++++++ .../pulsar/cases/FailoverSubscriptionContext.java | 61 +++++++++++ .../pulsar/cases/KeySharedSubscriptionContext.java | 93 ++++++++++------ .../pulsar/cases/SharedSubscriptionContext.java | 67 +++++------- .../FlinkContainerWithPulsarEnvironment.java | 54 +++++++++ .../common/KeyedPulsarPartitionDataWriter.java | 62 +++++++++++ .../common/UnorderedSourceTestSuiteBase.java | 72 ++++++++++++ .../src/test/resources/log4j2-test.properties | 31 ++---- flink-end-to-end-tests/pom.xml | 1 + .../modules-skipping-deployment.modulelist | 1 + 29 files changed, 846 insertions(+), 299 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml index 9992ef9..c3e5e9a 100644 --- a/flink-connectors/flink-connector-pulsar/pom.xml +++ b/flink-connectors/flink-connector-pulsar/pom.xml @@ -169,6 +169,12 @@ under the License. <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client-all</artifactId> <version>${pulsar.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-package-core</artifactId> + </exclusion> + </exclusions> </dependency> </dependencies> @@ -258,6 +264,47 @@ under the License. </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + <configuration> + <includes> + <include>**/testutils/**</include> + <include>META-INF/LICENSE</include> + <include>META-INF/NOTICE</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + <executions> + <execution> + <id>attach-test-sources</id> + <goals> + <goal>test-jar-no-fork</goal> + </goals> + <configuration> + <archive> + <!-- Globally exclude maven metadata, because it may accidentally bundle files we don't intend to --> + <addMavenDescriptor>false</addMavenDescriptor> + </archive> + <includes> + <include>**/testutils/**</include> + <include>META-INF/LICENSE</include> + <include>META-INF/NOTICE</include> + </includes> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> </project> diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java index 1d1574b..5b77922 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java @@ -57,6 +57,8 @@ public class TopicRange implements Serializable { public TopicRange(int start, int end) { checkArgument(start >= MIN_RANGE, "Start range %s shouldn't below zero.", start); checkArgument(end <= MAX_RANGE, "End range %s shouldn't exceed 65535.", end); + checkArgument(start <= end, "Range end must >= range start."); + this.start = start; this.end = end; } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeProvider.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java similarity index 53% rename from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeProvider.java rename to flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java index d8ad718..6f82725 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeProvider.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java @@ -16,22 +16,25 @@ * limitations under the License. */ -package org.apache.flink.connector.pulsar.testutils.runtime; +package org.apache.flink.connector.pulsar.source.enumerator.topic.range; -import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; -/** - * A abstraction for different pulsar runtimes. Providing the common methods for {@link - * PulsarTestEnvironment}. - */ -public interface PulsarRuntimeProvider { +import java.util.List; + +/** Always return the same range set for all topics. */ +public class FixedRangeGenerator implements RangeGenerator { + private static final long serialVersionUID = -3895203007855538734L; - /** Start up this pulsar runtime, block the thread until everytime is ready for this runtime. */ - void startUp(); + private final List<TopicRange> ranges; - /** Shutdown this pulsar runtime. */ - void tearDown(); + public FixedRangeGenerator(List<TopicRange> ranges) { + this.ranges = ranges; + } - /** Return a operator for operating this pulsar runtime. */ - PulsarRuntimeOperator operator(); + @Override + public List<TopicRange> range(TopicMetadata metadata, int parallelism) { + return ranges; + } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java index 89457dc..8cc2e0a 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java @@ -37,7 +37,7 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> { @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment(); // Defines pulsar running environment - @ExternalSystem PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.MOCK); + @ExternalSystem PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock()); // Defines a external context Factories, // so test cases will be invoked using this external contexts. diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java index ea9c4ab..c2afee5 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java @@ -18,41 +18,31 @@ package org.apache.flink.connector.pulsar.testutils; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator; import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import java.util.Collection; -import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient; - /** Source split data writer for writing test data into a Pulsar topic partition. */ public class PulsarPartitionDataWriter implements SourceSplitDataWriter<String> { - private final Producer<String> producer; + private final PulsarRuntimeOperator operator; + private final String fullTopicName; - public PulsarPartitionDataWriter(PulsarClient client, TopicPartition partition) { - try { - this.producer = - client.newProducer(Schema.STRING).topic(partition.getFullTopicName()).create(); - } catch (PulsarClientException e) { - throw new IllegalStateException(e); - } + public PulsarPartitionDataWriter(PulsarRuntimeOperator operator, String fullTopicName) { + this.operator = operator; + this.fullTopicName = fullTopicName; } @Override public void writeRecords(Collection<String> records) { - for (String record : records) { - sneakyClient(() -> producer.newMessage().value(record).send()); - } + operator.sendMessages(fullTopicName, Schema.STRING, records); } @Override - public void close() throws Exception { - producer.close(); + public void close() { + // Nothing to do. } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java index a80d721..2ad4c2f 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java @@ -29,15 +29,14 @@ import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric; /** Common test context for pulsar based test. */ public abstract class PulsarTestContext<T> implements ExternalContext<T> { + private static final long serialVersionUID = 4717940854368532130L; private static final int NUM_RECORDS_UPPER_BOUND = 500; private static final int NUM_RECORDS_LOWER_BOUND = 100; - private final String displayName; protected final PulsarRuntimeOperator operator; - protected PulsarTestContext(String displayName, PulsarTestEnvironment environment) { - this.displayName = displayName; + protected PulsarTestContext(PulsarTestEnvironment environment) { this.operator = environment.operator(); } @@ -58,8 +57,10 @@ public abstract class PulsarTestContext<T> implements ExternalContext<T> { return records; } + protected abstract String displayName(); + @Override public String toString() { - return displayName; + return displayName(); } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java index 7b31c7c..50ca3fe 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java @@ -20,7 +20,6 @@ package org.apache.flink.connector.pulsar.testutils; import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime; import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator; -import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeProvider; import org.apache.flink.connectors.test.common.TestResource; import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem; @@ -54,10 +53,10 @@ import java.util.List; public class PulsarTestEnvironment implements BeforeAllCallback, AfterAllCallback, TestResource, TestRule { - private final PulsarRuntimeProvider provider; + private final PulsarRuntime runtime; public PulsarTestEnvironment(PulsarRuntime runtime) { - this.provider = runtime.provider(); + this.runtime = runtime; } /** JUnit 4 Rule based test logic. */ @@ -66,7 +65,7 @@ public class PulsarTestEnvironment return new Statement() { @Override public void evaluate() throws Throwable { - provider.startUp(); + runtime.startUp(); List<Throwable> errors = new ArrayList<>(); try { @@ -75,7 +74,7 @@ public class PulsarTestEnvironment errors.add(t); } finally { try { - provider.tearDown(); + runtime.tearDown(); } catch (Throwable t) { errors.add(t); } @@ -88,29 +87,29 @@ public class PulsarTestEnvironment /** JUnit 5 Extension setup method. */ @Override public void beforeAll(ExtensionContext context) { - provider.startUp(); + runtime.startUp(); } /** flink-connector-testing setup method. */ @Override public void startUp() { - provider.startUp(); + runtime.startUp(); } /** JUnit 5 Extension shutdown method. */ @Override public void afterAll(ExtensionContext context) { - provider.tearDown(); + runtime.tearDown(); } /** flink-connector-testing shutdown method. */ @Override public void tearDown() { - provider.tearDown(); + runtime.tearDown(); } /** Get a common supported set of method for operating pulsar which is in container. */ public PulsarRuntimeOperator operator() { - return provider.operator(); + return runtime.operator(); } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java index 2321bd4..18a8655 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java @@ -56,7 +56,7 @@ public abstract class PulsarTestSuiteBase { * pulsar broker. Override this method when needs. */ protected PulsarRuntime runtime() { - return PulsarRuntime.MOCK; + return PulsarRuntime.mock(); } /** Operate pulsar by acquiring a runtime operator. */ diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java index 7ce676c..12dbabe 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java @@ -18,96 +18,33 @@ package org.apache.flink.connector.pulsar.testutils.cases; -import org.apache.flink.api.connector.source.Boundedness; -import org.apache.flink.api.connector.source.Source; -import org.apache.flink.connector.pulsar.source.PulsarSource; -import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder; -import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; -import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter; -import org.apache.flink.connector.pulsar.testutils.PulsarTestContext; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; -import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; -import org.apache.pulsar.client.api.RegexSubscriptionMode; - -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; - -import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; -import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema; -import static org.apache.pulsar.client.api.Schema.STRING; -import static org.apache.pulsar.client.api.SubscriptionType.Exclusive; +import org.apache.pulsar.client.api.SubscriptionType; /** * Pulsar external context that will create multiple topics with only one partitions as source * splits. */ -public class MultipleTopicConsumingContext extends PulsarTestContext<String> { - - private int numTopics = 0; - - private final String topicPattern; - - private final Map<String, SourceSplitDataWriter<String>> topicNameToSplitWriters = - new HashMap<>(); +public class MultipleTopicConsumingContext extends MultipleTopicTemplateContext { + private static final long serialVersionUID = -3855336888090886528L; public MultipleTopicConsumingContext(PulsarTestEnvironment environment) { - super("consuming message on multiple topic", environment); - this.topicPattern = - "pulsar-multiple-topic-[0-9]+-" - + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + super(environment); } @Override - public Source<String, ?, ?> createSource(Boundedness boundedness) { - PulsarSourceBuilder<String> builder = - PulsarSource.builder() - .setDeserializationSchema(pulsarSchema(STRING)) - .setServiceUrl(operator.serviceUrl()) - .setAdminUrl(operator.adminUrl()) - .setTopicPattern(topicPattern, RegexSubscriptionMode.AllTopics) - .setSubscriptionType(Exclusive) - .setSubscriptionName("flink-pulsar-multiple-topic-test"); - if (boundedness == Boundedness.BOUNDED) { - // Using latest stop cursor for making sure the source could be stopped. - // This is required for SourceTestSuiteBase. - builder.setBoundedStopCursor(StopCursor.latest()); - } - - return builder.build(); + protected String displayName() { + return "consuming message on multiple topic"; } @Override - public SourceSplitDataWriter<String> createSourceSplitDataWriter() { - String topicName = topicPattern.replace("[0-9]+", String.valueOf(numTopics)); - operator.createTopic(topicName, 1); - - String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0); - TopicPartition partition = new TopicPartition(partitionName, 0, createFullRange()); - PulsarPartitionDataWriter writer = - new PulsarPartitionDataWriter(operator.client(), partition); - - topicNameToSplitWriters.put(partitionName, writer); - numTopics++; - - return writer; - } - - @Override - public Collection<String> generateTestData(int splitIndex, long seed) { - return generateStringTestData(splitIndex, seed); + protected String subscriptionName() { + return "flink-pulsar-multiple-topic-test"; } @Override - public void close() throws Exception { - for (SourceSplitDataWriter<String> writer : topicNameToSplitWriters.values()) { - writer.close(); - } - - topicNameToSplitWriters.clear(); + protected SubscriptionType subscriptionType() { + return SubscriptionType.Exclusive; } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java similarity index 72% copy from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java copy to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java index 7ce676c..a0801ec 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java @@ -24,42 +24,38 @@ import org.apache.flink.connector.pulsar.source.PulsarSource; import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter; import org.apache.flink.connector.pulsar.testutils.PulsarTestContext; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.SubscriptionType; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; -import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; +import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema; import static org.apache.pulsar.client.api.Schema.STRING; -import static org.apache.pulsar.client.api.SubscriptionType.Exclusive; /** - * Pulsar external context that will create multiple topics with only one partitions as source - * splits. + * Pulsar external context template that will create multiple topics with only one partitions as + * source splits. */ -public class MultipleTopicConsumingContext extends PulsarTestContext<String> { +public abstract class MultipleTopicTemplateContext extends PulsarTestContext<String> { + private static final long serialVersionUID = 7333807392445848344L; private int numTopics = 0; - private final String topicPattern; + private final String topicPattern = "pulsar-multiple-topic-[0-9]+-" + randomAlphabetic(8); private final Map<String, SourceSplitDataWriter<String>> topicNameToSplitWriters = new HashMap<>(); - public MultipleTopicConsumingContext(PulsarTestEnvironment environment) { - super("consuming message on multiple topic", environment); - this.topicPattern = - "pulsar-multiple-topic-[0-9]+-" - + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + public MultipleTopicTemplateContext(PulsarTestEnvironment environment) { + super(environment); } @Override @@ -67,11 +63,11 @@ public class MultipleTopicConsumingContext extends PulsarTestContext<String> { PulsarSourceBuilder<String> builder = PulsarSource.builder() .setDeserializationSchema(pulsarSchema(STRING)) - .setServiceUrl(operator.serviceUrl()) - .setAdminUrl(operator.adminUrl()) + .setServiceUrl(serviceUrl()) + .setAdminUrl(adminUrl()) .setTopicPattern(topicPattern, RegexSubscriptionMode.AllTopics) - .setSubscriptionType(Exclusive) - .setSubscriptionName("flink-pulsar-multiple-topic-test"); + .setSubscriptionType(subscriptionType()) + .setSubscriptionName(subscriptionName()); if (boundedness == Boundedness.BOUNDED) { // Using latest stop cursor for making sure the source could be stopped. // This is required for SourceTestSuiteBase. @@ -87,9 +83,7 @@ public class MultipleTopicConsumingContext extends PulsarTestContext<String> { operator.createTopic(topicName, 1); String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0); - TopicPartition partition = new TopicPartition(partitionName, 0, createFullRange()); - PulsarPartitionDataWriter writer = - new PulsarPartitionDataWriter(operator.client(), partition); + PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName); topicNameToSplitWriters.put(partitionName, writer); numTopics++; @@ -110,4 +104,16 @@ public class MultipleTopicConsumingContext extends PulsarTestContext<String> { topicNameToSplitWriters.clear(); } + + protected abstract String subscriptionName(); + + protected abstract SubscriptionType subscriptionType(); + + protected String serviceUrl() { + return operator.serviceUrl(); + } + + protected String adminUrl() { + return operator.adminUrl(); + } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java index cb1b582..b89511c 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java @@ -23,7 +23,7 @@ import org.apache.flink.api.connector.source.Source; import org.apache.flink.connector.pulsar.source.PulsarSource; import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter; import org.apache.flink.connector.pulsar.testutils.PulsarTestContext; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; @@ -34,7 +34,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; -import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema; import static org.apache.pulsar.client.api.Schema.STRING; import static org.apache.pulsar.client.api.SubscriptionType.Exclusive; @@ -44,6 +43,7 @@ import static org.apache.pulsar.client.api.SubscriptionType.Exclusive; * source splits. */ public class SingleTopicConsumingContext extends PulsarTestContext<String> { + private static final long serialVersionUID = 2754642285356345741L; private static final String TOPIC_NAME_PREFIX = "pulsar-single-topic"; private final String topicName; @@ -53,12 +53,17 @@ public class SingleTopicConsumingContext extends PulsarTestContext<String> { private int numSplits = 0; public SingleTopicConsumingContext(PulsarTestEnvironment environment) { - super("consuming message on single topic", environment); + super(environment); this.topicName = TOPIC_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); } @Override + protected String displayName() { + return "consuming message on single topic"; + } + + @Override public Source<String, ?, ?> createSource(Boundedness boundedness) { PulsarSourceBuilder<String> builder = PulsarSource.builder() @@ -88,9 +93,8 @@ public class SingleTopicConsumingContext extends PulsarTestContext<String> { operator.increaseTopicPartitions(topicName, numSplits); } - TopicPartition partition = new TopicPartition(topicName, numSplits - 1, createFullRange()); - PulsarPartitionDataWriter writer = - new PulsarPartitionDataWriter(operator.client(), partition); + String partitionName = TopicNameUtils.topicNameWithPartition(topicName, numSplits - 1); + PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName); partitionToSplitWriter.put(numSplits - 1, writer); return writer; diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java index 986f4bd..d46658e 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java @@ -18,33 +18,36 @@ package org.apache.flink.connector.pulsar.testutils.runtime; -import org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerProvider; -import org.apache.flink.connector.pulsar.testutils.runtime.mock.PulsarMockProvider; +import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; +import org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime; +import org.apache.flink.connector.pulsar.testutils.runtime.mock.PulsarMockRuntime; -import java.util.function.Supplier; +import org.testcontainers.containers.GenericContainer; /** - * A enum class for providing a operable pulsar runtime. We support two types of runtime, the - * container and mock. + * An abstraction for different pulsar runtimes. Providing the common methods for {@link + * PulsarTestEnvironment}. */ -public enum PulsarRuntime { +public interface PulsarRuntime { - /** - * The whole pulsar cluster would run in a docker container, provide the full fledged test - * backend. - */ - CONTAINER(PulsarContainerProvider::new), + /** Start up this pulsar runtime, block the thread until everytime is ready for this runtime. */ + void startUp(); - /** The bookkeeper and zookeeper would use a mock backend, and start a single pulsar broker. */ - MOCK(PulsarMockProvider::new); + /** Shutdown this pulsar runtime. */ + void tearDown(); - private final Supplier<PulsarRuntimeProvider> provider; + /** Return a operator for operating this pulsar runtime. */ + PulsarRuntimeOperator operator(); - PulsarRuntime(Supplier<PulsarRuntimeProvider> provider) { - this.provider = provider; + static PulsarRuntime mock() { + return new PulsarMockRuntime(); } - public PulsarRuntimeProvider provider() { - return provider.get(); + static PulsarRuntime container() { + return new PulsarContainerRuntime(); + } + + static PulsarRuntime container(GenericContainer<?> flinkContainer) { + return new PulsarContainerRuntime().bindWithFlinkContainer(flinkContainer); } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java index a68c065..2d26925 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java @@ -24,6 +24,8 @@ import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; import org.apache.flink.connectors.test.common.external.ExternalContext; +import org.apache.flink.shaded.guava30.com.google.common.base.Strings; + import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; @@ -148,13 +150,30 @@ public class PulsarRuntimeOperator implements Serializable, Closeable { return messageIds.get(0); } + public <T> MessageId sendMessage(String topic, Schema<T> schema, String key, T message) { + List<MessageId> messageIds = sendMessages(topic, schema, key, singletonList(message)); + checkArgument(messageIds.size() == 1); + + return messageIds.get(0); + } + public <T> List<MessageId> sendMessages( String topic, Schema<T> schema, Collection<T> messages) { + return sendMessages(topic, schema, null, messages); + } + + public <T> List<MessageId> sendMessages( + String topic, Schema<T> schema, String key, Collection<T> messages) { try (Producer<T> producer = client().newProducer(schema).topic(topic).create()) { List<MessageId> messageIds = new ArrayList<>(messages.size()); for (T message : messages) { - MessageId messageId = producer.newMessage().value(message).send(); + MessageId messageId; + if (Strings.isNullOrEmpty(key)) { + messageId = producer.newMessage().value(message).send(); + } else { + messageId = producer.newMessage().key(key).value(message).send(); + } messageIds.add(messageId); } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerProvider.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java similarity index 75% rename from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerProvider.java rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java index 06be3cb..5560767 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerProvider.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java @@ -18,13 +18,14 @@ package org.apache.flink.connector.pulsar.testutils.runtime.container; +import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime; import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator; -import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeProvider; import org.apache.flink.util.DockerImageVersions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.PulsarContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; @@ -36,13 +37,22 @@ import java.time.Duration; import static org.apache.flink.util.DockerImageVersions.PULSAR; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.testcontainers.containers.PulsarContainer.BROKER_HTTP_PORT; +import static org.testcontainers.containers.PulsarContainer.BROKER_PORT; /** - * {@link PulsarRuntimeProvider} implementation, use the TestContainers as the backend. We would - * start a pulsar container by this provider. + * {@link PulsarRuntime} implementation, use the TestContainers as the backend. We would start a + * pulsar container by this provider. */ -public class PulsarContainerProvider implements PulsarRuntimeProvider { - private static final Logger LOG = LoggerFactory.getLogger(PulsarContainerProvider.class); +public class PulsarContainerRuntime implements PulsarRuntime { + private static final Logger LOG = LoggerFactory.getLogger(PulsarContainerRuntime.class); + private static final String PULSAR_INTERNAL_HOSTNAME = "pulsar"; + + // This url is used on the container side. + public static final String PULSAR_SERVICE_URL = + String.format("pulsar://%s:%d", PULSAR_INTERNAL_HOSTNAME, BROKER_PORT); + // This url is used on the container side. + public static final String PULSAR_ADMIN_URL = + String.format("http://%s:%d", PULSAR_INTERNAL_HOSTNAME, BROKER_HTTP_PORT); /** * Create a pulsar container provider by a predefined version, this constance {@link @@ -52,6 +62,14 @@ public class PulsarContainerProvider implements PulsarRuntimeProvider { private PulsarRuntimeOperator operator; + public PulsarContainerRuntime bindWithFlinkContainer(GenericContainer<?> flinkContainer) { + this.container + .withNetworkAliases(PULSAR_INTERNAL_HOSTNAME) + .dependsOn(flinkContainer) + .withNetwork(flinkContainer.getNetwork()); + return this; + } + @Override public void startUp() { // Prepare Pulsar Container. diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockProvider.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java similarity index 76% rename from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockProvider.java rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java index 1cb8ce9..552ce42 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockProvider.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java @@ -18,8 +18,8 @@ package org.apache.flink.connector.pulsar.testutils.runtime.mock; +import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime; import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator; -import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeProvider; import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet; @@ -39,17 +39,17 @@ import static org.apache.flink.connector.pulsar.testutils.runtime.mock.PortBindi import static org.apache.flink.util.Preconditions.checkNotNull; /** Providing a mocked pulsar server. */ -public class PulsarMockProvider implements PulsarRuntimeProvider { +public class PulsarMockRuntime implements PulsarRuntime { private static final String CLUSTER_NAME = "mock-pulsar-" + randomAlphanumeric(6); private final MockPulsarService pulsarService; private PulsarRuntimeOperator operator; - public PulsarMockProvider() { + public PulsarMockRuntime() { this(createConfig()); } - public PulsarMockProvider(ServiceConfiguration configuration) { + public PulsarMockRuntime(ServiceConfiguration configuration) { this.pulsarService = new MockPulsarService(configuration); } @@ -95,20 +95,34 @@ public class PulsarMockProvider implements PulsarRuntimeProvider { admin.clusters().createCluster(CLUSTER_NAME, data); } + createOrUpdateTenant("public"); + createOrUpdateNamespace("public", "default"); + + createOrUpdateTenant("pulsar"); + createOrUpdateNamespace("pulsar", "system"); + } + + private void createOrUpdateTenant(String tenant) throws PulsarAdminException { + PulsarAdmin admin = operator().admin(); TenantInfo info = TenantInfo.builder() .adminRoles(ImmutableSet.of("appid1", "appid2")) .allowedClusters(ImmutableSet.of(CLUSTER_NAME)) .build(); - if (!admin.tenants().getTenants().contains("public")) { - admin.tenants().createTenant("public", info); + if (!admin.tenants().getTenants().contains(tenant)) { + admin.tenants().createTenant(tenant, info); } else { - admin.tenants().updateTenant("public", info); + admin.tenants().updateTenant(tenant, info); } + } - if (!admin.namespaces().getNamespaces("public").contains("public/default")) { - admin.namespaces().createNamespace("public/default"); - admin.namespaces().setRetention("public/default", new RetentionPolicies(60, 1000)); + public void createOrUpdateNamespace(String tenant, String namespace) + throws PulsarAdminException { + PulsarAdmin admin = operator().admin(); + String namespaceValue = tenant + "/" + namespace; + if (!admin.namespaces().getNamespaces(tenant).contains(namespaceValue)) { + admin.namespaces().createNamespace(namespaceValue); + admin.namespaces().setRetention(namespaceValue, new RetentionPolicies(60, 1000)); } } @@ -135,6 +149,13 @@ public class PulsarMockProvider implements PulsarRuntimeProvider { configuration.setBrokerServicePort(Optional.of(findAvailablePort())); configuration.setWebServicePort(Optional.of(findAvailablePort())); + // Enable transaction with in memory. + configuration.setTransactionCoordinatorEnabled(true); + configuration.setTransactionMetadataStoreProviderClassName( + "org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider"); + configuration.setTransactionBufferProviderClassName( + "org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider"); + return configuration; } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java index 070bd82..9630003 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java @@ -42,10 +42,7 @@ public class FlinkContainerTestEnvironment implements TestEnvironment, ClusterCo public FlinkContainerTestEnvironment( int numTaskManagers, int numSlotsPerTaskManager, String... jarPath) { - Configuration flinkConfiguration = new Configuration(); - flinkConfiguration.set(HEARTBEAT_INTERVAL, 1000L); - flinkConfiguration.set(HEARTBEAT_TIMEOUT, 5000L); - flinkConfiguration.set(SLOT_REQUEST_TIMEOUT, 10000L); + Configuration flinkConfiguration = flinkConfiguration(); flinkConfiguration.set(NUM_TASK_SLOTS, numSlotsPerTaskManager); this.flinkContainer = @@ -113,4 +110,13 @@ public class FlinkContainerTestEnvironment implements TestEnvironment, ClusterCo public FlinkContainer getFlinkContainer() { return this.flinkContainer; } + + protected Configuration flinkConfiguration() { + Configuration flinkConfiguration = new Configuration(); + flinkConfiguration.set(HEARTBEAT_INTERVAL, 1000L); + flinkConfiguration.set(HEARTBEAT_TIMEOUT, 5000L); + flinkConfiguration.set(SLOT_REQUEST_TIMEOUT, 10000L); + + return flinkConfiguration; + } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml new file mode 100644 index 0000000..269f89c --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml @@ -0,0 +1,121 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.15-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-end-to-end-tests-pulsar</artifactId> + <name>Flink : E2E Tests : Pulsar</name> + + <properties> + <pulsar.version>2.8.0</pulsar.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-end-to-end-tests-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>pulsar</artifactId> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy</id> + <phase>pre-integration-test</phase> + <goals> + <goal>copy</goal> + </goals> + </execution> + </executions> + <configuration> + <artifactItems> + <artifactItem> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <destFileName>pulsar-connector.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-client-all</artifactId> + <version>${pulsar.version}</version> + <destFileName>pulsar-client-all.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-client-api</artifactId> + <version>${pulsar.version}</version> + <destFileName>pulsar-client-api.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-client-admin-api</artifactId> + <version>${pulsar.version}</version> + <destFileName>pulsar-admin-api.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + <artifactItem> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + <version>${slf4j.version}</version> + <destFileName>jul-to-slf4j.jar</destFileName> + <type>jar</type> + <outputDirectory>${project.build.directory}/dependencies</outputDirectory> + </artifactItem> + </artifactItems> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java similarity index 51% copy from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java copy to flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java index 89457dc..1427c2b 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java @@ -16,36 +16,40 @@ * limitations under the License. */ -package org.apache.flink.connector.pulsar.source; +package org.apache.flink.tests.util.pulsar; import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; -import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicConsumingContext; -import org.apache.flink.connector.pulsar.testutils.cases.SingleTopicConsumingContext; -import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime; -import org.apache.flink.connectors.test.common.environment.MiniClusterTestEnvironment; import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory; import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem; import org.apache.flink.connectors.test.common.junit.annotations.TestEnv; import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase; +import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext; +import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext; +import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; -/** Unite test class for {@link PulsarSource}. */ -@SuppressWarnings("unused") -class PulsarSourceITCase extends SourceTestSuiteBase<String> { +import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container; - // Defines test environment on Flink MiniCluster - @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment(); +/** + * Pulsar E2E test based on connector testing framework. It's used for Failover & Exclusive + * subscription. + */ +public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> { + + // Defines TestEnvironment. + @TestEnv + FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 6); - // Defines pulsar running environment - @ExternalSystem PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.MOCK); + // Defines ConnectorExternalSystem. + @ExternalSystem + PulsarTestEnvironment pulsar = new PulsarTestEnvironment(container(flink.getFlinkContainer())); - // Defines a external context Factories, - // so test cases will be invoked using this external contexts. + // Defines a set of external context Factories for different test cases. @ExternalContextFactory - PulsarTestContextFactory<String, SingleTopicConsumingContext> singleTopic = - new PulsarTestContextFactory<>(pulsar, SingleTopicConsumingContext::new); + PulsarTestContextFactory<String, ExclusiveSubscriptionContext> exclusive = + new PulsarTestContextFactory<>(pulsar, ExclusiveSubscriptionContext::new); @ExternalContextFactory - PulsarTestContextFactory<String, MultipleTopicConsumingContext> multipleTopic = - new PulsarTestContextFactory<>(pulsar, MultipleTopicConsumingContext::new); + PulsarTestContextFactory<String, FailoverSubscriptionContext> failover = + new PulsarTestContextFactory<>(pulsar, FailoverSubscriptionContext::new); } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java new file mode 100644 index 0000000..25cab21 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.pulsar; + +import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; +import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; +import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory; +import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem; +import org.apache.flink.connectors.test.common.junit.annotations.TestEnv; +import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext; +import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext; +import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; +import org.apache.flink.tests.util.pulsar.common.UnorderedSourceTestSuiteBase; + +import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container; + +/** + * Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared + * subscription. + */ +public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<String> { + + // Defines TestEnvironment. + @TestEnv + FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 8); + + // Defines ConnectorExternalSystem. + @ExternalSystem + PulsarTestEnvironment pulsar = new PulsarTestEnvironment(container(flink.getFlinkContainer())); + + // Defines a set of external context Factories for different test cases. + @ExternalContextFactory + PulsarTestContextFactory<String, SharedSubscriptionContext> shared = + new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new); + + @ExternalContextFactory + PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared = + new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new); +} diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java new file mode 100644 index 0000000..18b2ffc --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.pulsar.cases; + +import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; +import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext; + +import org.apache.pulsar.client.api.SubscriptionType; + +import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL; + +/** We would consuming from test splits by using {@link SubscriptionType#Exclusive} subscription. */ +public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext { + private static final long serialVersionUID = 6238209089442257487L; + + public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) { + super(environment); + } + + @Override + protected String displayName() { + return "consuming message by Exclusive"; + } + + @Override + protected String subscriptionName() { + return "pulsar-exclusive-subscription"; + } + + @Override + protected SubscriptionType subscriptionType() { + return SubscriptionType.Exclusive; + } + + @Override + protected String serviceUrl() { + return PULSAR_SERVICE_URL; + } + + @Override + protected String adminUrl() { + return PULSAR_ADMIN_URL; + } +} diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java new file mode 100644 index 0000000..c322efa --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.pulsar.cases; + +import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; +import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext; + +import org.apache.pulsar.client.api.SubscriptionType; + +import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL; + +/** We would consuming from test splits by using {@link SubscriptionType#Failover} subscription. */ +public class FailoverSubscriptionContext extends MultipleTopicTemplateContext { + private static final long serialVersionUID = 6238209089442257487L; + + public FailoverSubscriptionContext(PulsarTestEnvironment environment) { + super(environment); + } + + @Override + protected String displayName() { + return "consuming message by Failover"; + } + + @Override + protected String subscriptionName() { + return "pulsar-failover-subscription"; + } + + @Override + protected SubscriptionType subscriptionType() { + return SubscriptionType.Failover; + } + + @Override + protected String serviceUrl() { + return PULSAR_SERVICE_URL; + } + + @Override + protected String adminUrl() { + return PULSAR_ADMIN_URL; + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java similarity index 50% copy from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java copy to flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java index 7ce676c..e442418 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.pulsar.testutils.cases; +package org.apache.flink.tests.util.pulsar.cases; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; @@ -24,57 +24,77 @@ import org.apache.flink.connector.pulsar.source.PulsarSource; import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; -import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator; import org.apache.flink.connector.pulsar.testutils.PulsarTestContext; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; +import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter; import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.util.Murmur3_32Hash; +import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; +import java.util.List; -import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; +import static java.util.Collections.singletonList; +import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE; import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema; +import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL; import static org.apache.pulsar.client.api.Schema.STRING; -import static org.apache.pulsar.client.api.SubscriptionType.Exclusive; /** - * Pulsar external context that will create multiple topics with only one partitions as source - * splits. + * We would consuming from test splits by using {@link SubscriptionType#Key_Shared} subscription. */ -public class MultipleTopicConsumingContext extends PulsarTestContext<String> { +public class KeySharedSubscriptionContext extends PulsarTestContext<String> { + private static final long serialVersionUID = 3246516520107893983L; - private int numTopics = 0; + private int index = 0; - private final String topicPattern; + private final List<KeyedPulsarPartitionDataWriter> writers = new ArrayList<>(); - private final Map<String, SourceSplitDataWriter<String>> topicNameToSplitWriters = - new HashMap<>(); + // Message keys. + private final String key1; + private final String key2; - public MultipleTopicConsumingContext(PulsarTestEnvironment environment) { - super("consuming message on multiple topic", environment); - this.topicPattern = - "pulsar-multiple-topic-[0-9]+-" - + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + public KeySharedSubscriptionContext(PulsarTestEnvironment environment) { + super(environment); + + // Init message keys. + this.key1 = randomAlphabetic(8); + String newKey2; + do { + newKey2 = randomAlphabetic(8); + } while (keyHash(key1) == keyHash(newKey2)); + this.key2 = newKey2; + } + + @Override + protected String displayName() { + return "consuming message by Key_Shared"; } @Override public Source<String, ?, ?> createSource(Boundedness boundedness) { + int keyHash = keyHash(key1); + TopicRange range = new TopicRange(keyHash, keyHash); + PulsarSourceBuilder<String> builder = PulsarSource.builder() .setDeserializationSchema(pulsarSchema(STRING)) - .setServiceUrl(operator.serviceUrl()) - .setAdminUrl(operator.adminUrl()) - .setTopicPattern(topicPattern, RegexSubscriptionMode.AllTopics) - .setSubscriptionType(Exclusive) - .setSubscriptionName("flink-pulsar-multiple-topic-test"); + .setServiceUrl(PULSAR_SERVICE_URL) + .setAdminUrl(PULSAR_ADMIN_URL) + .setTopicPattern( + "pulsar-[0-9]+-key-shared", RegexSubscriptionMode.AllTopics) + .setSubscriptionType(SubscriptionType.Key_Shared) + .setSubscriptionName("pulsar-key-shared") + .setRangeGenerator(new FixedRangeGenerator(singletonList(range))); if (boundedness == Boundedness.BOUNDED) { // Using latest stop cursor for making sure the source could be stopped. - // This is required for SourceTestSuiteBase. builder.setBoundedStopCursor(StopCursor.latest()); } @@ -83,16 +103,14 @@ public class MultipleTopicConsumingContext extends PulsarTestContext<String> { @Override public SourceSplitDataWriter<String> createSourceSplitDataWriter() { - String topicName = topicPattern.replace("[0-9]+", String.valueOf(numTopics)); + String topicName = "pulsar-" + index + "-key-shared"; operator.createTopic(topicName, 1); + index++; String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0); - TopicPartition partition = new TopicPartition(partitionName, 0, createFullRange()); - PulsarPartitionDataWriter writer = - new PulsarPartitionDataWriter(operator.client(), partition); - - topicNameToSplitWriters.put(partitionName, writer); - numTopics++; + KeyedPulsarPartitionDataWriter writer = + new KeyedPulsarPartitionDataWriter(operator, partitionName, key1, key2); + writers.add(writer); return writer; } @@ -103,11 +121,14 @@ public class MultipleTopicConsumingContext extends PulsarTestContext<String> { } @Override - public void close() throws Exception { - for (SourceSplitDataWriter<String> writer : topicNameToSplitWriters.values()) { + public void close() { + for (KeyedPulsarPartitionDataWriter writer : writers) { writer.close(); } + writers.clear(); + } - topicNameToSplitWriters.clear(); + private int keyHash(String key) { + return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE; } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java similarity index 57% copy from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java copy to flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java index 7ce676c..f936b6f 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.connector.pulsar.testutils.cases; +package org.apache.flink.tests.util.pulsar.cases; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; @@ -24,42 +24,38 @@ import org.apache.flink.connector.pulsar.source.PulsarSource; import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter; import org.apache.flink.connector.pulsar.testutils.PulsarTestContext; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.SubscriptionType; +import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; +import java.util.List; -import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema; +import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL; import static org.apache.pulsar.client.api.Schema.STRING; -import static org.apache.pulsar.client.api.SubscriptionType.Exclusive; -/** - * Pulsar external context that will create multiple topics with only one partitions as source - * splits. - */ -public class MultipleTopicConsumingContext extends PulsarTestContext<String> { +/** We would consuming from test splits by using {@link SubscriptionType#Shared} subscription. */ +public class SharedSubscriptionContext extends PulsarTestContext<String> { + private static final long serialVersionUID = -2798707923661295245L; - private int numTopics = 0; + private int index = 0; - private final String topicPattern; + private final List<PulsarPartitionDataWriter> writers = new ArrayList<>(); - private final Map<String, SourceSplitDataWriter<String>> topicNameToSplitWriters = - new HashMap<>(); + public SharedSubscriptionContext(PulsarTestEnvironment environment) { + super(environment); + } - public MultipleTopicConsumingContext(PulsarTestEnvironment environment) { - super("consuming message on multiple topic", environment); - this.topicPattern = - "pulsar-multiple-topic-[0-9]+-" - + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + @Override + protected String displayName() { + return "consuming message by Shared"; } @Override @@ -67,14 +63,13 @@ public class MultipleTopicConsumingContext extends PulsarTestContext<String> { PulsarSourceBuilder<String> builder = PulsarSource.builder() .setDeserializationSchema(pulsarSchema(STRING)) - .setServiceUrl(operator.serviceUrl()) - .setAdminUrl(operator.adminUrl()) - .setTopicPattern(topicPattern, RegexSubscriptionMode.AllTopics) - .setSubscriptionType(Exclusive) - .setSubscriptionName("flink-pulsar-multiple-topic-test"); + .setServiceUrl(PULSAR_SERVICE_URL) + .setAdminUrl(PULSAR_ADMIN_URL) + .setTopicPattern("pulsar-[0-9]+-shared", RegexSubscriptionMode.AllTopics) + .setSubscriptionType(SubscriptionType.Shared) + .setSubscriptionName("pulsar-shared"); if (boundedness == Boundedness.BOUNDED) { // Using latest stop cursor for making sure the source could be stopped. - // This is required for SourceTestSuiteBase. builder.setBoundedStopCursor(StopCursor.latest()); } @@ -83,16 +78,13 @@ public class MultipleTopicConsumingContext extends PulsarTestContext<String> { @Override public SourceSplitDataWriter<String> createSourceSplitDataWriter() { - String topicName = topicPattern.replace("[0-9]+", String.valueOf(numTopics)); + String topicName = "pulsar-" + index + "-shared"; operator.createTopic(topicName, 1); + index++; String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0); - TopicPartition partition = new TopicPartition(partitionName, 0, createFullRange()); - PulsarPartitionDataWriter writer = - new PulsarPartitionDataWriter(operator.client(), partition); - - topicNameToSplitWriters.put(partitionName, writer); - numTopics++; + PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName); + writers.add(writer); return writer; } @@ -103,11 +95,10 @@ public class MultipleTopicConsumingContext extends PulsarTestContext<String> { } @Override - public void close() throws Exception { - for (SourceSplitDataWriter<String> writer : topicNameToSplitWriters.values()) { + public void close() { + for (PulsarPartitionDataWriter writer : writers) { writer.close(); } - - topicNameToSplitWriters.clear(); + writers.clear(); } } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java new file mode 100644 index 0000000..890d09e --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.pulsar.common; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.tests.util.TestUtils; +import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment; + +import static org.apache.flink.configuration.TaskManagerOptions.TASK_OFF_HEAP_MEMORY; + +/** A Flink Container which would bundles pulsar connector in its classpath. */ +public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvironment { + + public FlinkContainerWithPulsarEnvironment(int numTaskManagers, int numSlotsPerTaskManager) { + super( + numTaskManagers, + numSlotsPerTaskManager, + resourcePath("pulsar-connector.jar"), + resourcePath("pulsar-client-all.jar"), + resourcePath("pulsar-client-api.jar"), + resourcePath("pulsar-admin-api.jar"), + resourcePath("jul-to-slf4j.jar")); + } + + private static String resourcePath(String jarName) { + return TestUtils.getResource(jarName).toAbsolutePath().toString(); + } + + @Override + protected Configuration flinkConfiguration() { + Configuration configuration = super.flinkConfiguration(); + // Increase the off heap memory for avoiding direct buffer memory error on Pulsar e2e tests. + configuration.set(TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(100)); + + return configuration; + } +} diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java new file mode 100644 index 0000000..eea97e6 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.pulsar.common; + +import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator; +import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; + +import org.apache.pulsar.client.api.Schema; + +import java.util.Collection; +import java.util.List; + +import static java.util.stream.Collectors.toList; + +/** + * Source split data writer for writing test data into a Pulsar topic partition. It will write the + * message with two keys. + */ +public class KeyedPulsarPartitionDataWriter implements SourceSplitDataWriter<String> { + + private final PulsarRuntimeOperator operator; + private final String fullTopicName; + private final String key1; + private final String key2; + + public KeyedPulsarPartitionDataWriter( + PulsarRuntimeOperator operator, String fullTopicName, String key1, String key2) { + this.operator = operator; + this.fullTopicName = fullTopicName; + this.key1 = key1; + this.key2 = key2; + } + + @Override + public void writeRecords(Collection<String> records) { + operator.sendMessages(fullTopicName, Schema.STRING, key1, records); + + List<String> newRecords = records.stream().map(a -> a + key1).collect(toList()); + operator.sendMessages(fullTopicName, Schema.STRING, key2, newRecords); + } + + @Override + public void close() { + // Nothing to do. + } +} diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java new file mode 100644 index 0000000..c452fe6 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tests.util.pulsar.common; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.connectors.test.common.environment.TestEnvironment; +import org.apache.flink.connectors.test.common.external.ExternalContext; +import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; +import org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension; +import org.apache.flink.connectors.test.common.junit.extensions.TestCaseInvocationContextProvider; +import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; + +/** A source test template for testing the messages which could be consumed in a unordered way. */ +@ExtendWith({ + ConnectorTestingExtension.class, + TestLoggerExtension.class, + TestCaseInvocationContextProvider.class +}) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class UnorderedSourceTestSuiteBase<T> { + + @TestTemplate + @DisplayName("Test source with one split and four consumers") + public void testOneSplitWithMultipleConsumers( + TestEnvironment testEnv, ExternalContext<T> externalContext) throws Exception { + Collection<T> testData = + externalContext.generateTestData(0, ThreadLocalRandom.current().nextLong()); + SourceSplitDataWriter<T> writer = externalContext.createSourceSplitDataWriter(); + writer.writeRecords(testData); + + Source<T, ?, ?> source = externalContext.createSource(Boundedness.BOUNDED); + StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(); + List<T> results = + execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar source") + .setParallelism(4) + .executeAndCollect( + "Source single split with four readers.", testData.size()); + + assertThat(results, containsInAnyOrder(testData.toArray())); + } +} diff --git a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/resources/log4j2-test.properties similarity index 58% copy from tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist copy to flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/resources/log4j2-test.properties index 521d055..835c2ec 100644 --- a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/resources/log4j2-test.properties @@ -16,26 +16,13 @@ # limitations under the License. ################################################################################ -# These modules are not deployed to maven central, despite their use of the shade plugin. +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger -flink-examples-streaming-twitter -flink-examples-streaming-gcp-pubsub -flink-yarn-tests -flink-docs -flink-datastream-allround-test -flink-queryable-state-test -flink-confluent-schema-registry -flink-stream-stateful-job-upgrade-test -flink-elasticsearch7-test -flink-stream-state-ttl-test -flink-state-evolution-test -flink-elasticsearch6-test -flink-rocksdb-state-memory-control-test -flink-python-test -flink-streaming-kinesis-test -flink-tpch-test -flink-streaming-kafka-test-base -flink-heavy-deployment-stress-test -flink-elasticsearch5-test -flink-high-parallelism-iterations-test -flink-end-to-end-tests-common-kafka +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 8aada21..84ca52e 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -88,6 +88,7 @@ under the License. <module>flink-python-test</module> <module>flink-end-to-end-tests-hbase</module> <module>flink-glue-schema-registry-test</module> + <module>flink-end-to-end-tests-pulsar</module> </modules> <dependencyManagement> diff --git a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist index 521d055..110ba08 100644 --- a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist +++ b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist @@ -39,3 +39,4 @@ flink-heavy-deployment-stress-test flink-elasticsearch5-test flink-high-parallelism-iterations-test flink-end-to-end-tests-common-kafka +flink-end-to-end-tests-pulsar