This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 20ca57d7 [FLINK-38862] Configurable partition discovery interval in 
upsert mode
20ca57d7 is described below

commit 20ca57d7efdcca52b5ce44afbccc1e7ad66ff82b
Author: Efrat Levitan <[email protected]>
AuthorDate: Tue Jan 6 11:43:45 2026 +0200

    [FLINK-38862] Configurable partition discovery interval in upsert mode
---
 .../table/UpsertKafkaDynamicTableFactory.java      |  9 +++++
 .../kafka/table/KafkaDynamicTableFactoryTest.java  | 45 ++++++++++++++++++++++
 .../table/UpsertKafkaDynamicTableFactoryTest.java  | 39 +++++++++++++++++++
 3 files changed, 93 insertions(+)

diff --git 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
index c421c4ef..430a25ff 100644
--- 
a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ 
b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.BoundedOptions;
 import org.apache.flink.table.api.ValidationException;
@@ -63,6 +64,7 @@ import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_PARALLELISM;
+import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_TOPIC_PARTITION_DISCOVERY;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
 import static 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SINK_PARALLELISM;
@@ -119,6 +121,7 @@ public class UpsertKafkaDynamicTableFactory
         options.add(TRANSACTIONAL_ID_PREFIX);
         options.add(SCAN_PARALLELISM);
         options.add(TRANSACTION_NAMING_STRATEGY);
+        options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
         return options;
     }
 
@@ -157,6 +160,12 @@ public class UpsertKafkaDynamicTableFactory
 
         Integer parallelism = tableOptions.get(SCAN_PARALLELISM);
 
+        final Duration partitionDiscoveryInterval =
+                tableOptions.get(SCAN_TOPIC_PARTITION_DISCOVERY);
+        properties.setProperty(
+                KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
+                Long.toString(partitionDiscoveryInterval.toMillis()));
+
         return new KafkaDynamicSource(
                 context.getPhysicalRowDataType(),
                 keyDecodingFormat,
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 4417346c..208cfead 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -263,6 +263,51 @@ public class KafkaDynamicTableFactoryTest {
         assertThat(sourceProvider.getParallelism()).hasValue(100);
     }
 
+    @Test
+    public void testTableSourceWithCustomPartitionDiscoveryInterval() {
+        final String partitionDiscoveryInterval = "100 ms";
+        final long expectedPartitionDiscoveryInterval = 100;
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getBasicSourceOptions(),
+                        options ->
+                                options.put(
+                                        
"scan.topic-partition-discovery.interval",
+                                        partitionDiscoveryInterval));
+        final DynamicTableSource actualSource = createTableSource(SCHEMA, 
modifiedOptions);
+        final KafkaDynamicSource actualKafkaSource = (KafkaDynamicSource) 
actualSource;
+
+        final Map<TopicPartition, Long> specificOffsets = new HashMap<>();
+        specificOffsets.put(new TopicPartition(TOPIC, PARTITION_0), OFFSET_0);
+        specificOffsets.put(new TopicPartition(TOPIC, PARTITION_1), OFFSET_1);
+
+        final DecodingFormat<DeserializationSchema<RowData>> 
valueDecodingFormat =
+                new DecodingFormatMock(",", true);
+        final Properties properties = new Properties();
+        properties.putAll(KAFKA_SOURCE_PROPERTIES);
+        properties.setProperty(
+                "partition.discovery.interval.ms",
+                Long.toString(expectedPartitionDiscoveryInterval));
+
+        // Test scan source equals
+        final KafkaDynamicSource expectedKafkaSource =
+                createExpectedScanSource(
+                        SCHEMA_DATA_TYPE,
+                        null,
+                        valueDecodingFormat,
+                        new int[0],
+                        new int[] {0, 1, 2},
+                        null,
+                        Collections.singletonList(TOPIC),
+                        null,
+                        properties,
+                        StartupMode.SPECIFIC_OFFSETS,
+                        specificOffsets,
+                        0,
+                        null);
+        assertThat(actualKafkaSource).isEqualTo(expectedKafkaSource);
+    }
+
     @Test
     public void testTableSourceWithPattern() {
         final Map<String, String> modifiedOptions =
diff --git 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
index 5bc42f3b..740c4892 100644
--- 
a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
+++ 
b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
 import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import 
org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanBoundedMode;
 import 
org.apache.flink.streaming.connectors.kafka.testutils.MockPartitionOffsetsRetriever;
 import 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
 import org.apache.flink.table.api.DataTypes;
@@ -135,9 +136,11 @@ public class UpsertKafkaDynamicTableFactoryTest extends 
TestLogger {
 
     private static final Properties UPSERT_KAFKA_SOURCE_PROPERTIES = new 
Properties();
     private static final Properties UPSERT_KAFKA_SINK_PROPERTIES = new 
Properties();
+    private static final String DISCOVERY_INTERVAL = "1000 ms";
 
     static {
         UPSERT_KAFKA_SOURCE_PROPERTIES.setProperty("bootstrap.servers", 
"dummy");
+        
UPSERT_KAFKA_SOURCE_PROPERTIES.setProperty("partition.discovery.interval.ms", 
"1000");
 
         UPSERT_KAFKA_SINK_PROPERTIES.setProperty("bootstrap.servers", "dummy");
     }
@@ -839,6 +842,40 @@ public class UpsertKafkaDynamicTableFactoryTest extends 
TestLogger {
                                 "sink.transactional-id-prefix must be 
specified when using DeliveryGuarantee.EXACTLY_ONCE."));
     }
 
+    @Test
+    public void testTableSourceWithCustomPartitionDiscoveryInterval() {
+        final String partitionDiscoveryInterval = "100 ms";
+        final long expectedPartitionDiscoveryInterval = 100;
+        final DataType producedDataType = 
SOURCE_SCHEMA.toPhysicalRowDataType();
+        // Construct table source using options and table source factory
+        final Map<String, String> modifiedOptions =
+                getModifiedOptions(
+                        getFullSourceOptions(),
+                        options ->
+                                options.put(
+                                        
"scan.topic-partition-discovery.interval",
+                                        partitionDiscoveryInterval));
+        final DynamicTableSource actualSource = 
createTableSource(SOURCE_SCHEMA, modifiedOptions);
+        final Properties properties = new Properties();
+        properties.putAll(UPSERT_KAFKA_SOURCE_PROPERTIES);
+        properties.setProperty(
+                "partition.discovery.interval.ms",
+                Long.toString(expectedPartitionDiscoveryInterval));
+
+        final KafkaDynamicSource expectedSource =
+                createExpectedScanSource(
+                        producedDataType,
+                        keyDecodingFormat,
+                        valueDecodingFormat,
+                        SOURCE_KEY_FIELDS,
+                        SOURCE_VALUE_FIELDS,
+                        null,
+                        Collections.singletonList(SOURCE_TOPIC),
+                        properties,
+                        null);
+        assertThat(actualSource).isEqualTo(expectedSource);
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Utilities
     // 
--------------------------------------------------------------------------------------------
@@ -860,6 +897,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends 
TestLogger {
         options.put("connector", UpsertKafkaDynamicTableFactory.IDENTIFIER);
         options.put("topic", SOURCE_TOPIC);
         options.put("properties.bootstrap.servers", "dummy");
+        options.put("scan.topic-partition-discovery.interval", 
DISCOVERY_INTERVAL);
+
         // key format options
         options.put("key.format", TestFormatFactory.IDENTIFIER);
         options.put(

Reply via email to