This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b6c3af  [FLINK-24302] Extend offheap memory for JDK11 test coverage (#55)
3b6c3af is described below

commit 3b6c3aff8aeca3cc17673bbc84b90b70c1c680a9
Author: tison <wander4...@gmail.com>
AuthorDate: Thu Jul 20 18:23:14 2023 +0800

    [FLINK-24302] Extend offheap memory for JDK11 test coverage (#55)
    
    Signed-off-by: tison <wander4...@gmail.com>
    Co-authored-by: Yufan Sheng <syh...@gmail.com>
---
 .../docs/connectors/datastream/pulsar.md           |  18 +-
 docs/content/docs/connectors/datastream/pulsar.md  |  17 --
 flink-connector-pulsar-e2e-tests/pom.xml           |  10 --
 .../flink/tests/util/pulsar/PulsarSinkE2ECase.java |   3 -
 .../tests/util/pulsar/PulsarSourceE2ECase.java     |   3 -
 .../util/pulsar/common/FlinkContainerUtils.java    |   7 +-
 .../source/config/PulsarSourceConfigUtils.java     |  13 +-
 .../pulsar/common/MiniClusterTestEnvironment.java  | 184 +++++++++++++++++++++
 .../connector/pulsar/sink/PulsarSinkITCase.java    |   4 +-
 .../pulsar/source/PulsarSourceITCase.java          |   4 +-
 10 files changed, 202 insertions(+), 61 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index 28c993c..4625324 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -28,7 +28,7 @@ Flink 当前提供 [Apache Pulsar](https://pulsar.apache.org) Source 
和 Sink 
 
 ## 添加依赖
 
-当前支持 Pulsar 2.10.0 及其之后的版本,建议在总是将 Pulsar 升级至最新版。如果想要了解更多关于 Pulsar API 兼容性设计,可以阅读文档 [PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification)。
+当前支持 Pulsar 2.10.0 及其之后的版本,建议总是将 Pulsar 升级至最新版。如果想要了解更多对于 Pulsar API 兼容性设计,可以阅读文档 [PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification)。
 
 {{< connector_artifact flink-connector-pulsar pulsar >}}
 
@@ -1088,22 +1088,6 @@ PulsarSink<String> sink = PulsarSink.builder()
 
 用户遇到的问题可能与 Flink 无关,请先升级 Pulsar 的版本、Pulsar 客户端的版本,或者修改 Pulsar 的配置、Pulsar 连接器的配置来尝试解决问题。
 
-## 已知问题
-
-本节介绍有关 Pulsar 连接器的一些已知问题。
-
-### 在 Java 11 上使用不稳定
-
-Pulsar connector 在 Java 11 中有一些尚未修复的问题。我们当前推荐在 Java 8 环境中运行Pulsar connector.
-
-### 不自动重连,而是抛出TransactionCoordinatorNotFound异常
-
-Pulsar 事务机制仍在积极发展中,当前版本并不稳定。 Pulsar 2.9.2
-引入了这个问题 [a break change](https://github.com/apache/pulsar/pull/13135)。
-如果您使用 Pulsar 2.9.2或更高版本与较旧的 Pulsar 客户端一起使用,您可能会收到一个“TransactionCoordinatorNotFound”异常。
-
-您可以使用最新的`pulsar-client-all`分支来解决这个问题。
-
 {{< top >}}
 
 [schema-evolution]: https://pulsar.apache.org/docs/2.11.x/schema-evolution-compatibility/#schema-evolution
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index 98b701e..5f991fa 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -1276,23 +1276,6 @@ If you have a problem with Pulsar when using Flink, keep in mind that Flink only
 and your problem might be independent of Flink and sometimes can be solved by upgrading Pulsar brokers,
 reconfiguring Pulsar brokers or reconfiguring Pulsar connector in Flink.
 
-## Known Issues
-
-This section describes some known issues about the Pulsar connectors.
-
-### Unstable on Java 11
-
-Pulsar connector has some known issues on Java 11. It is recommended to run Pulsar connector
-on Java 8.
-
-### No TransactionCoordinatorNotFound, but automatic reconnect
-
-Pulsar transactions are still in active development and are not stable. Pulsar 2.9.2
-introduces [a break change](https://github.com/apache/pulsar/pull/13135) in transactions.
-If you use Pulsar 2.9.2 or higher with an older Pulsar client, you might get a `TransactionCoordinatorNotFound` exception.
-
-You can use the latest `pulsar-client-all` release to resolve this issue.
-
 {{< top >}}
 
 [schema-evolution]: https://pulsar.apache.org/docs/2.11.x/schema-evolution-compatibility/#schema-evolution
diff --git a/flink-connector-pulsar-e2e-tests/pom.xml b/flink-connector-pulsar-e2e-tests/pom.xml
index c88e268..b730be7 100644
--- a/flink-connector-pulsar-e2e-tests/pom.xml
+++ b/flink-connector-pulsar-e2e-tests/pom.xml
@@ -75,7 +75,6 @@ under the License.
                                                                        <includes>
                                                                                <include>**/*.*</include>
                                                                        </includes>
-                                                                       <excludedGroups>${excludeE2E}</excludedGroups>
                                                                        <systemPropertyVariables>
                                                                                <moduleDir>${project.basedir}</moduleDir>
                                                                        </systemPropertyVariables>
@@ -86,15 +85,6 @@ under the License.
                                </plugins>
                        </build>
                </profile>
-               <profile>
-                       <id>java11</id>
-                       <activation>
-                               <jdk>[11,)</jdk>
-                       </activation>
-                       <properties>
-                               <excludeE2E>org.apache.flink.testutils.junit.FailsOnJava11</excludeE2E>
-                       </properties>
-               </profile>
        </profiles>
 
        <build>
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
index 7446760..ef63b84 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSinkE2ECase.java
@@ -31,14 +31,11 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.tests.util.pulsar.common.FlinkContainerUtils;
 import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
 
-import org.junit.jupiter.api.Tag;
-
 import static org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
 import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
 
 /** Pulsar sink E2E test based on the connector testing framework. */
 @SuppressWarnings("unused")
-@Tag("org.apache.flink.testutils.junit.FailsOnJava11")
 public class PulsarSinkE2ECase extends SinkTestSuiteBase<String> {
 
     // Defines the Semantic.
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java
index 4f4cadc..ee362b8 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceE2ECase.java
@@ -31,13 +31,10 @@ import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.tests.util.pulsar.common.FlinkContainerUtils;
 import org.apache.flink.tests.util.pulsar.common.PulsarContainerTestEnvironment;
 
-import org.junit.jupiter.api.Tag;
-
 import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
 
 /** Pulsar source E2E test based on the connector testing framework. */
 @SuppressWarnings("unused")
-@Tag("org.apache.flink.testutils.junit.FailsOnJava11")
 public class PulsarSourceE2ECase extends SourceTestSuiteBase<String> {
 
     // Defines the Semantic.
diff --git a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java
index 43e951f..491c0ae 100644
--- a/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java
+++ b/flink-connector-pulsar-e2e-tests/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerUtils.java
@@ -35,8 +35,11 @@ public class FlinkContainerUtils {
     public static Configuration flinkConfiguration() {
         Configuration configuration = new Configuration();
 
-        // Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError: Metaspace
-        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2048));
+        // Increase the jvm metaspace memory to avoid java.lang.OutOfMemoryError in
+        // - JobManager  Metaspace
+        // - TaskManager Direct Memory
+        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2560));
+        configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(512));
         configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(512));
         configuration.set(JobManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(2560));
         configuration.set(JobManagerOptions.JVM_METASPACE, MemorySize.ofMebiBytes(1024));
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
index 04ff85c..5f379d3 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/PulsarSourceConfigUtils.java
@@ -39,6 +39,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES;
 import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ACKNOWLEDGEMENTS_GROUP_TIME_MICROS;
 import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED;
@@ -131,9 +132,15 @@ public final class PulsarSourceConfigUtils {
                 PULSAR_EXPIRE_TIME_OF_INCOMPLETE_CHUNKED_MESSAGE_MILLIS,
                 v -> builder.expireTimeOfIncompleteChunkedMessage(v, MILLISECONDS));
         configuration.useOption(PULSAR_POOL_MESSAGES, builder::poolMessages);
-        configuration.useOption(
-                PULSAR_AUTO_SCALED_RECEIVER_QUEUE_SIZE_ENABLED,
-                builder::autoScaledReceiverQueueSizeEnabled);
+
+        if (configuration.contains(PULSAR_MEMORY_LIMIT_BYTES)) {
+            // Force to scale the receiver queue size if the memory limit has been configured.
+            builder.autoScaledReceiverQueueSizeEnabled(true);
+        } else {
+            configuration.useOption(
+                    PULSAR_AUTO_SCALED_RECEIVER_QUEUE_SIZE_ENABLED,
+                    builder::autoScaledReceiverQueueSizeEnabled);
+        }
 
         Map<String, String> properties = configuration.getProperties(PULSAR_CONSUMER_PROPERTIES);
         if (!properties.isEmpty()) {
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java
new file mode 100644
index 0000000..0c65336
--- /dev/null
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.ClusterControllable;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.configuration.MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL;
+import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.METRIC_FETCHER_UPDATE_INTERVAL_MS;
+import static org.apache.flink.runtime.jobgraph.SavepointConfigOptions.SAVEPOINT_PATH;
+
+/** Test environment for running jobs on Flink mini-cluster. */
+@Experimental
+public class MiniClusterTestEnvironment implements TestEnvironment, ClusterControllable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MiniClusterTestEnvironment.class);
+
+    private final MiniClusterWithClientResource miniCluster;
+    private final Path checkpointPath;
+
+    // The index of current running TaskManager
+    private int latestTMIndex = 0;
+    private boolean isStarted = false;
+
+    public MiniClusterTestEnvironment() {
+        Configuration conf = new Configuration();
+        conf.set(METRIC_FETCHER_UPDATE_INTERVAL, METRIC_FETCHER_UPDATE_INTERVAL_MS);
+        TaskExecutorResourceUtils.adjustForLocalExecution(conf);
+        this.miniCluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(conf)
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(6)
+                                .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
+                                .withHaLeadershipControl()
+                                .build());
+        try {
+            this.checkpointPath = Files.createTempDirectory("minicluster-environment-checkpoint-");
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create temporary checkpoint directory", e);
+        }
+    }
+
+    @Override
+    public StreamExecutionEnvironment createExecutionEnvironment(
+            TestEnvironmentSettings envOptions) {
+        Configuration configuration = new Configuration();
+        if (envOptions.getSavepointRestorePath() != null) {
+            configuration.setString(SAVEPOINT_PATH, envOptions.getSavepointRestorePath());
+        }
+        return new TestStreamEnvironment(
+                this.miniCluster.getMiniCluster(),
+                configuration,
+                this.miniCluster.getNumberSlots(),
+                envOptions.getConnectorJarPaths().stream()
+                        .map(url -> new org.apache.flink.core.fs.Path(url.getPath()))
+                        .collect(Collectors.toList()),
+                Collections.emptyList());
+    }
+
+    @Override
+    public Endpoint getRestEndpoint() {
+        try {
+            final URI restAddress = this.miniCluster.getMiniCluster().getRestAddress().get();
+            return new Endpoint(restAddress.getHost(), restAddress.getPort());
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to get REST endpoint of MiniCluster", e);
+        }
+    }
+
+    @Override
+    public String getCheckpointUri() {
+        return checkpointPath.toUri().toString();
+    }
+
+    @Override
+    public void triggerJobManagerFailover(JobClient jobClient, Runnable afterFailAction)
+            throws ExecutionException, InterruptedException {
+        final Optional<HaLeadershipControl> controlOptional =
+                miniCluster.getMiniCluster().getHaLeadershipControl();
+        if (!controlOptional.isPresent()) {
+            throw new UnsupportedOperationException(
+                    "This MiniCluster does not support JobManager HA");
+        }
+        final HaLeadershipControl haLeadershipControl = controlOptional.get();
+        haLeadershipControl.revokeJobMasterLeadership(jobClient.getJobID()).get();
+        afterFailAction.run();
+        haLeadershipControl.grantJobMasterLeadership(jobClient.getJobID()).get();
+    }
+
+    @Override
+    public void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction)
+            throws Exception {
+        terminateTaskManager();
+        CommonTestUtils.waitForNoTaskRunning(
+                () -> miniCluster.getRestClusterClient().getJobDetails(jobClient.getJobID()).get());
+        afterFailAction.run();
+        startTaskManager();
+    }
+
+    @Override
+    public void isolateNetwork(JobClient jobClient, Runnable afterFailAction) {
+        throw new UnsupportedOperationException("Cannot isolate network in a MiniCluster");
+    }
+
+    @Override
+    public void startUp() throws Exception {
+        if (isStarted) {
+            return;
+        }
+        this.miniCluster.before();
+        LOG.debug("MiniCluster is running");
+        isStarted = true;
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (!isStarted) {
+            return;
+        }
+        isStarted = false;
+        this.miniCluster.after();
+        FileUtils.deleteDirectory(checkpointPath.toFile());
+        LOG.debug("MiniCluster has been tear down");
+    }
+
+    private void terminateTaskManager() throws Exception {
+        miniCluster.getMiniCluster().terminateTaskManager(latestTMIndex).get();
+        LOG.debug("TaskManager {} has been terminated.", latestTMIndex);
+    }
+
+    private void startTaskManager() throws Exception {
+        miniCluster.getMiniCluster().startTaskManager();
+        latestTMIndex++;
+        LOG.debug("New TaskManager {} has been launched.", latestTMIndex);
+    }
+
+    @Override
+    public String toString() {
+        return "MiniCluster";
+    }
+}
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
index 6ce020e..6998929 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/PulsarSinkITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.connector.pulsar.sink;
 
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.pulsar.common.MiniClusterTestEnvironment;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase;
@@ -29,7 +30,6 @@ import org.apache.flink.connector.pulsar.testutils.sink.cases.AutoCreateTopicPro
 import org.apache.flink.connector.pulsar.testutils.sink.cases.EncryptedMessageProducingContext;
 import org.apache.flink.connector.pulsar.testutils.sink.cases.MultipleTopicsProducingContext;
 import org.apache.flink.connector.pulsar.testutils.sink.cases.SingleTopicProducingContext;
