This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch v3.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
commit bc467f322463e4fb811538a3651f9ba176172649 Author: Yufan Sheng <yu...@streamnative.io> AuthorDate: Wed Sep 28 15:28:14 2022 +0800 [FLINK-26182][Connector/pulsar] Create e2e tests for the Pulsar source and sink based on the connector testing framework. --- ...norderedE2ECase.java => PulsarSinkE2ECase.java} | 31 +++----- .../util/pulsar/PulsarSourceUnorderedE2ECase.java | 4 +- .../FlinkContainerWithPulsarEnvironment.java | 5 +- .../common/KeyedPulsarPartitionDataWriter.java | 64 ---------------- .../source/KeySharedSubscriptionContext.java | 87 ---------------------- .../pulsar/source/SharedSubscriptionContext.java | 44 ----------- 6 files changed, 15 insertions(+), 220 deletions(-) diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java similarity index 64% copy from flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java copy to flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java index 15333a1..a19f593 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java @@ -19,7 +19,8 @@ package org.apache.flink.tests.util.pulsar; import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; -import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase; +import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestContext; +import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestSuiteBase; import org.apache.flink.connector.testframe.junit.annotations.TestContext; import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; @@ -27,38 +28,30 @@ import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment; -import org.apache.flink.tests.util.pulsar.source.KeySharedSubscriptionContext; -import org.apache.flink.tests.util.pulsar.source.SharedSubscriptionContext; import org.apache.flink.testutils.junit.FailsOnJava11; import org.junit.experimental.categories.Category; -/** - * Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared - * subscription. - */ +/** Pulsar sink E2E test based on connector testing framework. */ @SuppressWarnings("unused") @Category(value = {FailsOnJava11.class}) -public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<String> { +public class PulsarSinkE2ECase extends PulsarSinkTestSuiteBase { - // Defines the Semantic. @TestSemantics - CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE}; + CheckpointingMode[] semantics = + new CheckpointingMode[] { + CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE + }; - // Defines TestEnvironment. + // Defines TestEnvironment @TestEnv - FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 8); + FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 6); // Defines ConnectorExternalSystem. @TestExternalSystem PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink); - // Defines a set of external context Factories for different test cases. - @TestContext - PulsarTestContextFactory<String, SharedSubscriptionContext> shared = - new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new); - @TestContext - PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared = - new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new); + PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext = + new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new); } diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java index 15333a1..89692d1 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java @@ -20,6 +20,8 @@ package org.apache.flink.tests.util.pulsar; import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; import org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase; +import org.apache.flink.connector.pulsar.testutils.source.cases.KeySharedSubscriptionContext; +import org.apache.flink.connector.pulsar.testutils.source.cases.SharedSubscriptionContext; import org.apache.flink.connector.testframe.junit.annotations.TestContext; import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; @@ -27,8 +29,6 @@ import org.apache.flink.connector.testframe.junit.annotations.TestSemantics; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment; -import org.apache.flink.tests.util.pulsar.source.KeySharedSubscriptionContext; -import org.apache.flink.tests.util.pulsar.source.SharedSubscriptionContext; import org.apache.flink.testutils.junit.FailsOnJava11; import org.junit.experimental.categories.Category; diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java index f5e862f..65e99a8 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java @@ -51,11 +51,8 @@ public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvir return ResourceTestUtils.getResource(jarName).toAbsolutePath().toString(); } - protected static Configuration flinkConfiguration() { + private static Configuration flinkConfiguration() { Configuration configuration = new Configuration(); - // Increase the off heap memory of TaskManager to avoid direct buffer memory error in Pulsar - // e2e tests. - configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(100)); // Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2048)); diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java deleted file mode 100644 index d5f6e11..0000000 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tests.util.pulsar.common; - -import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator; -import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter; - -import org.apache.pulsar.client.api.Schema; - -import java.util.List; - -import static java.util.stream.Collectors.toList; - -/** - * Source split data writer for writing test data into a Pulsar topic partition. It will write the - * message with two keys. - */ -public class KeyedPulsarPartitionDataWriter implements ExternalSystemSplitDataWriter<String> { - - private final PulsarRuntimeOperator operator; - private final String fullTopicName; - private final String keyToRead; - private final String keyToExclude; - - public KeyedPulsarPartitionDataWriter( - PulsarRuntimeOperator operator, - String fullTopicName, - String keyToRead, - String keyToExclude) { - this.operator = operator; - this.fullTopicName = fullTopicName; - this.keyToRead = keyToRead; - this.keyToExclude = keyToExclude; - } - - @Override - public void writeRecords(List<String> records) { - List<String> newRecords = records.stream().map(a -> a + keyToRead).collect(toList()); - - operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, newRecords); - operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, records); - } - - @Override - public void close() { - // Nothing to do. - } -} diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java deleted file mode 100644 index 0cae6e5..0000000 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tests.util.pulsar.source; - -import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; -import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator; -import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; -import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext; -import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter; -import org.apache.flink.connector.testframe.external.source.TestingSourceSettings; -import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter; - -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.common.util.Murmur3_32Hash; - -import static java.util.Collections.singletonList; -import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; -import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE; -import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared; - -/** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. */ -public class KeySharedSubscriptionContext extends MultipleTopicConsumingContext { - - private final String keyToRead; - private final String keyToExclude; - - public KeySharedSubscriptionContext(PulsarTestEnvironment environment) { - super(environment, Key_Shared); - - this.keyToRead = randomAlphabetic(8); - - // Make sure they have different hash code. - int readHash = keyHash(keyToRead); - String randomKey; - do { - randomKey = randomAlphabetic(8); - } while (keyHash(randomKey) == readHash); - this.keyToExclude = randomKey; - } - - @Override - public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter( - TestingSourceSettings sourceSettings) { - String partitionName = generatePartitionName(); - return new KeyedPulsarPartitionDataWriter(operator, partitionName, keyToRead, keyToExclude); - } - - @Override - protected String displayName() { - return "consume message by Key_Shared"; - } - - @Override - protected void setSourceBuilder(PulsarSourceBuilder<String> builder) { - int keyHash = keyHash(keyToRead); - TopicRange range = new TopicRange(keyHash, keyHash); - - builder.setRangeGenerator(new FixedRangeGenerator(singletonList(range))); - } - - @Override - protected String subscriptionName() { - return "pulsar-key-shared-subscription"; - } - - // This method is copied from Pulsar for calculating message key hash. - private int keyHash(String key) { - return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE; - } -} diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java deleted file mode 100644 index fe9f078..0000000 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tests.util.pulsar.source; - -import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; -import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext; - -import org.apache.pulsar.client.api.SubscriptionType; - -import static org.apache.pulsar.client.api.SubscriptionType.Shared; - -/** We would consume from test splits by using {@link SubscriptionType#Shared} subscription. */ -public class SharedSubscriptionContext extends MultipleTopicConsumingContext { - - public SharedSubscriptionContext(PulsarTestEnvironment environment) { - super(environment, Shared); - } - - @Override - protected String displayName() { - return "consume message by Shared"; - } - - @Override - protected String subscriptionName() { - return "pulsar-shared-subscription"; - } -}