This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch v3.0
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git

commit bc467f322463e4fb811538a3651f9ba176172649
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Wed Sep 28 15:28:14 2022 +0800

    [FLINK-26182][Connector/pulsar] Create e2e tests for the Pulsar source and 
sink based on the connector testing framework.
---
 ...norderedE2ECase.java => PulsarSinkE2ECase.java} | 31 +++-----
 .../util/pulsar/PulsarSourceUnorderedE2ECase.java  |  4 +-
 .../FlinkContainerWithPulsarEnvironment.java       |  5 +-
 .../common/KeyedPulsarPartitionDataWriter.java     | 64 ----------------
 .../source/KeySharedSubscriptionContext.java       | 87 ----------------------
 .../pulsar/source/SharedSubscriptionContext.java   | 44 -----------
 6 files changed, 15 insertions(+), 220 deletions(-)

diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
similarity index 64%
copy from 
flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
copy to 
flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
index 15333a1..a19f593 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
@@ -19,7 +19,8 @@
 package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
-import 
org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
+import org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestContext;
+import 
org.apache.flink.connector.pulsar.testutils.sink.PulsarSinkTestSuiteBase;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -27,38 +28,30 @@ import 
org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import 
org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
 import 
org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-import org.apache.flink.tests.util.pulsar.source.KeySharedSubscriptionContext;
-import org.apache.flink.tests.util.pulsar.source.SharedSubscriptionContext;
 import org.apache.flink.testutils.junit.FailsOnJava11;
 
 import org.junit.experimental.categories.Category;
 
-/**
- * Pulsar E2E test based on connector testing framework. It's used for Shared 
& Key_Shared
- * subscription.
- */
+/** Pulsar sink E2E test based on connector testing framework. */
 @SuppressWarnings("unused")
 @Category(value = {FailsOnJava11.class})
-public class PulsarSourceUnorderedE2ECase extends 
UnorderedSourceTestSuiteBase<String> {
+public class PulsarSinkE2ECase extends PulsarSinkTestSuiteBase {
 
-    // Defines the Semantic.
     @TestSemantics
-    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+    CheckpointingMode[] semantics =
+            new CheckpointingMode[] {
+                CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
+            };
 
-    // Defines TestEnvironment.
+    // Defines TestEnvironment
     @TestEnv
-    FlinkContainerWithPulsarEnvironment flink = new 
FlinkContainerWithPulsarEnvironment(1, 8);
+    FlinkContainerWithPulsarEnvironment flink = new 
FlinkContainerWithPulsarEnvironment(1, 6);
 
     // Defines ConnectorExternalSystem.
     @TestExternalSystem
     PulsarContainerTestEnvironment pulsar = new 
PulsarContainerTestEnvironment(flink);
 
-    // Defines a set of external context Factories for different test cases.
-    @TestContext
-    PulsarTestContextFactory<String, SharedSubscriptionContext> shared =
-            new PulsarTestContextFactory<>(pulsar, 
SharedSubscriptionContext::new);
-
     @TestContext
-    PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared =
-            new PulsarTestContextFactory<>(pulsar, 
KeySharedSubscriptionContext::new);
+    PulsarTestContextFactory<String, PulsarSinkTestContext> sinkContext =
+            new PulsarTestContextFactory<>(pulsar, PulsarSinkTestContext::new);
 }
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
index 15333a1..89692d1 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -20,6 +20,8 @@ package org.apache.flink.tests.util.pulsar;
 
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
 import 
org.apache.flink.connector.pulsar.testutils.source.UnorderedSourceTestSuiteBase;
+import 
org.apache.flink.connector.pulsar.testutils.source.cases.KeySharedSubscriptionContext;
+import 
org.apache.flink.connector.pulsar.testutils.source.cases.SharedSubscriptionContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -27,8 +29,6 @@ import 
org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import 
org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
 import 
org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
-import org.apache.flink.tests.util.pulsar.source.KeySharedSubscriptionContext;
-import org.apache.flink.tests.util.pulsar.source.SharedSubscriptionContext;
 import org.apache.flink.testutils.junit.FailsOnJava11;
 
 import org.junit.experimental.categories.Category;
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
index f5e862f..65e99a8 100644
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
+++ 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
@@ -51,11 +51,8 @@ public class FlinkContainerWithPulsarEnvironment extends 
FlinkContainerTestEnvir
         return 
ResourceTestUtils.getResource(jarName).toAbsolutePath().toString();
     }
 