-import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -43,7 +43,6 @@ import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.testutils.junit.SharedObjectsExtension;
 
 import org.junit.jupiter.api.Nested;
-import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -59,7 +58,6 @@ import static org.apache.pulsar.client.api.Schema.STRING;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for using PulsarSink writing to a Pulsar cluster. */
-@Tag("org.apache.flink.testutils.junit.FailsOnJava11")
 class PulsarSinkITCase {
 
     /** Integration test based on the connector testing framework. */
diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 268e2cb..c8a6b33 100644
--- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.pulsar.source;
 
+import org.apache.flink.connector.pulsar.common.MiniClusterTestEnvironment;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
@@ -25,7 +26,6 @@ import org.apache.flink.connector.pulsar.testutils.source.cases.EncryptedMessage
 import org.apache.flink.connector.pulsar.testutils.source.cases.MultipleTopicsConsumingContext;
 import org.apache.flink.connector.pulsar.testutils.source.cases.PartialKeysConsumingContext;
 import org.apache.flink.connector.pulsar.testutils.source.cases.SingleTopicConsumingContext;
-import org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
@@ -34,7 +34,6 @@ import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
 import org.apache.flink.streaming.api.CheckpointingMode;
 
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.junit.jupiter.api.Tag;
 
 import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
 
@@ -42,7 +41,6 @@ import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
  * Unit test class for {@link PulsarSource}. Used for {@link SubscriptionType#Exclusive}
  * subscription.
  */
-@Tag("org.apache.flink.testutils.junit.FailsOnJava11")
 class PulsarSourceITCase extends SourceTestSuiteBase<String> {
 
     // Defines test environment on Flink MiniCluster
