This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit 9c98b83104a15a58d16cb0c753e201f33044a903
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Wed Feb 9 15:09:50 2022 +0800

    [FLINK-26025][connector/pulsar] Replace MockPulsar with new Pulsar test tools based on PulsarStandalone.
    
    1. Drop some unused fields in test classes.
    2. Fix the checkstyle issues for source test.
    3. Fix violations for Pulsar connector according to the flink-architecture-tests.
    4. Create a standalone Pulsar for test.
    5. Add new methods to PulsarRuntimeOperator.
    6. Fix the bug in PulsarContainerRuntime, support running tests in E2E environment.
    7. Create PulsarContainerTestEnvironment for supporting E2E tests.
    8. Add a lot of comments for Pulsar testing tools.
    9. Drop mocked Pulsar service, use standalone Pulsar instead.
---
 .../util/pulsar/PulsarSourceOrderedE2ECase.java    |  7 ++---
 .../util/pulsar/PulsarSourceUnorderedE2ECase.java  |  7 ++---
 .../pulsar/cases/ExclusiveSubscriptionContext.java | 14 ----------
 .../pulsar/cases/FailoverSubscriptionContext.java  | 14 ----------
 .../pulsar/cases/KeySharedSubscriptionContext.java |  7 ++---
 .../pulsar/cases/SharedSubscriptionContext.java    |  7 ++---
 .../common/PulsarContainerTestEnvironment.java     | 31 ++++++++++++++++++++++
 7 files changed, 39 insertions(+), 48 deletions(-)

diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
index 7d22e80..502b41d 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -29,8 +28,7 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext;
 import 
org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
-
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
+import 
org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
 
 /**
  * Pulsar E2E test based on connector testing framework. It's used for 
Failover & Exclusive
@@ -48,8 +46,7 @@ public class PulsarSourceOrderedE2ECase extends 
SourceTestSuiteBase<String> {
 
     // Defines ConnectorExternalSystem.
     @TestExternalSystem
-    PulsarTestEnvironment pulsar =
-            new 
PulsarTestEnvironment(container(flink.getFlinkContainers().getJobManager()));
+    PulsarContainerTestEnvironment pulsar = new 
PulsarContainerTestEnvironment(flink);
 
     // Defines a set of external context Factories for different test cases.
     @TestContext
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
index d14d8f9..5039048 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -28,10 +27,9 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext;
 import 
org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
+import 
org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
 import org.apache.flink.tests.util.pulsar.common.UnorderedSourceTestSuiteBase;
 
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
-
 /**
  * Pulsar E2E test based on connector testing framework. It's used for Shared 
& Key_Shared
  * subscription.
@@ -48,8 +46,7 @@ public class PulsarSourceUnorderedE2ECase extends 
UnorderedSourceTestSuiteBase<S
 
     // Defines ConnectorExternalSystem.
     @TestExternalSystem
-    PulsarTestEnvironment pulsar =
-            new 
PulsarTestEnvironment(container(flink.getFlinkContainers().getJobManager()));
+    PulsarContainerTestEnvironment pulsar = new 
PulsarContainerTestEnvironment(flink);
 
     // Defines a set of external context Factories for different test cases.
     @SuppressWarnings("unused")
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
index 1245e14..6fea0c9 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
@@ -27,12 +27,8 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
-
 /** We would consume from test splits by using {@link 
SubscriptionType#Exclusive} subscription. */
 public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext 
{
-    private static final long serialVersionUID = 1L;
 
     public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) {
         this(environment, Collections.emptyList());
@@ -57,14 +53,4 @@ public class ExclusiveSubscriptionContext extends 
MultipleTopicTemplateContext {
     protected SubscriptionType subscriptionType() {
         return SubscriptionType.Exclusive;
     }
-
-    @Override
-    protected String serviceUrl() {
-        return PULSAR_SERVICE_URL;
-    }
-
-    @Override
-    protected String adminUrl() {
-        return PULSAR_ADMIN_URL;
-    }
 }
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
index 8ec1685..c473488 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
@@ -27,12 +27,8 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
-
 /** We would consume from test splits by using {@link 
SubscriptionType#Failover} subscription. */
 public class FailoverSubscriptionContext extends MultipleTopicTemplateContext {
-    private static final long serialVersionUID = 1L;
 
     public FailoverSubscriptionContext(PulsarTestEnvironment environment) {
         this(environment, Collections.emptyList());
@@ -57,14 +53,4 @@ public class FailoverSubscriptionContext extends 
MultipleTopicTemplateContext {
     protected SubscriptionType subscriptionType() {
         return SubscriptionType.Failover;
     }
-
-    @Override
-    protected String serviceUrl() {
-        return PULSAR_SERVICE_URL;
-    }
-
-    @Override
-    protected String adminUrl() {
-        return PULSAR_ADMIN_URL;
-    }
 }
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
index 303783a..5ad369b 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
@@ -46,13 +46,10 @@ import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
 import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
 import static org.apache.pulsar.client.api.Schema.STRING;
 
 /** We would consume from test splits by using {@link 
SubscriptionType#Key_Shared} subscription. */
 public class KeySharedSubscriptionContext extends PulsarTestContext<String> {
-    private static final long serialVersionUID = 1L;
 
     private int index = 0;
 
@@ -92,8 +89,8 @@ public class KeySharedSubscriptionContext extends 
PulsarTestContext<String> {
         PulsarSourceBuilder<String> builder =
                 PulsarSource.builder()
                         .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(PULSAR_SERVICE_URL)
-                        .setAdminUrl(PULSAR_ADMIN_URL)
+                        .setServiceUrl(operator.serviceUrl())
+                        .setAdminUrl(operator.adminUrl())
                         .setTopicPattern(
                                 "pulsar-[0-9]+-key-shared", 
RegexSubscriptionMode.AllTopics)
                         .setSubscriptionType(SubscriptionType.Key_Shared)
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
index de53595..1a2db66 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
@@ -40,13 +40,10 @@ import java.util.Collections;
 import java.util.List;
 
 import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
-import static 
org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
 import static org.apache.pulsar.client.api.Schema.STRING;
 
 /** We would consuming from test splits by using {@link 
SubscriptionType#Shared} subscription. */
 public class SharedSubscriptionContext extends PulsarTestContext<String> {
-    private static final long serialVersionUID = 1L;
 
     private int index = 0;
 
@@ -71,8 +68,8 @@ public class SharedSubscriptionContext extends 
PulsarTestContext<String> {
         PulsarSourceBuilder<String> builder =
                 PulsarSource.builder()
                         .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(PULSAR_SERVICE_URL)
-                        .setAdminUrl(PULSAR_ADMIN_URL)
+                        .setServiceUrl(operator.serviceUrl())
+                        .setAdminUrl(operator.adminUrl())
                         .setTopicPattern("pulsar-[0-9]+-shared", 
RegexSubscriptionMode.AllTopics)
                         .setSubscriptionType(SubscriptionType.Shared)
                         .setSubscriptionName("pulsar-shared");
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java
new file mode 100644
index 0000000..654347b
--- /dev/null
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/PulsarContainerTestEnvironment.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.common;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+
+import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
+
+/** This test environment is used for creating a Pulsar standalone instance for e2e tests. */
+public class PulsarContainerTestEnvironment extends PulsarTestEnvironment {
+
+    public PulsarContainerTestEnvironment(FlinkContainerWithPulsarEnvironment 
flinkEnvironment) {
+        
super(container(flinkEnvironment.getFlinkContainers().getJobManager()));
+    }
+}

Reply via email to