This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 20ca57d7 [FLINK-38862] Configurable partition discovery interval in upsert mode
20ca57d7 is described below
commit 20ca57d7efdcca52b5ce44afbccc1e7ad66ff82b
Author: Efrat Levitan <[email protected]>
AuthorDate: Tue Jan 6 11:43:45 2026 +0200
[FLINK-38862] Configurable partition discovery interval in upsert mode
---
.../table/UpsertKafkaDynamicTableFactory.java | 9 +++++
.../kafka/table/KafkaDynamicTableFactoryTest.java | 45 ++++++++++++++++++++++
.../table/UpsertKafkaDynamicTableFactoryTest.java | 39 +++++++++++++++++++
3 files changed, 93 insertions(+)
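For context, the new option is used like any other upsert-kafka connector option in table DDL. A minimal sketch of what this commit enables (table name, schema, topic, and broker address are illustrative, not part of this commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class UpsertDiscoveryIntervalExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE user_scores (\n"
                            + "  user_id STRING,\n"
                            + "  score BIGINT,\n"
                            + "  PRIMARY KEY (user_id) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'upsert-kafka',\n"
                            + "  'topic' = 'scores',\n"
                            + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                            + "  'key.format' = 'json',\n"
                            + "  'value.format' = 'json',\n"
                            // The option this commit exposes for upsert mode:
                            + "  'scan.topic-partition-discovery.interval' = '100 ms'\n"
                            + ")");
        }
    }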
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
index c421c4ef..430a25ff 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions;
import org.apache.flink.table.api.ValidationException;
@@ -63,6 +64,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
@@ -119,6 +121,7 @@ public class UpsertKafkaDynamicTableFactory
options.add(TRANSACTIONAL_ID_PREFIX);
options.add(SCAN_PARALLELISM);
options.add(TRANSACTION_NAMING_STRATEGY);
+ options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
return options;
}
@@ -157,6 +160,12 @@ public class UpsertKafkaDynamicTableFactory
Integer parallelism = tableOptions.get(SCAN_PARALLELISM);
+ final Duration partitionDiscoveryInterval =
+ tableOptions.get(SCAN_TOPIC_PARTITION_DISCOVERY);
+ properties.setProperty(
+ KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
+ Long.toString(partitionDiscoveryInterval.toMillis()));
+
return new KafkaDynamicSource(
context.getPhysicalRowDataType(),
keyDecodingFormat,
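The change itself is small: the factory reads the Duration-typed SCAN_TOPIC_PARTITION_DISCOVERY table option and forwards it into the consumer properties, so the underlying KafkaSource applies it exactly as it would in the DataStream API. A rough hand-written equivalent at the DataStream level (topic, broker, and deserializer are placeholders, not taken from this commit):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.KafkaSourceOptions;

    public class DiscoveryIntervalSketch {
        public static void main(String[] args) {
            KafkaSource<String> source =
                    KafkaSource.<String>builder()
                            .setBootstrapServers("localhost:9092")
                            .setTopics("scores")
                            .setValueOnlyDeserializer(new SimpleStringSchema())
                            // "scan.topic-partition-discovery.interval" = "100 ms"
                            // ends up as this consumer property:
                            .setProperty(
                                    KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
                                    "100")
                            .build();
        }
    }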
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 4417346c..208cfead 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -263,6 +263,51 @@ public class KafkaDynamicTableFactoryTest {
assertThat(sourceProvider.getParallelism()).hasValue(100);
}
+ @Test
+ public void testTableSourceWithCustomPartitionDiscoveryInterval() {
+ final String partitionDiscoveryInterval = "100 ms";
+ final long expectedPartitionDiscoveryInterval = 100;
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(
+ getBasicSourceOptions(),
+ options ->
+ options.put(
+ "scan.topic-partition-discovery.interval",
+ partitionDiscoveryInterval));
+ final DynamicTableSource actualSource = createTableSource(SCHEMA, modifiedOptions);
+ final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) actualSource;
+
+ final Map<TopicPartition, Long> specificOffsets = new HashMap<>();
+ specificOffsets.put(new TopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+ specificOffsets.put(new TopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+
+ final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
+ new DecodingFormatMock(",", true);
+ final Properties properties = new Properties();
+ properties.putAll(KAFKA_SOURCE_PROPERTIES);
+ properties.setProperty(
+ "partition.discovery.interval.ms",
+ Long.toString(expectedPartitionDiscoveryInterval));
+
+ // Test scan source equals
+ final KafkaDynamicSource expectedKafkaSource =
+ createExpectedScanSource(
+ SCHEMA_DATA_TYPE,
+ null,
+ valueDecodingFormat,
+ new int[0],
+ new int[] {0, 1, 2},
+ null,
+ Collections.singletonList(TOPIC),
+ null,
+ properties,
+ StartupMode.SPECIFIC_OFFSETS,
+ specificOffsets,
+ 0,
+ null);
+ assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
+ }
+
@Test
public void testTableSourceWithPattern() {
final Map<String, String> modifiedOptions =
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
index 5bc42f3b..740c4892 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
import org.apache.flink.streaming.connectors.kafka.testutils.MockPartitionOffsetsRetriever;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.table.api.DataTypes;
@@ -135,9 +136,11 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
private static final Properties UPSERT_KAFKA_SOURCE_PROPERTIES = new Properties();
private static final Properties UPSERT_KAFKA_SINK_PROPERTIES = new Properties();
+ private static final String DISCOVERY_INTERVAL = "1000 ms";
static {
UPSERT_KAFKA_SOURCE_PROPERTIES.setProperty("bootstrap.servers", "dummy");
+ UPSERT_KAFKA_SOURCE_PROPERTIES.setProperty("partition.discovery.interval.ms", "1000");
UPSERT_KAFKA_SINK_PROPERTIES.setProperty("bootstrap.servers", "dummy");
}
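The "1000 ms" / "1000" pairing above is just Flink's duration parsing: Duration-typed options accept strings like "1000 ms", and the factory writes the value out in milliseconds. A quick sketch of that conversion, assuming org.apache.flink.util.TimeUtils (the parser Flink's configuration layer uses for duration strings; the wrapper class here is illustrative):

    import java.time.Duration;
    import org.apache.flink.util.TimeUtils;

    public class DurationOptionSketch {
        public static void main(String[] args) {
            // "1000 ms" is what the table option accepts; the factory stores
            // the value as milliseconds in the consumer properties.
            Duration interval = TimeUtils.parseDuration("1000 ms");
            System.out.println(Long.toString(interval.toMillis())); // prints 1000
        }
    }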
@@ -839,6 +842,40 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
"sink.transactional-id-prefix must be
specified when using DeliveryGuarantee.EXACTLY_ONCE."));
}
+ @Test
+ public void testTableSourceWithCustomPartitionDiscoveryInterval() {
+ final String partitionDiscoveryInterval = "100 ms";
+ final long expectedPartitionDiscoveryInterval = 100;
+ final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
+ // Construct table source using options and table source factory
+ final Map<String, String> modifiedOptions =
+ getModifiedOptions(
+ getFullSourceOptions(),
+ options ->
+ options.put(
+ "scan.topic-partition-discovery.interval",
+ partitionDiscoveryInterval));
+ final DynamicTableSource actualSource = createTableSource(SOURCE_SCHEMA, modifiedOptions);
+ final Properties properties = new Properties();
+ properties.putAll(UPSERT_KAFKA_SOURCE_PROPERTIES);
+ properties.setProperty(
+ "partition.discovery.interval.ms",
+ Long.toString(expectedPartitionDiscoveryInterval));
+
+ final KafkaDynamicSource expectedSource =
+ createExpectedScanSource(
+ producedDataType,
+ keyDecodingFormat,
+ valueDecodingFormat,
+ SOURCE_KEY_FIELDS,
+ SOURCE_VALUE_FIELDS,
+ null,
+ Collections.singletonList(SOURCE_TOPIC),
+ properties,
+ null);
+ assertThat(actualSource).isEqualTo(expectedSource);
+ }
+
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
@@ -860,6 +897,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
options.put("connector", UpsertKafkaDynamicTableFactory.IDENTIFIER);
options.put("topic", SOURCE_TOPIC);
options.put("properties.bootstrap.servers", "dummy");
+ options.put("scan.topic-partition-discovery.interval", DISCOVERY_INTERVAL);
+
// key format options
options.put("key.format", TestFormatFactory.IDENTIFIER);
options.put(