This is an automated email from the ASF dual-hosted git repository.
fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-pulsar.git
The following commit(s) were added to refs/heads/main by this push:
new 08c8e2e [FLINK-38545] Update to Flink 1.20.3 and Pulsar 3.0.5
08c8e2e is described below
commit 08c8e2ea60e8ef01ff9a81a0b2bf8c1a132f5db4
Author: Ferenc Csaky <[email protected]>
AuthorDate: Mon Oct 27 18:04:10 2025 +0100
[FLINK-38545] Update to Flink 1.20.3 and Pulsar 3.0.5
---
.../pulsar/source/PulsarSourceOptions.java | 22 +++++++--------
.../pulsar/source/config/SourceConfiguration.java | 11 ++++----
.../pulsar/common/MiniClusterTestEnvironment.java | 5 +++-
.../pulsar/sink/writer/PulsarWriterTest.java | 17 ++++++++++--
.../source/enumerator/cursor/StopCursorTest.java | 6 ++--
.../reader/PulsarPartitionSplitReaderTest.java | 32 ++++++++++++++++++++--
.../source/reader/PulsarSourceReaderTest.java | 4 +--
.../PulsarDeserializationSchemaTest.java | 3 +-
.../pulsar/table/PulsarChangelogTableITCase.java | 13 +++++++--
.../table/testutils/PulsarTableTestUtils.java | 2 +-
.../runtime/container/PulsarContainerRuntime.java | 2 +-
.../testutils/source/PulsarSourceTestContext.java | 6 ++--
.../src/main/resources/META-INF/NOTICE | 6 ++--
pom.xml | 27 +++++++++---------
14 files changed, 105 insertions(+), 51 deletions(-)
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index 3346fed..72b2026 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -103,26 +103,26 @@ public final class PulsarSourceOptions {
"The source would use pulsar
client's internal mechanism and commit cursor in a given interval.")
.build());
- public static final ConfigOption<Long> PULSAR_AUTO_COMMIT_CURSOR_INTERVAL =
+ public static final ConfigOption<Duration>
PULSAR_AUTO_COMMIT_CURSOR_INTERVAL =
ConfigOptions.key(SOURCE_CONFIG_PREFIX +
"autoCommitCursorInterval")
- .longType()
- .defaultValue(Duration.ofSeconds(5).toMillis())
+ .durationType()
+ .defaultValue(Duration.ofSeconds(5))
.withDescription(
Description.builder()
.text(
"This option is used only when the
user disables the checkpoint and uses Exclusive or Failover subscription.")
.text(
- " We would automatically commit
the cursor using the given period (in ms).")
+ " We would automatically commit
the cursor using the given duration.")
.build());
- public static final ConfigOption<Integer> PULSAR_FETCH_ONE_MESSAGE_TIME =
+ public static final ConfigOption<Duration> PULSAR_FETCH_ONE_MESSAGE_TIME =
ConfigOptions.key(SOURCE_CONFIG_PREFIX + "fetchOneMessageTime")
- .intType()
+ .durationType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
- "The time (in ms) for fetching one
message from Pulsar. If time exceed and no message returned from Pulsar.")
+ "The time to wait for fetching one
message from Pulsar. If time exceeded and no message returned from Pulsar.")
.text(
" We would consider there is no
record at the current topic partition and stop fetching until next switch.")
.linebreak()
@@ -134,13 +134,13 @@ public final class PulsarSourceOptions {
" Add this option in source
builder avoiding waiting too long.")
.build());
- public static final ConfigOption<Long> PULSAR_MAX_FETCH_TIME =
+ public static final ConfigOption<Duration> PULSAR_MAX_FETCH_TIME =
ConfigOptions.key(SOURCE_CONFIG_PREFIX + "maxFetchTime")
- .longType()
- .defaultValue(Duration.ofSeconds(10).toMillis())
+ .durationType()
+ .defaultValue(Duration.ofSeconds(10))
.withDescription(
Description.builder()
- .text("The maximum time (in ms) to wait
when fetching records.")
+ .text("The maximum time to wait when
fetching records.")
.text(" A longer time increases throughput
but also latency.")
.text(
" A fetch batch might be finished
earlier because of %s.",
diff --git
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
index b10a2bb..fbeeb76 100644
---
a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
+++
b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java
@@ -60,7 +60,7 @@ public class SourceConfiguration extends PulsarConfiguration {
private final long partitionDiscoveryIntervalMs;
private final boolean enableAutoAcknowledgeMessage;
private final long autoCommitCursorInterval;
- private final int fetchOneMessageTime;
+ private final long fetchOneMessageTime;
private final Duration maxFetchTime;
private final int maxFetchRecords;
private final CursorVerification verifyInitialOffsets;
@@ -78,9 +78,10 @@ public class SourceConfiguration extends PulsarConfiguration
{
this.messageQueueCapacity = get(ELEMENT_QUEUE_CAPACITY);
this.partitionDiscoveryIntervalMs =
get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS);
this.enableAutoAcknowledgeMessage =
get(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE);
- this.autoCommitCursorInterval =
get(PULSAR_AUTO_COMMIT_CURSOR_INTERVAL);
- this.fetchOneMessageTime =
getOptional(PULSAR_FETCH_ONE_MESSAGE_TIME).orElse(0);
- this.maxFetchTime = get(PULSAR_MAX_FETCH_TIME, Duration::ofMillis);
+ this.autoCommitCursorInterval =
get(PULSAR_AUTO_COMMIT_CURSOR_INTERVAL).toMillis();
+ this.fetchOneMessageTime =
+
getOptional(PULSAR_FETCH_ONE_MESSAGE_TIME).map(Duration::toMillis).orElse(0L);
+ this.maxFetchTime = get(PULSAR_MAX_FETCH_TIME);
this.maxFetchRecords = get(PULSAR_MAX_FETCH_RECORDS);
this.verifyInitialOffsets = get(PULSAR_VERIFY_INITIAL_OFFSETS);
this.subscriptionName = get(PULSAR_SUBSCRIPTION_NAME);
@@ -136,7 +137,7 @@ public class SourceConfiguration extends
PulsarConfiguration {
* messages in {@link RecordsWithSplitIds} when meet this timeout and no
message consumed.
*/
public int getFetchOneMessageTime() {
- return fetchOneMessageTime;
+ return (int) fetchOneMessageTime;
}
/**
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java
index 0c65336..83a7f36 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java
@@ -41,6 +41,7 @@ import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -65,7 +66,9 @@ public class MiniClusterTestEnvironment implements
TestEnvironment, ClusterContr
public MiniClusterTestEnvironment() {
Configuration conf = new Configuration();
- conf.set(METRIC_FETCHER_UPDATE_INTERVAL,
METRIC_FETCHER_UPDATE_INTERVAL_MS);
+ conf.set(
+ METRIC_FETCHER_UPDATE_INTERVAL,
+ Duration.ofMillis(METRIC_FETCHER_UPDATE_INTERVAL_MS));
TaskExecutorResourceUtils.adjustForLocalExecution(conf);
this.miniCluster =
new MiniClusterWithClientResource(
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
index 01914cc..d24f230 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
@@ -19,6 +19,8 @@
package org.apache.flink.connector.pulsar.sink.writer;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
@@ -45,7 +47,7 @@ import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
-import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.UserCodeClassLoader;
@@ -173,7 +175,8 @@ class PulsarWriterTest extends PulsarTestSuiteBase {
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()
.getIOMetricGroup();
MetricGroup metricGroup = metricListener.getMetricGroup();
- this.metricGroup = InternalSinkWriterMetricGroup.mock(metricGroup,
ioMetricGroup);
+ this.metricGroup =
+ MetricsGroupTestUtils.mockWriterMetricGroup(metricGroup,
ioMetricGroup);
this.timeService = new TestProcessingTimeService();
}
@@ -251,6 +254,16 @@ class PulsarWriterTest extends PulsarTestSuiteBase {
}
};
}
+
+ @Override
+ public JobInfo getJobInfo() {
+ return null;
+ }
+
+ @Override
+ public TaskInfo getTaskInfo() {
+ return null;
+ }
}
private static class MockSinkWriterContext implements SinkWriter.Context {
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
index fc49073..2536753 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java
@@ -33,6 +33,8 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
+
import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
@@ -95,8 +97,8 @@ class StopCursorTest extends PulsarTestSuiteBase {
private SourceConfiguration sourceConfig() {
Configuration config = operator().config();
config.set(PULSAR_MAX_FETCH_RECORDS, 1);
- config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
- config.set(PULSAR_MAX_FETCH_TIME, 3000L);
+ config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, Duration.ofSeconds(2));
+ config.set(PULSAR_MAX_FETCH_TIME, Duration.ofSeconds(3));
config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
return new SourceConfiguration(config);
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
index b2ef7d9..4b439a0 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.impl.MessageIdImpl;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
@@ -107,6 +108,7 @@ class PulsarPartitionSplitReaderTest extends
PulsarTestSuiteBase {
String topicName = randomAlphabetic(10);
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ waitForTopicMetadataReady(topicName, 0);
seekStartPositionAndHandleSplit(splitReader, topicName, 0);
fetchedMessages(splitReader, 0, true);
}
@@ -118,6 +120,7 @@ class PulsarPartitionSplitReaderTest extends
PulsarTestSuiteBase {
String topicName = randomAlphabetic(10);
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ waitForTopicMetadataReady(topicName, 0);
seekStartPositionAndHandleSplit(splitReader, topicName, 0,
MessageId.earliest);
fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
}
@@ -128,6 +131,7 @@ class PulsarPartitionSplitReaderTest extends
PulsarTestSuiteBase {
String topicName = randomAlphabetic(10);
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ waitForTopicMetadataReady(topicName, 0);
seekStartPositionAndHandleSplit(splitReader, topicName, 0,
MessageId.latest);
fetchedMessages(splitReader, 0, true);
}
@@ -139,6 +143,7 @@ class PulsarPartitionSplitReaderTest extends
PulsarTestSuiteBase {
String topicName = randomAlphabetic(10);
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ waitForTopicMetadataReady(topicName, 0);
MessageIdImpl lastMessageId =
(MessageIdImpl)
operator()
@@ -214,6 +219,7 @@ class PulsarPartitionSplitReaderTest extends
PulsarTestSuiteBase {
String topicName = randomAlphabetic(10);
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ waitForTopicMetadataReady(topicName, 0);
handleSplit(splitReader, topicName, 0);
fetchedMessages(splitReader, 0, true);
}
@@ -225,6 +231,7 @@ class PulsarPartitionSplitReaderTest extends
PulsarTestSuiteBase {
String topicName = randomAlphabetic(10);
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ waitForTopicMetadataReady(topicName, 0);
handleSplit(splitReader, topicName, 0, MessageId.latest);
fetchedMessages(splitReader, 0, true);
}
@@ -236,6 +243,7 @@ class PulsarPartitionSplitReaderTest extends
PulsarTestSuiteBase {
String topicName = randomAlphabetic(10);
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
+ waitForTopicMetadataReady(topicName, 0);
handleSplit(splitReader, topicName, 0, MessageId.earliest);
fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
}
@@ -247,6 +255,8 @@ class PulsarPartitionSplitReaderTest extends
PulsarTestSuiteBase {
String topicName = randomAlphabetic(10);
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10),
20, false);
+ waitForTopicMetadataReady(topicName, 0);
+
MessageIdImpl lastMessageId =
(MessageIdImpl)
operator()
@@ -272,6 +282,7 @@ class PulsarPartitionSplitReaderTest extends
PulsarTestSuiteBase {
int numRecords = 20;
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10),
numRecords, true);
+ waitForTopicMetadataReady(topicName, 0);
MessageIdImpl lastMessageId =
(MessageIdImpl)
operator()
@@ -310,8 +321,8 @@ class PulsarPartitionSplitReaderTest extends
PulsarTestSuiteBase {
private SourceConfiguration sourceConfig() {
Configuration config = operator().config();
config.set(PULSAR_MAX_FETCH_RECORDS, 1);
- config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
- config.set(PULSAR_MAX_FETCH_TIME, 3000L);
+ config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, Duration.ofSeconds(2));
+ config.set(PULSAR_MAX_FETCH_TIME, Duration.ofSeconds(3));
config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
@@ -402,4 +413,21 @@ class PulsarPartitionSplitReaderTest extends
PulsarTestSuiteBase {
return messages;
}
+
+ // Wait for topic metadata to stabilize after setupTopic
+ private void waitForTopicMetadataReady(String topicName, int partitionId)
throws Exception {
+ String partitionTopicName = topicNameWithPartition(topicName,
partitionId);
+ waitUtil(
+ () -> {
+ try {
+ MessageId id =
+
operator().admin().topics().getLastMessageId(partitionTopicName);
+ return id != null;
+ } catch (Exception e) {
+ return false;
+ }
+ },
+ ofSeconds(30),
+ "Topic metadata not ready for " + partitionTopicName);
+ }
}
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
index 54ed4b2..3d4619d 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java
@@ -228,8 +228,8 @@ class PulsarSourceReaderTest extends PulsarTestSuiteBase {
Configuration configuration = operator().config();
configuration.set(PULSAR_MAX_FETCH_RECORDS, 1);
- configuration.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
- configuration.set(PULSAR_MAX_FETCH_TIME, 3000L);
+ configuration.set(PULSAR_FETCH_ONE_MESSAGE_TIME,
Duration.ofSeconds(2));
+ configuration.set(PULSAR_MAX_FETCH_TIME, Duration.ofSeconds(3));
configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
PulsarDeserializationSchema<Integer> deserializationSchema =
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
index de0a729..ecdd820 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.connector.pulsar.source.reader.deserializer;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
@@ -103,7 +104,7 @@ class PulsarDeserializationSchemaTest extends
PulsarTestSuiteBase {
@Test
void createFromFlinkTypeInformation() throws Exception {
PulsarDeserializationSchema<String> schema =
- new PulsarTypeInformationWrapper<>(Types.STRING, null);
+ new PulsarTypeInformationWrapper<>(Types.STRING, new
ExecutionConfig());
schema.open(new PulsarTestingDeserializationContext(), sourceConfig);
assertThatCode(() ->
InstantiationUtil.clone(schema)).doesNotThrowAnyException();
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
index c448a08..f95cb8f 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java
@@ -23,6 +23,7 @@ import
org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory;
import org.apache.flink.formats.json.maxwell.MaxwellJsonFormatFactory;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.AggregatePhaseStrategy;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.test.junit5.MiniClusterExtension;
@@ -53,7 +54,9 @@ public class PulsarChangelogTableITCase extends
PulsarTableTestBase {
tableConf.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
Duration.ofSeconds(1));
tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
-
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
"TWO_PHASE");
+ tableConf.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
+ AggregatePhaseStrategy.TWO_PHASE);
// ---------- Write the Debezium json into Pulsar -------------------
List<String> lines = readLines("debezium-data-schema-exclude.txt");
@@ -182,7 +185,9 @@ public class PulsarChangelogTableITCase extends
PulsarTableTestBase {
tableConf.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
Duration.ofSeconds(1));
tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
-
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
"TWO_PHASE");
+ tableConf.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
+ AggregatePhaseStrategy.TWO_PHASE);
// ---------- Write the Canal json into Pulsar -------------------
List<String> lines = readLines("canal-data.txt");
@@ -323,7 +328,9 @@ public class PulsarChangelogTableITCase extends
PulsarTableTestBase {
tableConf.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY,
Duration.ofSeconds(1));
tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
-
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
"TWO_PHASE");
+ tableConf.set(
+ OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
+ AggregatePhaseStrategy.TWO_PHASE);
// ---------- Write the Maxwell json into Pulsar -------------------
List<String> lines = readLines("maxwell-data.txt");
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java
index fe29b4b..8c89fa3 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java
@@ -97,7 +97,7 @@ public class PulsarTableTestUtils {
Collections.sort(expected);
CommonTestUtils.waitUtil(
() -> {
- List<String> actual =
TestValuesTableFactory.getResults(sinkName);
+ List<String> actual =
TestValuesTableFactory.getResultsAsStrings(sinkName);
Collections.sort(actual);
return expected.equals(actual);
},
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
index 097927e..9bd2252 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
@@ -54,7 +54,7 @@ public class PulsarContainerRuntime implements PulsarRuntime {
private static final String PULSAR_ADMIN_URL =
String.format("http://%s:%d", PULSAR_INTERNAL_HOSTNAME,
BROKER_HTTP_PORT);
- private static final String CURRENT_VERSION = "3.0.0";
+ private static final String CURRENT_VERSION = "3.0.5";
private final PulsarContainer container;
private final AtomicBoolean started;
diff --git
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
index aece284..76d058d 100644
---
a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
+++
b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java
@@ -50,7 +50,7 @@ import static
org.apache.pulsar.client.api.RegexSubscriptionMode.AllTopics;
public abstract class PulsarSourceTestContext extends PulsarTestContext<String>
implements DataStreamSourceExternalContext<String> {
- private static final long DISCOVERY_INTERVAL = 1000L;
+ private static final long DISCOVERY_INTERVAL_MS = 1000L;
private static final int BATCH_DATA_SIZE = 300;
protected PulsarSourceTestContext(PulsarTestEnvironment environment) {
@@ -65,7 +65,7 @@ public abstract class PulsarSourceTestContext extends
PulsarTestContext<String>
.setServiceUrl(operator.serviceUrl())
.setTopicPattern(topicPattern(), AllTopics)
.setSubscriptionName(subscriptionName())
- .setConfig(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS,
DISCOVERY_INTERVAL);
+ .setConfig(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS,
DISCOVERY_INTERVAL_MS);
// Set extra configuration for source builder.
setSourceBuilder(builder);
@@ -117,7 +117,7 @@ public abstract class PulsarSourceTestContext extends
PulsarTestContext<String>
/**
* The topic pattern which is used in Pulsar topic auto discovery. It was
discovered every
- * {@link #DISCOVERY_INTERVAL} ms;
+ * {@link #DISCOVERY_INTERVAL_MS};
*/
protected abstract String topicPattern();
diff --git a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
index d8f02d8..9243328 100644
--- a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE
@@ -7,9 +7,9 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
- com.fasterxml.jackson.core:jackson-annotations:2.13.4
-- org.apache.pulsar:pulsar-client-admin-api:3.0.2
-- org.apache.pulsar:pulsar-client-all:3.0.2
-- org.apache.pulsar:pulsar-client-api:3.0.2
+- org.apache.pulsar:pulsar-client-admin-api:3.0.5
+- org.apache.pulsar:pulsar-client-all:3.0.5
+- org.apache.pulsar:pulsar-client-api:3.0.5
This project bundles the following dependencies under the Bouncy Castle
license.
See bundled license files for details.
diff --git a/pom.xml b/pom.xml
index 210b074..b31790a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,9 +51,8 @@ under the License.
</modules>
<properties>
- <flink.version>1.18.0</flink.version>
- <flink-ci-tools.version>1.18.0</flink-ci-tools.version>
- <pulsar.version>3.0.2</pulsar.version>
+ <flink.version>1.20.3</flink.version>
+ <pulsar.version>3.0.5</pulsar.version>
<scala.binary.version>2.12</scala.binary.version>
<bouncycastle.version>1.69</bouncycastle.version>
@@ -81,6 +80,7 @@ under the License.
<kryo.version>2.24.0</kryo.version>
<objenesis.version>3.3</objenesis.version>
<jackson-bom.version>2.13.4.20221013</jackson-bom.version>
+ <snappy.java.version>1.1.10.4</snappy.java.version>
<os-maven-plugin.version>1.7.0</os-maven-plugin.version>
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
@@ -391,7 +391,7 @@ under the License.
<version>${jackson-bom.version}</version>
</dependency>
- <!-- For dependency convergence -->
+ <!-- Start of pinned versions for dependency convergence -->
<dependency>
<groupId>org.apache.commons</groupId>
@@ -399,38 +399,30 @@ under the License.
<version>${commons-compress.version}</version>
</dependency>
- <!-- For dependency convergence -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
- <!-- For dependency convergence -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
- <!-- For dependency convergence -->
-
<dependency>
<groupId>net.bytebuddy</groupId>
<artifactId>byte-buddy</artifactId>
<version>${byte-buddy.version}</version>
</dependency>
- <!-- For dependency convergence -->
-
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<version>${kryo.version}</version>
</dependency>
- <!-- For dependency convergence -->
-
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
@@ -443,7 +435,6 @@ under the License.
<version>${protobuf.version}</version>
</dependency>
- <!-- For dependency convergence -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
@@ -455,6 +446,14 @@ under the License.
<artifactId>scala-library</artifactId>
<version>${scala-library.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>${snappy.java.version}</version>
+ </dependency>
+
+ <!-- End of pinned versions for dependency convergence -->
</dependencies>
</dependencyManagement>
@@ -492,7 +491,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-ci-tools</artifactId>
- <version>${flink-ci-tools.version}</version>
+ <version>${flink.version}</version>
</dependency>
</dependencies>
</plugin>