-    protected static Configuration flinkConfiguration() {
+    private static Configuration flinkConfiguration() {
         Configuration configuration = new Configuration();
-        // Increase the off heap memory of TaskManager to avoid direct buffer 
memory error in Pulsar
-        // e2e tests.
-        configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, 
MemorySize.ofMebiBytes(100));
 
         // Increase the jvm metaspace memory to avoid 
java.lang.OutOfMemoryError: Metaspace
         configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, 
MemorySize.ofMebiBytes(2048));
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
deleted file mode 100644
index d5f6e11..0000000
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar.common;
-
-import 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import 
org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-
-import org.apache.pulsar.client.api.Schema;
-
-import java.util.List;
-
-import static java.util.stream.Collectors.toList;
-
-/**
- * Source split data writer for writing test data into a Pulsar topic 
partition. It will write the
- * message with two keys.
- */
-public class KeyedPulsarPartitionDataWriter implements 
ExternalSystemSplitDataWriter<String> {
-
-    private final PulsarRuntimeOperator operator;
-    private final String fullTopicName;
-    private final String keyToRead;
-    private final String keyToExclude;
-
-    public KeyedPulsarPartitionDataWriter(
-            PulsarRuntimeOperator operator,
-            String fullTopicName,
-            String keyToRead,
-            String keyToExclude) {
-        this.operator = operator;
-        this.fullTopicName = fullTopicName;
-        this.keyToRead = keyToRead;
-        this.keyToExclude = keyToExclude;
-    }
-
-    @Override
-    public void writeRecords(List<String> records) {
-        List<String> newRecords = records.stream().map(a -> a + 
keyToRead).collect(toList());
-
-        operator.sendMessages(fullTopicName, Schema.STRING, keyToExclude, 
newRecords);
-        operator.sendMessages(fullTopicName, Schema.STRING, keyToRead, 
records);
-    }
-
-    @Override
-    public void close() {
-        // Nothing to do.
-    }
-}
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java
deleted file mode 100644
index 0cae6e5..0000000
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/KeySharedSubscriptionContext.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar.source;
-
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
-import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import 
org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
-import 
org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
-import 
org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
-import 
org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
-
-import static java.util.Collections.singletonList;
-import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
-import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
-import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared;
-
-/** We would consume from test splits by using {@link 
SubscriptionType#Key_Shared} subscription. */
-public class KeySharedSubscriptionContext extends 
MultipleTopicConsumingContext {
-
-    private final String keyToRead;
-    private final String keyToExclude;
-
-    public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
-        super(environment, Key_Shared);
-
-        this.keyToRead = randomAlphabetic(8);
-
-        // Make sure they have different hash code.
-        int readHash = keyHash(keyToRead);
-        String randomKey;
-        do {
-            randomKey = randomAlphabetic(8);
-        } while (keyHash(randomKey) == readHash);
-        this.keyToExclude = randomKey;
-    }
-
-    @Override
-    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
-            TestingSourceSettings sourceSettings) {
-        String partitionName = generatePartitionName();
-        return new KeyedPulsarPartitionDataWriter(operator, partitionName, 
keyToRead, keyToExclude);
-    }
-
-    @Override
-    protected String displayName() {
-        return "consume message by Key_Shared";
-    }
-
-    @Override
-    protected void setSourceBuilder(PulsarSourceBuilder<String> builder) {
-        int keyHash = keyHash(keyToRead);
-        TopicRange range = new TopicRange(keyHash, keyHash);
-
-        builder.setRangeGenerator(new 
FixedRangeGenerator(singletonList(range)));
-    }
-
-    @Override
-    protected String subscriptionName() {
-        return "pulsar-key-shared-subscription";
-    }
-
-    // This method is copied from Pulsar for calculating message key hash.
-    private int keyHash(String key) {
-        return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % 
RANGE_SIZE;
-    }
-}
diff --git 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
 
b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
deleted file mode 100644
index fe9f078..0000000
--- 
a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/source/SharedSubscriptionContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tests.util.pulsar.source;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import 
org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicConsumingContext;
-
-import org.apache.pulsar.client.api.SubscriptionType;
-
-import static org.apache.pulsar.client.api.SubscriptionType.Shared;
-
-/** We would consume from test splits by using {@link SubscriptionType#Shared} 
subscription. */
-public class SharedSubscriptionContext extends MultipleTopicConsumingContext {
-
-    public SharedSubscriptionContext(PulsarTestEnvironment environment) {
-        super(environment, Shared);
-    }
-
-    @Override
-    protected String displayName() {
-        return "consume message by Shared";
-    }
-
-    @Override
-    protected String subscriptionName() {
-        return "pulsar-shared-subscription";
-    }
-}

Reply via email to