This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git


The following commit(s) were added to refs/heads/main by this push:
     new 08c8e2e  [FLINK-38545] Update to Flink 1.20.3 and Pulsar 3.0.5
08c8e2e is described below

commit 08c8e2ea60e8ef01ff9a81a0b2bf8c1a132f5db4
Author: Ferenc Csaky <[email protected]>
AuthorDate: Mon Oct 27 18:04:10 2025 +0100

    [FLINK-38545] Update to Flink 1.20.3 and Pulsar 3.0.5
---
 .../pulsar/source/PulsarSourceOptions.java         | 22 +++++++--------
 .../pulsar/source/config/SourceConfiguration.java  | 11 ++++----
 .../pulsar/common/MiniClusterTestEnvironment.java  |  5 +++-
 .../pulsar/sink/writer/PulsarWriterTest.java       | 17 ++++++++++--
 .../source/enumerator/cursor/StopCursorTest.java   |  6 ++--
 .../reader/PulsarPartitionSplitReaderTest.java     | 32 ++++++++++++++++++++--
 .../source/reader/PulsarSourceReaderTest.java      |  4 +--
 .../PulsarDeserializationSchemaTest.java           |  3 +-
 .../pulsar/table/PulsarChangelogTableITCase.java   | 13 +++++++--
 .../table/testutils/PulsarTableTestUtils.java      |  2 +-
 .../runtime/container/PulsarContainerRuntime.java  |  2 +-
 .../testutils/source/PulsarSourceTestContext.java  |  6 ++--
 .../src/main/resources/META-INF/NOTICE             |  6 ++--
 pom.xml                                            | 27 +++++++++---------
 14 files changed, 105 insertions(+), 51 deletions(-)

diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index 3346fed..72b2026 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -103,26 +103,26 @@ public final class PulsarSourceOptions {
                                             "The source would use pulsar 
client's internal mechanism and commit cursor in a given interval.")
                                     .build());
 
