This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7e0b52dfc5 [Fix] [Connectors-v2] Fix Kafka source restore offsets on checkpoint recovery (#10612)
7e0b52dfc5 is described below

commit 7e0b52dfc566d809a6df0e38afb837d87ee5dac2
Author: yzeng1618 <[email protected]>
AuthorDate: Sun Mar 29 08:15:53 2026 +0800

    [Fix] [Connectors-v2] Fix Kafka source restore offsets on checkpoint recovery (#10612)
    
    Co-authored-by: zengyi <[email protected]>
---
 docs/en/connectors/source/Kafka.md                 |   4 +
 docs/zh/connectors/source/Kafka.md                 |   3 +
 .../kafka/source/KafkaSourceSplitEnumerator.java   |  80 +++++++------
 .../admin/KafkaSourceSplitEnumeratorTest.java      | 126 ++++++++++++++-------
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     | 100 +++++++++++++---
 5 files changed, 226 insertions(+), 87 deletions(-)

diff --git a/docs/en/connectors/source/Kafka.md b/docs/en/connectors/source/Kafka.md
index 9557d07435..a43032b5a3 100644
--- a/docs/en/connectors/source/Kafka.md
+++ b/docs/en/connectors/source/Kafka.md
@@ -62,6 +62,10 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 | reader_cache_queue_size             | Integer                             | No       | 1024                     | The reader shard cache queue is used to cache the data corresponding to the shards. The size of the shard cache depends on the number of shards obtained by each reader, rather than the amount of data in each shard. [...]
 | is_native                           | Boolean                             | No       | false                    | Supports retaining the source information of the record.
 
+> On restore from checkpoint or savepoint, Kafka Source resumes from the checkpointed split offsets.
+> `start_mode` and consumer-group offsets are only used for the first startup or for newly
+> discovered partitions that do not have checkpointed state yet.
+
 ### debezium_record_table_filter
 
 We can use `debezium_record_table_filter` to filter the data in the debezium format. The configuration is as follows:
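[Editor's note] The note added above encodes a simple priority rule: a checkpointed split offset, when present, always wins; `start_mode` (and consumer-group offsets) only seed partitions that have no saved state yet. A minimal, hypothetical Java sketch of that rule — all names illustrative, not the SeaTunnel API:

    import java.util.HashMap;
    import java.util.Map;

    public class StartOffsetRule {
        // Stand-in for the connector's start_mode option (illustrative only).
        enum StartMode { EARLIEST, LATEST, GROUP_OFFSETS }

        // checkpointedOffsets: partition -> offset saved in the last checkpoint/savepoint.
        // startModeOffset: whatever offset the configured start_mode would resolve to.
        static long resolveStartOffset(
                String partition,
                Map<String, Long> checkpointedOffsets,
                StartMode startMode,
                long startModeOffset) {
            Long restored = checkpointedOffsets.get(partition);
            if (restored != null) {
                return restored; // resume exactly where the checkpoint left off
            }
            // First startup, or a partition discovered after the checkpoint was taken:
            // only here does start_mode decide the position.
            return startModeOffset;
        }

        public static void main(String[] args) {
            Map<String, Long> state = new HashMap<>();
            state.put("topic-0", 123L);
            // The restored partition keeps its checkpointed offset; the new partition
            // falls back to the start_mode result (0 for EARLIEST here).
            System.out.println(resolveStartOffset("topic-0", state, StartMode.EARLIEST, 0L)); // 123
            System.out.println(resolveStartOffset("topic-1", state, StartMode.EARLIEST, 0L)); // 0
        }
    }

This same priority ordering is what the enumerator change below implements via restoringSplits.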
diff --git a/docs/zh/connectors/source/Kafka.md b/docs/zh/connectors/source/Kafka.md
index 5923a85c87..78d7f41668 100644
--- a/docs/zh/connectors/source/Kafka.md
+++ b/docs/zh/connectors/source/Kafka.md
@@ -62,6 +62,9 @@ import ChangeLog from '../changelog/connector-kafka.md';
 | reader_cache_queue_size             | Integer                             | 否    | 1024                         | Reader分片缓存队列,用于缓存分片对应的数据。占用大小取决于每个reader得到的分片量,而不是每个分片的数据量。 |
 | is_native                           | Boolean                             | No   | false                        | 支持保留record的源信息。 |
 
+> 从 checkpoint 或 savepoint 恢复时,Kafka Source 会优先使用 checkpoint 中保存的 split offset。
+> `start_mode` 和 consumer group offset 只在首次启动,或为尚未存在 checkpoint 状态的新发现分区初始化位点时生效。
+
 ### debezium_record_table_filter
 
 我们可以使用 `debezium_record_table_filter` 来过滤 debezium 格式的数据。配置如下:
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 95c4173512..07b7f9ccb4 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTestin
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
@@ -73,6 +72,7 @@ public class KafkaSourceSplitEnumerator
     private volatile boolean initialized;
     private final Object lock = new Object();
     private final Map<String, TablePath> topicMappingTablePathMap = new HashMap<>();
+    private final Set<TopicPartition> restoringSplits = new HashSet<>();
 
     private boolean isStreamingMode;
     private final boolean isRestored;
@@ -92,21 +92,6 @@ public class KafkaSourceSplitEnumerator
         this.discoveryIntervalMillis = kafkaSourceConfig.getDiscoveryIntervalMillis();
         this.isStreamingMode = isStreamingMode;
         this.isRestored = isRestored;
-
-        if (this.isRestored) {
-            log.info("Task is being restored, forcing start mode to GROUP_OFFSETS for all topics");
-            this.tablePathMetadataMap.forEach(
-                    (tablePath, metadata) -> {
-                        StartMode originalMode = metadata.getStartMode();
-                        if (originalMode != StartMode.GROUP_OFFSETS) {
-                            log.info(
-                                    "Changing start mode from {} to GROUP_OFFSETS for table path: {}",
-                                    originalMode,
-                                    tablePath);
-                            metadata.setStartMode(StartMode.GROUP_OFFSETS);
-                        }
-                    });
-        }
     }
 
     @VisibleForTesting
@@ -115,14 +100,27 @@ public class KafkaSourceSplitEnumerator
             KafkaSourceConfig kafkaSourceConfig,
             Map<TopicPartition, KafkaSourceSplit> pendingSplit,
             Map<TopicPartition, KafkaSourceSplit> assignedSplit) {
-        this.tablePathMetadataMap = new HashMap<>();
+        this(adminClient, kafkaSourceConfig, pendingSplit, assignedSplit, false, false);
+    }
+
+    @VisibleForTesting
+    public KafkaSourceSplitEnumerator(
+            AdminClient adminClient,
+            KafkaSourceConfig kafkaSourceConfig,
+            Map<TopicPartition, KafkaSourceSplit> pendingSplit,
+            Map<TopicPartition, KafkaSourceSplit> assignedSplit,
+            boolean isStreamingMode,
+            boolean isRestored) {
+        this.tablePathMetadataMap =
+                kafkaSourceConfig == null ? new HashMap<>() : kafkaSourceConfig.getMapMetadata();
         this.context = null;
         this.discoveryIntervalMillis = -1;
         this.adminClient = adminClient;
         this.kafkaSourceConfig = kafkaSourceConfig;
         this.pendingSplit = pendingSplit;
         this.assignedSplit = assignedSplit;
-        this.isRestored = false;
+        this.isStreamingMode = isStreamingMode;
+        this.isRestored = isRestored;
     }
 
     @VisibleForTesting
@@ -131,8 +129,7 @@ public class KafkaSourceSplitEnumerator
             Map<TopicPartition, KafkaSourceSplit> pendingSplit,
             Map<TopicPartition, KafkaSourceSplit> assignedSplit,
             boolean isStreamingMode) {
-        this(adminClient, null, pendingSplit, assignedSplit);
-        this.isStreamingMode = isStreamingMode;
+        this(adminClient, null, pendingSplit, assignedSplit, isStreamingMode, false);
     }
 
     @Override
@@ -172,6 +169,7 @@ public class KafkaSourceSplitEnumerator
         }
         synchronized (lock) {
             assignSplit();
+            restoringSplits.clear();
         }
         if (!initialized) {
             initialized = true;
@@ -179,7 +177,13 @@ public class KafkaSourceSplitEnumerator
     }
 
     private void setPartitionStartOffset() throws ExecutionException, InterruptedException {
-        Set<TopicPartition> pendingTopicPartitions = pendingSplit.keySet();
+        Set<TopicPartition> pendingTopicPartitions =
+                pendingSplit.keySet().stream()
+                        .filter(tp -> !restoringSplits.contains(tp))
+                        .collect(Collectors.toSet());
+        if (pendingTopicPartitions.isEmpty()) {
+            return;
+        }
         Map<TopicPartition, Long> topicPartitionOffsets = new HashMap<>();
         Map<TopicPartition, Long> topicPartitionEndOffsets = new HashMap<>();
         // Set kafka TopicPartition based on the topicPath granularity
@@ -194,10 +198,7 @@ public class KafkaSourceSplitEnumerator
             ConsumerMetadata metadata = tablePathMetadataMap.get(tablePath);
             Set<TopicPartition> topicPartitions = tablePathPartitionMap.get(tablePath);
 
-            StartMode effectiveStartMode =
-                    isRestored ? StartMode.GROUP_OFFSETS : metadata.getStartMode();
-
-            switch (effectiveStartMode) {
+            switch (metadata.getStartMode()) {
                 case EARLIEST:
                     topicPartitionOffsets.putAll(
                             listOffsets(topicPartitions, OffsetSpec.earliest()));
@@ -265,6 +266,18 @@ public class KafkaSourceSplitEnumerator
     @Override
     public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
+            if (isRestored && !initialized) {
+                Map<TopicPartition, KafkaSourceSplit> restoredSplits =
+                        splits.stream()
+                                .map(KafkaSourceSplit::copy)
+                                .collect(
+                                        Collectors.toMap(
+                                                KafkaSourceSplit::getTopicPartition,
+                                                split -> split));
+                restoringSplits.addAll(restoredSplits.keySet());
+                pendingSplit.putAll(restoredSplits);
+                return;
+            }
             Map<TopicPartition, ? extends KafkaSourceSplit> nextSplit = convertToNextSplit(splits);
             // remove them from the assignedSplit, so we can reassign them
             nextSplit.keySet().forEach(assignedSplit::remove);
@@ -282,15 +295,16 @@ public class KafkaSourceSplitEnumerator
                                     .filter(Objects::nonNull)
                                     .collect(Collectors.toList()),
                             OffsetSpec.latest());
-            splits.forEach(
-                    split -> {
-                        split.setStartOffset(split.getEndOffset() + 1);
-                        split.setEndOffset(
-                                isStreamingMode
-                                        ? Long.MAX_VALUE
-                                        : latestOffsets.get(split.getTopicPartition()));
-                    });
             return splits.stream()
+                    .map(KafkaSourceSplit::copy)
+                    .peek(
+                            split -> {
+                                split.setStartOffset(split.getEndOffset() + 1);
+                                split.setEndOffset(
+                                        isStreamingMode
+                                                ? Long.MAX_VALUE
+                                                : latestOffsets.get(split.getTopicPartition()));
+                            })
                     .collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split));
         } catch (Exception e) {
             throw new KafkaConnectorException(
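[Editor's note] Two details in the enumerator hunks above are worth spelling out. First, splits returned through addSplitsBack during restore are recorded in restoringSplits and filtered out of setPartitionStartOffset, so `start_mode` can no longer overwrite checkpointed offsets. Second, convertToNextSplit now copies each split before advancing its offsets, so split objects still referenced by checkpoint state are never mutated in place. A self-contained sketch of that copy-before-mutate pattern (Split here is a stand-in class, not the SeaTunnel one):

    public class CopyBeforeMutate {
        static final class Split {
            long startOffset;
            long endOffset;
            Split(long start, long end) { startOffset = start; endOffset = end; }
            Split copy() { return new Split(startOffset, endOffset); }
        }

        static Split nextSplit(Split returned, boolean streamingMode, long latestOffset) {
            Split next = returned.copy();               // leave the caller's object untouched
            next.startOffset = returned.endOffset + 1;  // resume right after the consumed range
            next.endOffset = streamingMode ? Long.MAX_VALUE : latestOffset;
            return next;
        }

        public static void main(String[] args) {
            Split checkpointed = new Split(10L, 15L);
            Split next = nextSplit(checkpointed, true, 100L);
            // The checkpointed split still reads 10..15; only the new split advanced.
            System.out.println(checkpointed.startOffset + " " + next.startOffset); // 10 16
        }
    }

Running main prints "10 16": the object representing checkpointed state is left intact while the reassigned split starts one past the old end offset.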
diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
index a9f38f613d..7e9111552d 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
@@ -17,9 +17,10 @@
 
 package org.apache.kafka.clients.admin;
 
-import org.apache.seatunnel.shade.com.google.common.collect.Lists;
-
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
+import org.apache.seatunnel.connectors.seatunnel.kafka.source.ConsumerMetadata;
 import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
 import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator;
@@ -34,6 +35,9 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -52,28 +56,34 @@ class KafkaSourceSplitEnumeratorTest {
 
     @BeforeEach
     void init() {
+        Mockito.when(kafkaSourceConfig.getMapMetadata()).thenReturn(Collections.emptyMap());
+        Mockito.when(kafkaSourceConfig.isIgnoreNoLeaderPartition()).thenReturn(false);
 
         Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class)))
-                .thenReturn(
-                        new ListOffsetsResult(
-                                new HashMap<
-                                        TopicPartition,
-                                        KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() {
-                                    {
-                                        put(
-                                                partition0,
-                                                KafkaFuture.completedFuture(
-                                                        new ListOffsetsResult.ListOffsetsResultInfo(
-                                                                0, 0, Optional.of(0))));
-                                        put(
-                                                partition2,
-                                                KafkaFuture.completedFuture(
-                                                        new ListOffsetsResult.ListOffsetsResultInfo(
-                                                                0, 0, Optional.of(0))));
-                                    }
-                                }));
+                .thenAnswer(
+                        invocation -> {
+                            Map<TopicPartition, OffsetSpec> requestedOffsets =
+                                    invocation.getArgument(0);
+                            Map<
+                                            TopicPartition,
+                                            KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>
+                                    offsets = new HashMap<>();
+                            requestedOffsets
+                                    .keySet()
+                                    .forEach(
+                                            tp ->
+                                                    offsets.put(
+                                                            tp,
+                                                            KafkaFuture.completedFuture(
+                                                                    new ListOffsetsResult
+                                                                            .ListOffsetsResultInfo(
+                                                                            0,
+                                                                            0,
+                                                                            Optional.of(0)))));
+                            return new ListOffsetsResult(offsets);
+                        });
 
-        List<TopicPartitionInfo> mockTopicPartition = Lists.newArrayList();
+        List<TopicPartitionInfo> mockTopicPartition = new ArrayList<>();
         TopicPartitionInfo topicPartitionWithLeader =
                 new TopicPartitionInfo(
                         0,
@@ -102,27 +112,24 @@ class KafkaSourceSplitEnumeratorTest {
     }
 
     @Test
-    void addSplitsBack() {
-        // test
-        Map<TopicPartition, KafkaSourceSplit> assignedSplit =
-                new HashMap<TopicPartition, KafkaSourceSplit>() {
-                    {
-                        put(partition0, new KafkaSourceSplit(null, partition0));
-                    }
-                };
+    void addSplitsBackShouldPreserveCheckpointOffsetsDuringRestore() {
+        Map<TopicPartition, KafkaSourceSplit> assignedSplit = new HashMap<>();
         Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
-        List<KafkaSourceSplit> splits = Arrays.asList(new KafkaSourceSplit(null, partition0));
+        List<KafkaSourceSplit> splits =
+                Collections.singletonList(
+                        new KafkaSourceSplit(TablePath.DEFAULT, partition0, 123L, Long.MAX_VALUE));
         KafkaSourceSplitEnumerator enumerator =
-                new KafkaSourceSplitEnumerator(adminClient, null, pendingSplit, assignedSplit);
+                new KafkaSourceSplitEnumerator(
+                        adminClient, kafkaSourceConfig, pendingSplit, assignedSplit, true, true);
         enumerator.addSplitsBack(splits, 1);
-        Assertions.assertTrue(pendingSplit.size() == splits.size());
-        Assertions.assertNull(assignedSplit.get(partition0));
-        Assertions.assertTrue(pendingSplit.get(partition0).getEndOffset() == 0);
+        Assertions.assertEquals(splits.size(), pendingSplit.size());
+        Assertions.assertTrue(assignedSplit.isEmpty());
+        Assertions.assertEquals(123L, pendingSplit.get(partition0).getStartOffset());
+        Assertions.assertEquals(Long.MAX_VALUE, pendingSplit.get(partition0).getEndOffset());
     }
 
     @Test
-    void addStreamingSplitsBack() {
-        // test
+    void addSplitsBackShouldAdvanceOffsetsAfterInitialization() throws Exception {
         Map<TopicPartition, KafkaSourceSplit> assignedSplit =
                 new HashMap<TopicPartition, KafkaSourceSplit>() {
                     {
@@ -131,15 +138,43 @@ class KafkaSourceSplitEnumeratorTest {
                 };
         Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
         List<KafkaSourceSplit> splits =
-                Collections.singletonList(new KafkaSourceSplit(null, partition0));
+                Collections.singletonList(new KafkaSourceSplit(null, partition0, 10L, 15L));
         KafkaSourceSplitEnumerator enumerator =
                 new KafkaSourceSplitEnumerator(adminClient, pendingSplit, assignedSplit, true);
+        setInitialized(enumerator, true);
         enumerator.addSplitsBack(splits, 1);
         Assertions.assertEquals(pendingSplit.size(), splits.size());
         Assertions.assertNull(assignedSplit.get(partition0));
+        Assertions.assertEquals(16L, pendingSplit.get(partition0).getStartOffset());
         Assertions.assertTrue(pendingSplit.get(partition0).getEndOffset() == Long.MAX_VALUE);
     }
 
+    @Test
+    void setPartitionStartOffsetShouldNotOverrideRestoredSplits() throws Exception {
+        ConsumerMetadata metadata = new ConsumerMetadata();
+        metadata.setTopic("test");
+        metadata.setStartMode(StartMode.EARLIEST);
+        Mockito.when(kafkaSourceConfig.getMapMetadata())
+                .thenReturn(Collections.singletonMap(TablePath.DEFAULT, metadata));
+
+        Map<TopicPartition, KafkaSourceSplit> assignedSplit = new HashMap<>();
+        Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+        KafkaSourceSplitEnumerator enumerator =
+                new KafkaSourceSplitEnumerator(
+                        adminClient, kafkaSourceConfig, pendingSplit, assignedSplit, true, true);
+
+        enumerator.addSplitsBack(
+                Collections.singletonList(
+                        new KafkaSourceSplit(TablePath.DEFAULT, partition0, 123L, Long.MAX_VALUE)),
+                0);
+        enumerator.fetchPendingPartitionSplit();
+        invokeSetPartitionStartOffset(enumerator);
+
+        Assertions.assertEquals(123L, pendingSplit.get(partition0).getStartOffset());
+        Assertions.assertEquals(Long.MAX_VALUE, pendingSplit.get(partition0).getEndOffset());
+        Assertions.assertEquals(0L, pendingSplit.get(partition2).getStartOffset());
+    }
+
     @Test
     void addStreamingSplits() throws ExecutionException, InterruptedException {
         // test
@@ -214,7 +249,7 @@ class KafkaSourceSplitEnumeratorTest {
 
         // Test partition restoration: simulate partition2 getting a leader
         // Create new mock topic partition list with partition2 now having a leader
-        List<TopicPartitionInfo> restoredMockTopicPartition = Lists.newArrayList();
+        List<TopicPartitionInfo> restoredMockTopicPartition = new ArrayList<>();
         TopicPartitionInfo topicPartitionWithLeader =
                 new TopicPartitionInfo(
                         0,
@@ -254,4 +289,19 @@ class KafkaSourceSplitEnumeratorTest {
         Assertions.assertNotNull(pendingSplit.get(partition0));
         Assertions.assertNotNull(pendingSplit.get(partition2));
     }
+
+    private void setInitialized(KafkaSourceSplitEnumerator enumerator, boolean initialized)
+            throws Exception {
+        Field initializedField = KafkaSourceSplitEnumerator.class.getDeclaredField("initialized");
+        initializedField.setAccessible(true);
+        initializedField.set(enumerator, initialized);
+    }
+
+    private void invokeSetPartitionStartOffset(KafkaSourceSplitEnumerator enumerator)
+            throws Exception {
+        Method method =
+                KafkaSourceSplitEnumerator.class.getDeclaredMethod("setPartitionStartOffset");
+        method.setAccessible(true);
+        method.invoke(enumerator);
+    }
 }
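[Editor's note] One test-infrastructure point from the hunks above: replacing the fixed thenReturn with thenAnswer lets the mocked listOffsets respond to whichever partitions the enumerator actually requests, which the new restore tests depend on. The Mockito pattern in isolation, reduced to a hypothetical interface (nothing SeaTunnel-specific):

    import static org.mockito.Mockito.anyMap;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.util.HashMap;
    import java.util.Map;

    public class ThenAnswerSketch {
        // Hypothetical stand-in for a client that lists offsets per partition.
        interface OffsetClient {
            Map<String, Long> listOffsets(Map<String, String> request);
        }

        public static void main(String[] args) {
            OffsetClient client = mock(OffsetClient.class);
            // The answer is computed per invocation: echo back offset 0 for every
            // partition actually requested, whatever the request contains.
            when(client.listOffsets(anyMap()))
                    .thenAnswer(
                            invocation -> {
                                Map<String, String> request = invocation.getArgument(0);
                                Map<String, Long> result = new HashMap<>();
                                request.keySet().forEach(k -> result.put(k, 0L));
                                return result;
                            });

            Map<String, String> request = new HashMap<>();
            request.put("topic-0", "earliest");
            System.out.println(client.listOffsets(request)); // {topic-0=0}
        }
    }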
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 7a5a188810..704febe57d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -61,6 +61,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -170,12 +171,32 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
                     new NewTopic("test_topic_source_skip_partition", 2, (short) 1);
             testTopicSourceSkipPartition.configs(Collections.singletonMap("retention.ms", "-1"));
 
+            NewTopic kafkaExactlyOnceSource =
+                    new NewTopic("kafka_topic_exactly_once_1", 1, (short) 1);
+            kafkaExactlyOnceSource.configs(Collections.singletonMap("retention.ms", "-1"));
+
+            NewTopic kafkaExactlyOnceSink =
+                    new NewTopic("kafka_topic_exactly_once_2", 1, (short) 1);
+            kafkaExactlyOnceSink.configs(Collections.singletonMap("retention.ms", "-1"));
+
+            NewTopic kafkaExactlyOnceBatchSource =
+                    new NewTopic("kafka_topic_exactly_batch_once_1", 1, (short) 1);
+            kafkaExactlyOnceBatchSource.configs(Collections.singletonMap("retention.ms", "-1"));
+
+            NewTopic kafkaExactlyOnceBatchSink =
+                    new NewTopic("kafka_topic_exactly_batch_once_2", 1, (short) 1);
+            kafkaExactlyOnceBatchSink.configs(Collections.singletonMap("retention.ms", "-1"));
+
             List<NewTopic> topics =
                     Arrays.asList(
                             testTopicSource,
                             testTopicNativeSource,
                             testTopicSourceWithTimestamp,
-                            testTopicSourceSkipPartition);
+                            testTopicSourceSkipPartition,
+                            kafkaExactlyOnceSource,
+                            kafkaExactlyOnceSink,
+                            kafkaExactlyOnceBatchSource,
+                            kafkaExactlyOnceBatchSink);
             adminClient.createTopics(topics);
         }
 
@@ -722,6 +743,11 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
                 srcEndAfterAll == srcEndBeforeStart + 25,
                 "Final end offset should advance by at least 25");
 
+        // Deliberately move the consumer-group offset past the savepoint position.
+        // Restore must still use the checkpointed split offsets instead of the external group
+        // offset, otherwise the 15 post-savepoint records will be skipped.
+        commitOffset(sourceTopic, "test_restore_group", srcEndAfterAll);
+
         // Restore the job from the savepoint asynchronously.
         CompletableFuture.runAsync(
                 () -> {
@@ -1080,6 +1106,25 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         }
     }
 
+    public void commitOffset(String topic, String groupId, long offset) {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        props.put(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                ByteArrayDeserializer.class.getName());
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+        TopicPartition tp0 = new TopicPartition(topic, 0);
+        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
+            consumer.assign(Collections.singletonList(tp0));
+            consumer.commitSync(Collections.singletonMap(tp0, new OffsetAndMetadata(offset)));
+        }
+    }
+
     @DisabledOnContainer(
             value = {},
             type = {EngineType.SPARK, EngineType.FLINK},
@@ -1504,6 +1549,7 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         String consumerTopic = "kafka_topic_exactly_once_2";
         String sourceData = "Seatunnel Exactly Once Example";
         final String jobId = "18696753645413";
+        long sinkStartOffset = endOffsetOnP0(consumerTopic);
         for (int i = 0; i < 10; i++) {
             ProducerRecord<byte[], byte[]> record =
                     new ProducerRecord<>(producerTopic, null, sourceData.getBytes());
@@ -1528,12 +1574,15 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
                 .await()
                 .atMost(5, MINUTES)
                 .untilAsserted(
-                        () -> Assertions.assertTrue(checkData(consumerTopic, 10, sourceData)));
+                        () ->
+                                Assertions.assertTrue(
+                                        checkData(consumerTopic, sinkStartOffset, 10, sourceData)));
 
         // Savepoint the running job (so restore should continue from this position).
         container.savepointJob(jobId);
 
         String sourceDataRestore = "Seatunnel Exactly Once Example Restore";
+        long restoreStartOffset = endOffsetOnP0(consumerTopic);
 
         for (int i = 0; i < 10; i++) {
             ProducerRecord<byte[], byte[]> record =
@@ -1559,7 +1608,11 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
                 .untilAsserted(
                         () ->
                                 Assertions.assertTrue(
-                                        checkData(consumerTopic, 10, sourceDataRestore)));
+                                        checkData(
+                                                consumerTopic,
+                                                restoreStartOffset,
+                                                10,
+                                                sourceDataRestore)));
     }
 
     @TestTemplate
@@ -1571,6 +1624,7 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         String producerTopic = "kafka_topic_exactly_once_1";
         String consumerTopic = "kafka_topic_exactly_once_2";
         String sourceData = "Seatunnel Exactly Once Example";
+        long sinkStartOffset = endOffsetOnP0(consumerTopic);
         for (int i = 0; i < 10; i++) {
             ProducerRecord<byte[], byte[]> record =
                     new ProducerRecord<>(producerTopic, null, sourceData.getBytes());
@@ -1595,7 +1649,9 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
                 .await()
                 .atMost(5, MINUTES)
                 .untilAsserted(
-                        () -> Assertions.assertTrue(checkData(consumerTopic, 10, sourceData)));
+                        () ->
+                                Assertions.assertTrue(
+                                        checkData(consumerTopic, sinkStartOffset, 10, sourceData)));
     }
 
     @TestTemplate
@@ -1604,6 +1660,7 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         String producerTopic = "kafka_topic_exactly_batch_once_1";
         String consumerTopic = "kafka_topic_exactly_batch_once_2";
         String sourceData = "Seatunnel Exactly Once Example";
+        long sinkStartOffset = endOffsetOnP0(consumerTopic);
         for (int i = 0; i < 10; i++) {
             ProducerRecord<byte[], byte[]> record =
                     new ProducerRecord<>(producerTopic, null, sourceData.getBytes());
@@ -1622,20 +1679,21 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
                     container.executeJob("/kafka/kafka_to_kafka_exactly_once_batch.conf");
             Assertions.assertEquals(0, execResult.getExitCode());
             // wait for data written to kafka
-            Assertions.assertTrue(checkData(consumerTopic, endOffset, sourceData));
+            Assertions.assertTrue(checkData(consumerTopic, sinkStartOffset, endOffset, sourceData));
         } finally {
             closeKafkaConsumer(consumer);
         }
     }
 
     // Compare the values of data fields obtained from consumers
-    private boolean checkData(String topicName, long endOffset, String data) {
-        List<String> listData = getKafkaConsumerListData(topicName, endOffset);
-        if (listData.isEmpty() || listData.size() != endOffset) {
+    private boolean checkData(String topicName, long startOffset, long expectedCount, String data) {
+        List<String> listData = getKafkaConsumerListData(topicName, startOffset, expectedCount);
+        if (listData.isEmpty() || listData.size() != expectedCount) {
             log.error(
-                    "testKafkaToKafkaExactlyOnce get data size is not expect,get consumer data size {},get end offset {}",
+                    "testKafkaToKafkaExactlyOnce get data size is not expect,get consumer data size {},start offset {},expected count {}",
                     listData.size(),
-                    endOffset);
+                    startOffset,
+                    expectedCount);
             return false;
         }
         for (String value : listData) {
@@ -2152,28 +2210,38 @@ public class KafkaIT extends TestSuiteBase implements TestResource {
         }
     }
 
-    private List<String> getKafkaConsumerListData(String topicName, long endOffset) {
+    private List<String> getKafkaConsumerListData(
+            String topicName, long startOffset, long expectedCount) {
         KafkaConsumer<String, String> consumer = null;
         try {
             List<String> data = new ArrayList<>();
-            consumer = new KafkaConsumer<>(kafkaConsumerConfig());
-            consumer.subscribe(Arrays.asList(topicName));
-            Long lastProcessedOffset = -1L;
+            consumer = new KafkaConsumer<>(kafkaManualConsumerConfig());
+            TopicPartition topicPartition = new TopicPartition(topicName, 0);
+            consumer.assign(Collections.singletonList(topicPartition));
+            consumer.seek(topicPartition, startOffset);
+            long targetOffsetExclusive = startOffset + expectedCount;
+            Long lastProcessedOffset = startOffset - 1;
             do {
                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                 for (ConsumerRecord<String, String> record : records) {
-                    if (lastProcessedOffset < record.offset()) {
+                    if (record.offset() >= startOffset && record.offset() < targetOffsetExclusive) {
                         data.add(record.value());
                     }
                     lastProcessedOffset = record.offset();
                 }
-            } while (lastProcessedOffset < endOffset - 1);
+            } while (lastProcessedOffset < targetOffsetExclusive - 1);
             return data;
         } finally {
             closeKafkaConsumer(consumer);
         }
     }
 
+    private Properties kafkaManualConsumerConfig() {
+        Properties props = kafkaConsumerConfig();
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        return props;
+    }
+
     private void closeKafkaConsumer(KafkaConsumer<String, String> consumer) {
         if (consumer != null) {
             try {
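[Editor's note] Finally, the reworked verification helper: getKafkaConsumerListData now uses manual partition assignment, seeks to an explicit start offset, and disables auto-commit, so checks read an exact offset range without touching any consumer group's state. A sketch of that read pattern on its own, assuming a single-partition topic (class and method names are illustrative):

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class RangeReader {
        // Reads exactly [startOffset, startOffset + count) from partition 0 of `topic`.
        // No group.id and no commits: verification leaves consumer-group state untouched.
        static List<String> readRange(String bootstrap, String topic, long startOffset, long count) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

            List<String> out = new ArrayList<>();
            TopicPartition tp = new TopicPartition(topic, 0);
            long endExclusive = startOffset + count;
            long lastSeen = startOffset - 1;
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(tp)); // no subscribe(), no rebalance
                consumer.seek(tp, startOffset);                 // jump straight to the range start
                while (lastSeen < endExclusive - 1) {           // poll until the range is fully read
                    for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(100))) {
                        if (r.offset() < endExclusive) {
                            out.add(r.value());
                        }
                        lastSeen = r.offset();
                    }
                }
            }
            return out;
        }
    }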

