This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch v3.0 in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
commit d75fc2c3e1915dbd004ae0ac9c3c5e1c878685d9 Author: Yufan Sheng <yu...@streamnative.io> AuthorDate: Wed Feb 9 15:09:50 2022 +0800 [FLINK-26025][connector/pulsar] Replace MockPulsar with new Pulsar test tools based on PulsarStandalone. 1. Drop some unused fields in test classes. 2. Fix the checkstyle issues for source test. 3. Fix violations for Pulsar connector according to the flink-architecture-tests. 4. Create a standalone Pulsar for test. 5. Add new methods to PulsarRuntimeOperator. 6. Fix the bug in PulsarContainerRuntime, support running tests in E2E environment. 7. Create PulsarContainerTestEnvironment for supporting E2E tests. 8. Add a lot of comments for Pulsar testing tools. 9. Drop mocked Pulsar service, use standalone Pulsar instead. --- .../util/pulsar/PulsarSourceOrderedE2ECase.java | 7 ++--- .../util/pulsar/PulsarSourceUnorderedE2ECase.java | 7 ++--- .../pulsar/cases/ExclusiveSubscriptionContext.java | 14 ---------- .../pulsar/cases/FailoverSubscriptionContext.java | 14 ---------- .../pulsar/cases/KeySharedSubscriptionContext.java | 7 ++--- .../pulsar/cases/SharedSubscriptionContext.java | 7 ++--- .../common/PulsarContainerTestEnvironment.java | 31 ++++++++++++++++++++++ 7 files changed, 39 insertions(+), 48 deletions(-) diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java index 7d22e80..502b41d 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java @@ -19,7 +19,6 @@ package org.apache.flink.tests.util.pulsar; import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; -import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; import 
org.apache.flink.connector.testframe.junit.annotations.TestContext; import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; @@ -29,8 +28,7 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext; import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext; import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; - -import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container; +import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment; /** * Pulsar E2E test based on connector testing framework. It's used for Failover & Exclusive @@ -48,8 +46,7 @@ public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> { // Defines ConnectorExternalSystem. @TestExternalSystem - PulsarTestEnvironment pulsar = - new PulsarTestEnvironment(container(flink.getFlinkContainers().getJobManager())); + PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink); // Defines a set of external context Factories for different test cases. 
@TestContext diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java index d14d8f9..5039048 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java @@ -19,7 +19,6 @@ package org.apache.flink.tests.util.pulsar; import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory; -import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; import org.apache.flink.connector.testframe.junit.annotations.TestContext; import org.apache.flink.connector.testframe.junit.annotations.TestEnv; import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem; @@ -28,10 +27,9 @@ import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext; import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext; import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment; +import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment; import org.apache.flink.tests.util.pulsar.common.UnorderedSourceTestSuiteBase; -import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container; - /** * Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared * subscription. @@ -48,8 +46,7 @@ public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<S // Defines ConnectorExternalSystem. 
@TestExternalSystem - PulsarTestEnvironment pulsar = - new PulsarTestEnvironment(container(flink.getFlinkContainers().getJobManager())); + PulsarContainerTestEnvironment pulsar = new PulsarContainerTestEnvironment(flink); // Defines a set of external context Factories for different test cases. @SuppressWarnings("unused") diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java index 1245e14..6fea0c9 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java @@ -27,12 +27,8 @@ import java.net.URL; import java.util.Collections; import java.util.List; -import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL; -import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL; - /** We would consume from test splits by using {@link SubscriptionType#Exclusive} subscription. 
*/ public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext { - private static final long serialVersionUID = 1L; public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) { this(environment, Collections.emptyList()); @@ -57,14 +53,4 @@ public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext { protected SubscriptionType subscriptionType() { return SubscriptionType.Exclusive; } - - @Override - protected String serviceUrl() { - return PULSAR_SERVICE_URL; - } - - @Override - protected String adminUrl() { - return PULSAR_ADMIN_URL; - } } diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java index 8ec1685..c473488 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java @@ -27,12 +27,8 @@ import java.net.URL; import java.util.Collections; import java.util.List; -import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL; -import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL; - /** We would consume from test splits by using {@link SubscriptionType#Failover} subscription. 
*/ public class FailoverSubscriptionContext extends MultipleTopicTemplateContext { - private static final long serialVersionUID = 1L; public FailoverSubscriptionContext(PulsarTestEnvironment environment) { this(environment, Collections.emptyList()); @@ -57,14 +53,4 @@ public class FailoverSubscriptionContext extends MultipleTopicTemplateContext { protected SubscriptionType subscriptionType() { return SubscriptionType.Failover; } - - @Override - protected String serviceUrl() { - return PULSAR_SERVICE_URL; - } - - @Override - protected String adminUrl() { - return PULSAR_ADMIN_URL; - } } diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java index 303783a..5ad369b 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java @@ -46,13 +46,10 @@ import static java.util.Collections.singletonList; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE; import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema; -import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL; -import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL; import static org.apache.pulsar.client.api.Schema.STRING; /** We would consume from test splits by using {@link SubscriptionType#Key_Shared} subscription. 
*/ public class KeySharedSubscriptionContext extends PulsarTestContext<String> { - private static final long serialVersionUID = 1L; private int index = 0; @@ -92,8 +89,8 @@ public class KeySharedSubscriptionContext extends PulsarTestContext<String> { PulsarSourceBuilder<String> builder = PulsarSource.builder() .setDeserializationSchema(pulsarSchema(STRING)) - .setServiceUrl(PULSAR_SERVICE_URL) - .setAdminUrl(PULSAR_ADMIN_URL) + .setServiceUrl(operator.serviceUrl()) + .setAdminUrl(operator.adminUrl()) .setTopicPattern( "pulsar-[0-9]+-key-shared", RegexSubscriptionMode.AllTopics) .setSubscriptionType(SubscriptionType.Key_Shared) diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java index de53595..1a2db66 100644 --- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java @@ -40,13 +40,10 @@ import java.util.Collections; import java.util.List; import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema; -import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL; -import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL; import static org.apache.pulsar.client.api.Schema.STRING; /** We would consuming from test splits by using {@link SubscriptionType#Shared} subscription. 
*/ public class SharedSubscriptionContext extends PulsarTestContext<String> { - private static final long serialVersionUID = 1L; private int index = 0; @@ -71,8 +68,8 @@ public class SharedSubscriptionContext extends PulsarTestContext<String> { PulsarSourceBuilder<String> builder = PulsarSource.builder() .setDeserializationSchema(pulsarSchema(STRING)) - .setServiceUrl(PULSAR_SERVICE_URL) - .setAdminUrl(PULSAR_ADMIN_URL) + .setServiceUrl(operator.serviceUrl()) + .setAdminUrl(operator.adminUrl()) .setTopicPattern("pulsar-[0-9]+-shared", RegexSubscriptionMode.AllTopics) .setSubscriptionType(SubscriptionType.Shared) .setSubscriptionName("pulsar-shared"); diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java new file mode 100644 index 0000000..654347b --- /dev/null +++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tests.util.pulsar.common; + +import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; + +import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container; + +/** This test environment is used to create a Pulsar standalone instance for e2e tests. */ +public class PulsarContainerTestEnvironment extends PulsarTestEnvironment { + + public PulsarContainerTestEnvironment(FlinkContainerWithPulsarEnvironment flinkEnvironment) { + super(container(flinkEnvironment.getFlinkContainers().getJobManager())); + } +}