-    public static final ConfigOption<Long> PULSAR_AUTO_COMMIT_CURSOR_INTERVAL =
+    public static final ConfigOption<Duration> 
PULSAR_AUTO_COMMIT_CURSOR_INTERVAL =
             ConfigOptions.key(SOURCE_CONFIG_PREFIX + 
"autoCommitCursorInterval")
-                    .longType()
-                    .defaultValue(Duration.ofSeconds(5).toMillis())
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(5))
                     .withDescription(
                             Description.builder()
                                     .text(
                                             "This option is used only when the 
user disables the checkpoint and uses Exclusive or Failover subscription.")
                                     .text(
-                                            " We would automatically commit 
the cursor using the given period (in ms).")
+                                            " We would automatically commit 
the cursor using the given duration.")
                                     .build());
 
-    public static final ConfigOption<Integer> PULSAR_FETCH_ONE_MESSAGE_TIME =
+    public static final ConfigOption<Duration> PULSAR_FETCH_ONE_MESSAGE_TIME =
             ConfigOptions.key(SOURCE_CONFIG_PREFIX + "fetchOneMessageTime")
-                    .intType()
+                    .durationType()
                     .noDefaultValue()
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The time (in ms) for fetching one 
message from Pulsar. If time exceed and no message returned from Pulsar.")
+                                            "The time to wait for fetching one 
message from Pulsar. If time exceeded and no message returned from Pulsar.")
                                     .text(
                                             " We would consider there is no 
record at the current topic partition and stop fetching until next switch.")
                                     .linebreak()
@@ -134,13 +134,13 @@ public final class PulsarSourceOptions {
                                             " Add this option in source 
builder avoiding waiting too long.")
                                     .build());
 
-    public static final ConfigOption<Long> PULSAR_MAX_FETCH_TIME =
+    public static final ConfigOption<Duration> PULSAR_MAX_FETCH_TIME =
             ConfigOptions.key(SOURCE_CONFIG_PREFIX + "maxFetchTime")
-                    .longType()
-                    .defaultValue(Duration.ofSeconds(10).toMillis())
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
                     .withDescription(
                             Description.builder()
-                                    .text("The maximum time (in ms) to wait 
when fetching records.")
+                                    .text("The maximum time to wait when 
fetching records.")
                                     .text(" A longer time increases throughput 
but also latency.")
                                     .text(
                                             " A fetch batch might be finished 
earlier because of %s.",
diff --git 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
index b10a2bb..fbeeb76 100644
--- 
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
+++ 
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
@@ -60,7 +60,7 @@ public class SourceConfiguration extends PulsarConfiguration {
     private final long partitionDiscoveryIntervalMs;
     private final boolean enableAutoAcknowledgeMessage;
     private final long autoCommitCursorInterval;
-    private final int fetchOneMessageTime;
+    private final long fetchOneMessageTime;
     private final Duration maxFetchTime;
     private final int maxFetchRecords;
     private final CursorVerification verifyInitialOffsets;
@@ -78,9 +78,10 @@ public class SourceConfiguration extends PulsarConfiguration 
{
         this.messageQueueCapacity = get(ELEMENT_QUEUE_CAPACITY);
         this.partitionDiscoveryIntervalMs = 
get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS);
         this.enableAutoAcknowledgeMessage = 
get(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE);
-        this.autoCommitCursorInterval = 
get(PULSAR_AUTO_COMMIT_CURSOR_INTERVAL);
-        this.fetchOneMessageTime = 
getOptional(PULSAR_FETCH_ONE_MESSAGE_TIME).orElse(0);
-        this.maxFetchTime = get(PULSAR_MAX_FETCH_TIME, Duration::ofMillis);
+        this.autoCommitCursorInterval = 
get(PULSAR_AUTO_COMMIT_CURSOR_INTERVAL).toMillis();
+        this.fetchOneMessageTime =
+                
getOptional(PULSAR_FETCH_ONE_MESSAGE_TIME).map(Duration::toMillis).orElse(0L);
+        this.maxFetchTime = get(PULSAR_MAX_FETCH_TIME);
         this.maxFetchRecords = get(PULSAR_MAX_FETCH_RECORDS);
         this.verifyInitialOffsets = get(PULSAR_VERIFY_INITIAL_OFFSETS);
         this.subscriptionName = get(PULSAR_SUBSCRIPTION_NAME);
@@ -136,7 +137,7 @@ public class SourceConfiguration extends 
PulsarConfiguration {
      * messages in {@link RecordsWithSplitIds} when meet this timeout and no 
message consumed.
      */
     public int getFetchOneMessageTime() {
-        return fetchOneMessageTime;
+        return (int) fetchOneMessageTime;
     }
 
     /**
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java
index 0c65336..83a7f36 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java
@@ -41,6 +41,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -65,7 +66,9 @@ public class MiniClusterTestEnvironment implements 
TestEnvironment, ClusterContr
 
     public MiniClusterTestEnvironment() {
         Configuration conf = new Configuration();
-        conf.set(METRIC_FETCHER_UPDATE_INTERVAL, 
METRIC_FETCHER_UPDATE_INTERVAL_MS);
+        conf.set(
+                METRIC_FETCHER_UPDATE_INTERVAL,
+                Duration.ofMillis(METRIC_FETCHER_UPDATE_INTERVAL_MS));
         TaskExecutorResourceUtils.adjustForLocalExecution(conf);
         this.miniCluster =
                 new MiniClusterWithClientResource(
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
index 01914cc..d24f230 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
@@ -19,6 +19,8 @@
 package org.apache.flink.connector.pulsar.sink.writer;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.operators.ProcessingTimeService;
 import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -45,7 +47,7 @@ import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.metrics.testutils.MetricListener;
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.util.UserCodeClassLoader;
@@ -173,7 +175,8 @@ class PulsarWriterTest extends PulsarTestSuiteBase {
                     
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()
                             .getIOMetricGroup();
             MetricGroup metricGroup = metricListener.getMetricGroup();
-            this.metricGroup = InternalSinkWriterMetricGroup.mock(metricGroup, 
ioMetricGroup);
+            this.metricGroup =
+                    MetricsGroupTestUtils.mockWriterMetricGroup(metricGroup, 
ioMetricGroup);
             this.timeService = new TestProcessingTimeService();
         }
 
@@ -251,6 +254,16 @@ class PulsarWriterTest extends PulsarTestSuiteBase {
                 }
             };
         }
+
+        @Override
+        public JobInfo getJobInfo() {
+            return null;
+        }
+
+        @Override
+        public TaskInfo getTaskInfo() {
+            return null;
+        }
     }
 
     private static class MockSinkWriterContext implements SinkWriter.Context {
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
index fc49073..2536753 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -33,6 +33,8 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
+
 import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
@@ -95,8 +97,8 @@ class StopCursorTest extends PulsarTestSuiteBase {
     private SourceConfiguration sourceConfig() {
         Configuration config = operator().config();
         config.set(PULSAR_MAX_FETCH_RECORDS, 1);
-        config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
-        config.set(PULSAR_MAX_FETCH_TIME, 3000L);
+        config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, Duration.ofSeconds(2));
+        config.set(PULSAR_MAX_FETCH_TIME, Duration.ofSeconds(3));
         config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
         config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
         return new SourceConfiguration(config);
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
index b2ef7d9..4b439a0 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
@@ -107,6 +108,7 @@ class PulsarPartitionSplitReaderTest extends 
PulsarTestSuiteBase {
         String topicName = randomAlphabetic(10);
 
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        waitForTopicMetadataReady(topicName, 0);
         seekStartPositionAndHandleSplit(splitReader, topicName, 0);
         fetchedMessages(splitReader, 0, true);
     }
@@ -118,6 +120,7 @@ class PulsarPartitionSplitReaderTest extends 
PulsarTestSuiteBase {
         String topicName = randomAlphabetic(10);
 
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        waitForTopicMetadataReady(topicName, 0);
         seekStartPositionAndHandleSplit(splitReader, topicName, 0, 
MessageId.earliest);
         fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
     }
@@ -128,6 +131,7 @@ class PulsarPartitionSplitReaderTest extends 
PulsarTestSuiteBase {
         String topicName = randomAlphabetic(10);
 
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        waitForTopicMetadataReady(topicName, 0);
         seekStartPositionAndHandleSplit(splitReader, topicName, 0, 
MessageId.latest);
         fetchedMessages(splitReader, 0, true);
     }
@@ -139,6 +143,7 @@ class PulsarPartitionSplitReaderTest extends 
PulsarTestSuiteBase {
         String topicName = randomAlphabetic(10);
 
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        waitForTopicMetadataReady(topicName, 0);
         MessageIdImpl lastMessageId =
                 (MessageIdImpl)
                         operator()
@@ -214,6 +219,7 @@ class PulsarPartitionSplitReaderTest extends 
PulsarTestSuiteBase {
         String topicName = randomAlphabetic(10);
 
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        waitForTopicMetadataReady(topicName, 0);
         handleSplit(splitReader, topicName, 0);
         fetchedMessages(splitReader, 0, true);
     }
@@ -225,6 +231,7 @@ class PulsarPartitionSplitReaderTest extends 
PulsarTestSuiteBase {
         String topicName = randomAlphabetic(10);
 
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        waitForTopicMetadataReady(topicName, 0);
         handleSplit(splitReader, topicName, 0, MessageId.latest);
         fetchedMessages(splitReader, 0, true);
     }
@@ -236,6 +243,7 @@ class PulsarPartitionSplitReaderTest extends 
PulsarTestSuiteBase {
         String topicName = randomAlphabetic(10);
 
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+        waitForTopicMetadataReady(topicName, 0);
         handleSplit(splitReader, topicName, 0, MessageId.earliest);
         fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
     }
@@ -247,6 +255,8 @@ class PulsarPartitionSplitReaderTest extends 
PulsarTestSuiteBase {
         String topicName = randomAlphabetic(10);
 
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), 
20, false);
+        waitForTopicMetadataReady(topicName, 0);
+
         MessageIdImpl lastMessageId =
                 (MessageIdImpl)
                         operator()
@@ -272,6 +282,7 @@ class PulsarPartitionSplitReaderTest extends 
PulsarTestSuiteBase {
 
         int numRecords = 20;
         operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), 
numRecords, true);
+        waitForTopicMetadataReady(topicName, 0);
         MessageIdImpl lastMessageId =
                 (MessageIdImpl)
                         operator()
@@ -310,8 +321,8 @@ class PulsarPartitionSplitReaderTest extends 
PulsarTestSuiteBase {
     private SourceConfiguration sourceConfig() {
         Configuration config = operator().config();
         config.set(PULSAR_MAX_FETCH_RECORDS, 1);
-        config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
-        config.set(PULSAR_MAX_FETCH_TIME, 3000L);
+        config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, Duration.ofSeconds(2));
+        config.set(PULSAR_MAX_FETCH_TIME, Duration.ofSeconds(3));
         config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
         config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
 
@@ -402,4 +413,21 @@ class PulsarPartitionSplitReaderTest extends 
PulsarTestSuiteBase {
 
         return messages;
     }
+
+    // Wait for topic metadata to stabilize after setupTopic
+    private void waitForTopicMetadataReady(String topicName, int partitionId) 
throws Exception {
+        String partitionTopicName = topicNameWithPartition(topicName, 
partitionId);
+        waitUtil(
+                () -> {
+                    try {
+                        MessageId id =
+                                
operator().admin().topics().getLastMessageId(partitionTopicName);
+                        return id != null;
+                    } catch (Exception e) {
+                        return false;
+                    }
+                },
+                ofSeconds(30),
+                "Topic metadata not ready for " + partitionTopicName);
+    }
 }
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
index 54ed4b2..3d4619d 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
@@ -228,8 +228,8 @@ class PulsarSourceReaderTest extends PulsarTestSuiteBase {
         Configuration configuration = operator().config();
 
         configuration.set(PULSAR_MAX_FETCH_RECORDS, 1);
-        configuration.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
-        configuration.set(PULSAR_MAX_FETCH_TIME, 3000L);
+        configuration.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 
Duration.ofSeconds(2));
+        configuration.set(PULSAR_MAX_FETCH_TIME, Duration.ofSeconds(3));
         configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
 
         PulsarDeserializationSchema<Integer> deserializationSchema =
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
index de0a729..ecdd820 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.pulsar.source.reader.deserializer;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -103,7 +104,7 @@ class PulsarDeserializationSchemaTest extends 
PulsarTestSuiteBase {
     @Test
     void createFromFlinkTypeInformation() throws Exception {
         PulsarDeserializationSchema<String> schema =
-                new PulsarTypeInformationWrapper<>(Types.STRING, null);
+                new PulsarTypeInformationWrapper<>(Types.STRING, new 
ExecutionConfig());
         schema.open(new PulsarTestingDeserializationContext(), sourceConfig);
         assertThatCode(() -> 
InstantiationUtil.clone(schema)).doesNotThrowAnyException();
 
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
index c448a08..f95cb8f 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory;
 import org.apache.flink.formats.json.maxwell.MaxwellJsonFormatFactory;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.AggregatePhaseStrategy;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -53,7 +54,9 @@ public class PulsarChangelogTableITCase extends 
PulsarTableTestBase {
         tableConf.set(
                 ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(1));
         tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
-        
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, 
"TWO_PHASE");
+        tableConf.set(
+                OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
+                AggregatePhaseStrategy.TWO_PHASE);
 
         // ---------- Write the Debezium json into Pulsar -------------------
         List<String> lines = readLines("debezium-data-schema-exclude.txt");
@@ -182,7 +185,9 @@ public class PulsarChangelogTableITCase extends 
PulsarTableTestBase {
         tableConf.set(
                 ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(1));
         tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
-        
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, 
"TWO_PHASE");
+        tableConf.set(
+                OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
+                AggregatePhaseStrategy.TWO_PHASE);
 
         // ---------- Write the Canal json into Pulsar -------------------
         List<String> lines = readLines("canal-data.txt");
@@ -323,7 +328,9 @@ public class PulsarChangelogTableITCase extends 
PulsarTableTestBase {
         tableConf.set(
                 ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, 
Duration.ofSeconds(1));
         tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
-        
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, 
"TWO_PHASE");
+        tableConf.set(
+                OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
+                AggregatePhaseStrategy.TWO_PHASE);
 
         // ---------- Write the Maxwell json into Pulsar -------------------
         List<String> lines = readLines("maxwell-data.txt");
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java
index fe29b4b..8c89fa3 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java
@@ -97,7 +97,7 @@ public class PulsarTableTestUtils {
         Collections.sort(expected);
         CommonTestUtils.waitUtil(
                 () -> {
-                    List<String> actual = 
TestValuesTableFactory.getResults(sinkName);
+                    List<String> actual = 
TestValuesTableFactory.getResultsAsStrings(sinkName);
                     Collections.sort(actual);
                     return expected.equals(actual);
                 },
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
index 097927e..9bd2252 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
@@ -54,7 +54,7 @@ public class PulsarContainerRuntime implements PulsarRuntime {
     private static final String PULSAR_ADMIN_URL =
             String.format("http://%s:%d", PULSAR_INTERNAL_HOSTNAME, 
BROKER_HTTP_PORT);
 
-    private static final String CURRENT_VERSION = "3.0.0";
+    private static final String CURRENT_VERSION = "3.0.5";
 
     private final PulsarContainer container;
     private final AtomicBoolean started;
diff --git 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
index aece284..76d058d 100644
--- 
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
+++ 
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
@@ -50,7 +50,7 @@ import static 
org.apache.pulsar.client.api.RegexSubscriptionMode.AllTopics;
 public abstract class PulsarSourceTestContext extends PulsarTestContext<String>
         implements DataStreamSourceExternalContext<String> {
 
-    private static final long DISCOVERY_INTERVAL = 1000L;
+    private static final long DISCOVERY_INTERVAL_MS = 1000L;
     private static final int BATCH_DATA_SIZE = 300;
 
     protected PulsarSourceTestContext(PulsarTestEnvironment environment) {
@@ -65,7 +65,7 @@ public abstract class PulsarSourceTestContext extends 
PulsarTestContext<String>
                         .setServiceUrl(operator.serviceUrl())
                         .setTopicPattern(topicPattern(), AllTopics)
                         .setSubscriptionName(subscriptionName())
-                        .setConfig(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 
DISCOVERY_INTERVAL);
+                        .setConfig(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 
DISCOVERY_INTERVAL_MS);
 
         // Set extra configuration for source builder.
         setSourceBuilder(builder);
@@ -117,7 +117,7 @@ public abstract class PulsarSourceTestContext extends 
PulsarTestContext<String>
 
     /**
      * The topic pattern which is used in Pulsar topic auto discovery. It was 
discovered every
-     * {@link #DISCOVERY_INTERVAL} ms;
+     * {@link #DISCOVERY_INTERVAL_MS};
      */
     protected abstract String topicPattern();
 
diff --git a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE 
b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
index d8f02d8..9243328 100644
--- a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
@@ -7,9 +7,9 @@ The Apache Software Foundation (http://www.apache.org/).
 This project bundles the following dependencies under the Apache Software 
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
 - com.fasterxml.jackson.core:jackson-annotations:2.13.4
-- org.apache.pulsar:pulsar-client-admin-api:3.0.2
-- org.apache.pulsar:pulsar-client-all:3.0.2
-- org.apache.pulsar:pulsar-client-api:3.0.2
+- org.apache.pulsar:pulsar-client-admin-api:3.0.5
+- org.apache.pulsar:pulsar-client-all:3.0.5
+- org.apache.pulsar:pulsar-client-api:3.0.5
 
 This project bundles the following dependencies under the Bouncy Castle 
license.
 See bundled license files for details.
diff --git a/pom.xml b/pom.xml
index 210b074..b31790a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,9 +51,8 @@ under the License.
     </modules>
 
     <properties>
-        <flink.version>1.18.0</flink.version>
-        <flink-ci-tools.version>1.18.0</flink-ci-tools.version>
-        <pulsar.version>3.0.2</pulsar.version>
+        <flink.version>1.20.3</flink.version>
+        <pulsar.version>3.0.5</pulsar.version>
         <scala.binary.version>2.12</scala.binary.version>
         <bouncycastle.version>1.69</bouncycastle.version>
 
@@ -81,6 +80,7 @@ under the License.
         <kryo.version>2.24.0</kryo.version>
         <objenesis.version>3.3</objenesis.version>
         <jackson-bom.version>2.13.4.20221013</jackson-bom.version>
+        <snappy.java.version>1.1.10.4</snappy.java.version>
 
         <os-maven-plugin.version>1.7.0</os-maven-plugin.version>
         <protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
@@ -391,7 +391,7 @@ under the License.
                 <version>${jackson-bom.version}</version>
             </dependency>
 
-            <!-- For dependency convergence -->
+            <!-- Start of pinned versions for dependency convergence -->
 
             <dependency>
                 <groupId>org.apache.commons</groupId>
@@ -399,38 +399,30 @@ under the License.
                 <version>${commons-compress.version}</version>
             </dependency>
 
-            <!-- For dependency convergence  -->
             <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-lang3</artifactId>
                 <version>${commons-lang3.version}</version>
             </dependency>
 
-            <!-- For dependency convergence  -->
             <dependency>
                 <groupId>commons-io</groupId>
                 <artifactId>commons-io</artifactId>
                 <version>${commons-io.version}</version>
             </dependency>
 
-            <!-- For dependency convergence -->
-
             <dependency>
                 <groupId>net.bytebuddy</groupId>
                 <artifactId>byte-buddy</artifactId>
                 <version>${byte-buddy.version}</version>
             </dependency>
 
-            <!-- For dependency convergence -->
-
             <dependency>
                 <groupId>com.esotericsoftware.kryo</groupId>
                 <artifactId>kryo</artifactId>
                 <version>${kryo.version}</version>
             </dependency>
 
-            <!-- For dependency convergence -->
-
             <dependency>
                 <groupId>org.objenesis</groupId>
                 <artifactId>objenesis</artifactId>
@@ -443,7 +435,6 @@ under the License.
                 <version>${protobuf.version}</version>
             </dependency>
 
-            <!-- For dependency convergence -->
             <dependency>
                 <groupId>org.scala-lang</groupId>
                 <artifactId>scala-reflect</artifactId>
@@ -455,6 +446,14 @@ under the License.
                 <artifactId>scala-library</artifactId>
                 <version>${scala-library.version}</version>
             </dependency>
+
+            <dependency>
+                <groupId>org.xerial.snappy</groupId>
+                <artifactId>snappy-java</artifactId>
+                <version>${snappy.java.version}</version>
+            </dependency>
+
+            <!-- End of pinned versions for dependency convergence -->
         </dependencies>
     </dependencyManagement>
 
@@ -492,7 +491,7 @@ under the License.
                     <dependency>
                         <groupId>org.apache.flink</groupId>
                         <artifactId>flink-ci-tools</artifactId>
-                        <version>${flink-ci-tools.version}</version>
+                        <version>${flink.version}</version>
                     </dependency>
                 </dependencies>
             </plugin>

Reply via email to