This is an automated email from the ASF dual-hosted git repository.
shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7e0b52dfc5 [Fix] [Connectors-v2] Fix Kafka source restore offsets on
checkpoint recovery (#10612)
7e0b52dfc5 is described below
commit 7e0b52dfc566d809a6df0e38afb837d87ee5dac2
Author: yzeng1618 <[email protected]>
AuthorDate: Sun Mar 29 08:15:53 2026 +0800
[Fix] [Connectors-v2] Fix Kafka source restore offsets on checkpoint
recovery (#10612)
Co-authored-by: zengyi <[email protected]>
---
docs/en/connectors/source/Kafka.md | 4 +
docs/zh/connectors/source/Kafka.md | 3 +
.../kafka/source/KafkaSourceSplitEnumerator.java | 80 +++++++------
.../admin/KafkaSourceSplitEnumeratorTest.java | 126 ++++++++++++++-------
.../seatunnel/e2e/connector/kafka/KafkaIT.java | 100 +++++++++++++---
5 files changed, 226 insertions(+), 87 deletions(-)
diff --git a/docs/en/connectors/source/Kafka.md
b/docs/en/connectors/source/Kafka.md
index 9557d07435..a43032b5a3 100644
--- a/docs/en/connectors/source/Kafka.md
+++ b/docs/en/connectors/source/Kafka.md
@@ -62,6 +62,10 @@ They can be downloaded via install-plugin.sh or from the
Maven central repository
| reader_cache_queue_size | Integer
| No | 1024 |
The reader shard cache queue is used to cache the data corresponding to the
shards. The size of the shard cache depends on the number of shards obtained by
each reader, rather than the amount of data in each shard.
[...]
| is_native | Boolean
| No | false |
Supports retaining the source information of the record.
+> On restore from checkpoint or savepoint, Kafka Source resumes from the
checkpointed split offsets.
+> `start_mode` and consumer-group offsets are only used for the first startup
or for newly
+> discovered partitions that do not have checkpointed state yet.
+
### debezium_record_table_filter
We can use `debezium_record_table_filter` to filter the data in the debezium
format. The configuration is as follows:
diff --git a/docs/zh/connectors/source/Kafka.md
b/docs/zh/connectors/source/Kafka.md
index 5923a85c87..78d7f41668 100644
--- a/docs/zh/connectors/source/Kafka.md
+++ b/docs/zh/connectors/source/Kafka.md
@@ -62,6 +62,9 @@ import ChangeLog from '../changelog/connector-kafka.md';
| reader_cache_queue_size | Integer |
否 | 1024 |
Reader分片缓存队列,用于缓存分片对应的数据。占用大小取决于每个reader得到的分片量,而不是每个分片的数据量。
|
| is_native | Boolean |
No | false | 支持保留record的源信息。
|
+> 从 checkpoint 或 savepoint 恢复时,Kafka Source 会优先使用 checkpoint 中保存的 split offset。
+> `start_mode` 和 consumer group offset 只在首次启动,或为尚未存在 checkpoint
状态的新发现分区初始化位点时生效。
+
### debezium_record_table_filter
我们可以使用 `debezium_record_table_filter` 来过滤 debezium 格式的数据。配置如下:
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 95c4173512..07b7f9ccb4 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -22,7 +22,6 @@ import
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTestin
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
@@ -73,6 +72,7 @@ public class KafkaSourceSplitEnumerator
private volatile boolean initialized;
private final Object lock = new Object();
private final Map<String, TablePath> topicMappingTablePathMap = new
HashMap<>();
+ private final Set<TopicPartition> restoringSplits = new HashSet<>();
private boolean isStreamingMode;
private final boolean isRestored;
@@ -92,21 +92,6 @@ public class KafkaSourceSplitEnumerator
this.discoveryIntervalMillis =
kafkaSourceConfig.getDiscoveryIntervalMillis();
this.isStreamingMode = isStreamingMode;
this.isRestored = isRestored;
-
- if (this.isRestored) {
- log.info("Task is being restored, forcing start mode to
GROUP_OFFSETS for all topics");
- this.tablePathMetadataMap.forEach(
- (tablePath, metadata) -> {
- StartMode originalMode = metadata.getStartMode();
- if (originalMode != StartMode.GROUP_OFFSETS) {
- log.info(
- "Changing start mode from {} to
GROUP_OFFSETS for table path: {}",
- originalMode,
- tablePath);
- metadata.setStartMode(StartMode.GROUP_OFFSETS);
- }
- });
- }
}
@VisibleForTesting
@@ -115,14 +100,27 @@ public class KafkaSourceSplitEnumerator
KafkaSourceConfig kafkaSourceConfig,
Map<TopicPartition, KafkaSourceSplit> pendingSplit,
Map<TopicPartition, KafkaSourceSplit> assignedSplit) {
- this.tablePathMetadataMap = new HashMap<>();
+ this(adminClient, kafkaSourceConfig, pendingSplit, assignedSplit,
false, false);
+ }
+
+ @VisibleForTesting
+ public KafkaSourceSplitEnumerator(
+ AdminClient adminClient,
+ KafkaSourceConfig kafkaSourceConfig,
+ Map<TopicPartition, KafkaSourceSplit> pendingSplit,
+ Map<TopicPartition, KafkaSourceSplit> assignedSplit,
+ boolean isStreamingMode,
+ boolean isRestored) {
+ this.tablePathMetadataMap =
+ kafkaSourceConfig == null ? new HashMap<>() :
kafkaSourceConfig.getMapMetadata();
this.context = null;
this.discoveryIntervalMillis = -1;
this.adminClient = adminClient;
this.kafkaSourceConfig = kafkaSourceConfig;
this.pendingSplit = pendingSplit;
this.assignedSplit = assignedSplit;
- this.isRestored = false;
+ this.isStreamingMode = isStreamingMode;
+ this.isRestored = isRestored;
}
@VisibleForTesting
@@ -131,8 +129,7 @@ public class KafkaSourceSplitEnumerator
Map<TopicPartition, KafkaSourceSplit> pendingSplit,
Map<TopicPartition, KafkaSourceSplit> assignedSplit,
boolean isStreamingMode) {
- this(adminClient, null, pendingSplit, assignedSplit);
- this.isStreamingMode = isStreamingMode;
+ this(adminClient, null, pendingSplit, assignedSplit, isStreamingMode,
false);
}
@Override
@@ -172,6 +169,7 @@ public class KafkaSourceSplitEnumerator
}
synchronized (lock) {
assignSplit();
+ restoringSplits.clear();
}
if (!initialized) {
initialized = true;
@@ -179,7 +177,13 @@ public class KafkaSourceSplitEnumerator
}
private void setPartitionStartOffset() throws ExecutionException,
InterruptedException {
- Set<TopicPartition> pendingTopicPartitions = pendingSplit.keySet();
+ Set<TopicPartition> pendingTopicPartitions =
+ pendingSplit.keySet().stream()
+ .filter(tp -> !restoringSplits.contains(tp))
+ .collect(Collectors.toSet());
+ if (pendingTopicPartitions.isEmpty()) {
+ return;
+ }
Map<TopicPartition, Long> topicPartitionOffsets = new HashMap<>();
Map<TopicPartition, Long> topicPartitionEndOffsets = new HashMap<>();
// Set kafka TopicPartition based on the topicPath granularity
@@ -194,10 +198,7 @@ public class KafkaSourceSplitEnumerator
ConsumerMetadata metadata = tablePathMetadataMap.get(tablePath);
Set<TopicPartition> topicPartitions =
tablePathPartitionMap.get(tablePath);
- StartMode effectiveStartMode =
- isRestored ? StartMode.GROUP_OFFSETS :
metadata.getStartMode();
-
- switch (effectiveStartMode) {
+ switch (metadata.getStartMode()) {
case EARLIEST:
topicPartitionOffsets.putAll(
listOffsets(topicPartitions,
OffsetSpec.earliest()));
@@ -265,6 +266,18 @@ public class KafkaSourceSplitEnumerator
@Override
public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
+ if (isRestored && !initialized) {
+ Map<TopicPartition, KafkaSourceSplit> restoredSplits =
+ splits.stream()
+ .map(KafkaSourceSplit::copy)
+ .collect(
+ Collectors.toMap(
+
KafkaSourceSplit::getTopicPartition,
+ split -> split));
+ restoringSplits.addAll(restoredSplits.keySet());
+ pendingSplit.putAll(restoredSplits);
+ return;
+ }
Map<TopicPartition, ? extends KafkaSourceSplit> nextSplit =
convertToNextSplit(splits);
// remove them from the assignedSplit, so we can reassign them
nextSplit.keySet().forEach(assignedSplit::remove);
@@ -282,15 +295,16 @@ public class KafkaSourceSplitEnumerator
.filter(Objects::nonNull)
.collect(Collectors.toList()),
OffsetSpec.latest());
- splits.forEach(
- split -> {
- split.setStartOffset(split.getEndOffset() + 1);
- split.setEndOffset(
- isStreamingMode
- ? Long.MAX_VALUE
- :
latestOffsets.get(split.getTopicPartition()));
- });
return splits.stream()
+ .map(KafkaSourceSplit::copy)
+ .peek(
+ split -> {
+ split.setStartOffset(split.getEndOffset() + 1);
+ split.setEndOffset(
+ isStreamingMode
+ ? Long.MAX_VALUE
+ :
latestOffsets.get(split.getTopicPartition()));
+ })
.collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, split -> split));
} catch (Exception e) {
throw new KafkaConnectorException(
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
index a9f38f613d..7e9111552d 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/kafka/clients/admin/KafkaSourceSplitEnumeratorTest.java
@@ -17,9 +17,10 @@
package org.apache.kafka.clients.admin;
-import org.apache.seatunnel.shade.com.google.common.collect.Lists;
-
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.kafka.config.StartMode;
+import org.apache.seatunnel.connectors.seatunnel.kafka.source.ConsumerMetadata;
import
org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplit;
import
org.apache.seatunnel.connectors.seatunnel.kafka.source.KafkaSourceSplitEnumerator;
@@ -34,6 +35,9 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -52,28 +56,34 @@ class KafkaSourceSplitEnumeratorTest {
@BeforeEach
void init() {
+
Mockito.when(kafkaSourceConfig.getMapMetadata()).thenReturn(Collections.emptyMap());
+
Mockito.when(kafkaSourceConfig.isIgnoreNoLeaderPartition()).thenReturn(false);
Mockito.when(adminClient.listOffsets(Mockito.any(java.util.Map.class)))
- .thenReturn(
- new ListOffsetsResult(
- new HashMap<
- TopicPartition,
-
KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>() {
- {
- put(
- partition0,
- KafkaFuture.completedFuture(
- new
ListOffsetsResult.ListOffsetsResultInfo(
- 0, 0,
Optional.of(0))));
- put(
- partition2,
- KafkaFuture.completedFuture(
- new
ListOffsetsResult.ListOffsetsResultInfo(
- 0, 0,
Optional.of(0))));
- }
- }));
+ .thenAnswer(
+ invocation -> {
+ Map<TopicPartition, OffsetSpec> requestedOffsets =
+ invocation.getArgument(0);
+ Map<
+ TopicPartition,
+
KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>>
+ offsets = new HashMap<>();
+ requestedOffsets
+ .keySet()
+ .forEach(
+ tp ->
+ offsets.put(
+ tp,
+
KafkaFuture.completedFuture(
+ new
ListOffsetsResult
+
.ListOffsetsResultInfo(
+ 0,
+ 0,
+
Optional.of(0)))));
+ return new ListOffsetsResult(offsets);
+ });
- List<TopicPartitionInfo> mockTopicPartition = Lists.newArrayList();
+ List<TopicPartitionInfo> mockTopicPartition = new ArrayList<>();
TopicPartitionInfo topicPartitionWithLeader =
new TopicPartitionInfo(
0,
@@ -102,27 +112,24 @@ class KafkaSourceSplitEnumeratorTest {
}
@Test
- void addSplitsBack() {
- // test
- Map<TopicPartition, KafkaSourceSplit> assignedSplit =
- new HashMap<TopicPartition, KafkaSourceSplit>() {
- {
- put(partition0, new KafkaSourceSplit(null,
partition0));
- }
- };
+ void addSplitsBackShouldPreserveCheckpointOffsetsDuringRestore() {
+ Map<TopicPartition, KafkaSourceSplit> assignedSplit = new HashMap<>();
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
- List<KafkaSourceSplit> splits = Arrays.asList(new
KafkaSourceSplit(null, partition0));
+ List<KafkaSourceSplit> splits =
+ Collections.singletonList(
+ new KafkaSourceSplit(TablePath.DEFAULT, partition0,
123L, Long.MAX_VALUE));
KafkaSourceSplitEnumerator enumerator =
- new KafkaSourceSplitEnumerator(adminClient, null,
pendingSplit, assignedSplit);
+ new KafkaSourceSplitEnumerator(
+ adminClient, kafkaSourceConfig, pendingSplit,
assignedSplit, true, true);
enumerator.addSplitsBack(splits, 1);
- Assertions.assertTrue(pendingSplit.size() == splits.size());
- Assertions.assertNull(assignedSplit.get(partition0));
- Assertions.assertTrue(pendingSplit.get(partition0).getEndOffset() ==
0);
+ Assertions.assertEquals(splits.size(), pendingSplit.size());
+ Assertions.assertTrue(assignedSplit.isEmpty());
+ Assertions.assertEquals(123L,
pendingSplit.get(partition0).getStartOffset());
+ Assertions.assertEquals(Long.MAX_VALUE,
pendingSplit.get(partition0).getEndOffset());
}
@Test
- void addStreamingSplitsBack() {
- // test
+ void addSplitsBackShouldAdvanceOffsetsAfterInitialization() throws
Exception {
Map<TopicPartition, KafkaSourceSplit> assignedSplit =
new HashMap<TopicPartition, KafkaSourceSplit>() {
{
@@ -131,15 +138,43 @@ class KafkaSourceSplitEnumeratorTest {
};
Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
List<KafkaSourceSplit> splits =
- Collections.singletonList(new KafkaSourceSplit(null,
partition0));
+ Collections.singletonList(new KafkaSourceSplit(null,
partition0, 10L, 15L));
KafkaSourceSplitEnumerator enumerator =
new KafkaSourceSplitEnumerator(adminClient, pendingSplit,
assignedSplit, true);
+ setInitialized(enumerator, true);
enumerator.addSplitsBack(splits, 1);
Assertions.assertEquals(pendingSplit.size(), splits.size());
Assertions.assertNull(assignedSplit.get(partition0));
+ Assertions.assertEquals(16L,
pendingSplit.get(partition0).getStartOffset());
Assertions.assertTrue(pendingSplit.get(partition0).getEndOffset() ==
Long.MAX_VALUE);
}
+ @Test
+ void setPartitionStartOffsetShouldNotOverrideRestoredSplits() throws
Exception {
+ ConsumerMetadata metadata = new ConsumerMetadata();
+ metadata.setTopic("test");
+ metadata.setStartMode(StartMode.EARLIEST);
+ Mockito.when(kafkaSourceConfig.getMapMetadata())
+ .thenReturn(Collections.singletonMap(TablePath.DEFAULT,
metadata));
+
+ Map<TopicPartition, KafkaSourceSplit> assignedSplit = new HashMap<>();
+ Map<TopicPartition, KafkaSourceSplit> pendingSplit = new HashMap<>();
+ KafkaSourceSplitEnumerator enumerator =
+ new KafkaSourceSplitEnumerator(
+ adminClient, kafkaSourceConfig, pendingSplit,
assignedSplit, true, true);
+
+ enumerator.addSplitsBack(
+ Collections.singletonList(
+ new KafkaSourceSplit(TablePath.DEFAULT, partition0,
123L, Long.MAX_VALUE)),
+ 0);
+ enumerator.fetchPendingPartitionSplit();
+ invokeSetPartitionStartOffset(enumerator);
+
+ Assertions.assertEquals(123L,
pendingSplit.get(partition0).getStartOffset());
+ Assertions.assertEquals(Long.MAX_VALUE,
pendingSplit.get(partition0).getEndOffset());
+ Assertions.assertEquals(0L,
pendingSplit.get(partition2).getStartOffset());
+ }
+
@Test
void addStreamingSplits() throws ExecutionException, InterruptedException {
// test
@@ -214,7 +249,7 @@ class KafkaSourceSplitEnumeratorTest {
// Test partition restoration: simulate partition2 getting a leader
// Create new mock topic partition list with partition2 now having a
leader
- List<TopicPartitionInfo> restoredMockTopicPartition =
Lists.newArrayList();
+ List<TopicPartitionInfo> restoredMockTopicPartition = new
ArrayList<>();
TopicPartitionInfo topicPartitionWithLeader =
new TopicPartitionInfo(
0,
@@ -254,4 +289,19 @@ class KafkaSourceSplitEnumeratorTest {
Assertions.assertNotNull(pendingSplit.get(partition0));
Assertions.assertNotNull(pendingSplit.get(partition2));
}
+
+ private void setInitialized(KafkaSourceSplitEnumerator enumerator, boolean
initialized)
+ throws Exception {
+ Field initializedField =
KafkaSourceSplitEnumerator.class.getDeclaredField("initialized");
+ initializedField.setAccessible(true);
+ initializedField.set(enumerator, initialized);
+ }
+
+ private void invokeSetPartitionStartOffset(KafkaSourceSplitEnumerator
enumerator)
+ throws Exception {
+ Method method =
+
KafkaSourceSplitEnumerator.class.getDeclaredMethod("setPartitionStartOffset");
+ method.setAccessible(true);
+ method.invoke(enumerator);
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index 7a5a188810..704febe57d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -61,6 +61,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -170,12 +171,32 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
new NewTopic("test_topic_source_skip_partition", 2,
(short) 1);
testTopicSourceSkipPartition.configs(Collections.singletonMap("retention.ms",
"-1"));
+ NewTopic kafkaExactlyOnceSource =
+ new NewTopic("kafka_topic_exactly_once_1", 1, (short) 1);
+
kafkaExactlyOnceSource.configs(Collections.singletonMap("retention.ms", "-1"));
+
+ NewTopic kafkaExactlyOnceSink =
+ new NewTopic("kafka_topic_exactly_once_2", 1, (short) 1);
+
kafkaExactlyOnceSink.configs(Collections.singletonMap("retention.ms", "-1"));
+
+ NewTopic kafkaExactlyOnceBatchSource =
+ new NewTopic("kafka_topic_exactly_batch_once_1", 1,
(short) 1);
+
kafkaExactlyOnceBatchSource.configs(Collections.singletonMap("retention.ms",
"-1"));
+
+ NewTopic kafkaExactlyOnceBatchSink =
+ new NewTopic("kafka_topic_exactly_batch_once_2", 1,
(short) 1);
+
kafkaExactlyOnceBatchSink.configs(Collections.singletonMap("retention.ms",
"-1"));
+
List<NewTopic> topics =
Arrays.asList(
testTopicSource,
testTopicNativeSource,
testTopicSourceWithTimestamp,
- testTopicSourceSkipPartition);
+ testTopicSourceSkipPartition,
+ kafkaExactlyOnceSource,
+ kafkaExactlyOnceSink,
+ kafkaExactlyOnceBatchSource,
+ kafkaExactlyOnceBatchSink);
adminClient.createTopics(topics);
}
@@ -722,6 +743,11 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
srcEndAfterAll == srcEndBeforeStart + 25,
"Final end offset should advance by at least 25");
+ // Deliberately move the consumer-group offset past the savepoint
position.
+ // Restore must still use the checkpointed split offsets instead of
the external group
+ // offset, otherwise the 15 post-savepoint records will be skipped.
+ commitOffset(sourceTopic, "test_restore_group", srcEndAfterAll);
+
// Restore the job from the savepoint asynchronously.
CompletableFuture.runAsync(
() -> {
@@ -1080,6 +1106,25 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
}
}
+ public void commitOffset(String topic, String groupId, long offset) {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
kafkaContainer.getBootstrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+ props.put(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ props.put(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+
+ TopicPartition tp0 = new TopicPartition(topic, 0);
+ try (KafkaConsumer<byte[], byte[]> consumer = new
KafkaConsumer<>(props)) {
+ consumer.assign(Collections.singletonList(tp0));
+ consumer.commitSync(Collections.singletonMap(tp0, new
OffsetAndMetadata(offset)));
+ }
+ }
+
@DisabledOnContainer(
value = {},
type = {EngineType.SPARK, EngineType.FLINK},
@@ -1504,6 +1549,7 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
String consumerTopic = "kafka_topic_exactly_once_2";
String sourceData = "Seatunnel Exactly Once Example";
final String jobId = "18696753645413";
+ long sinkStartOffset = endOffsetOnP0(consumerTopic);
for (int i = 0; i < 10; i++) {
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(producerTopic, null,
sourceData.getBytes());
@@ -1528,12 +1574,15 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
.await()
.atMost(5, MINUTES)
.untilAsserted(
- () -> Assertions.assertTrue(checkData(consumerTopic,
10, sourceData)));
+ () ->
+ Assertions.assertTrue(
+ checkData(consumerTopic,
sinkStartOffset, 10, sourceData)));
// Savepoint the running job (so restore should continue from this
position).
container.savepointJob(jobId);
String sourceDataRestore = "Seatunnel Exactly Once Example Restore";
+ long restoreStartOffset = endOffsetOnP0(consumerTopic);
for (int i = 0; i < 10; i++) {
ProducerRecord<byte[], byte[]> record =
@@ -1559,7 +1608,11 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
.untilAsserted(
() ->
Assertions.assertTrue(
- checkData(consumerTopic, 10,
sourceDataRestore)));
+ checkData(
+ consumerTopic,
+ restoreStartOffset,
+ 10,
+ sourceDataRestore)));
}
@TestTemplate
@@ -1571,6 +1624,7 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
String producerTopic = "kafka_topic_exactly_once_1";
String consumerTopic = "kafka_topic_exactly_once_2";
String sourceData = "Seatunnel Exactly Once Example";
+ long sinkStartOffset = endOffsetOnP0(consumerTopic);
for (int i = 0; i < 10; i++) {
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(producerTopic, null,
sourceData.getBytes());
@@ -1595,7 +1649,9 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
.await()
.atMost(5, MINUTES)
.untilAsserted(
- () -> Assertions.assertTrue(checkData(consumerTopic,
10, sourceData)));
+ () ->
+ Assertions.assertTrue(
+ checkData(consumerTopic,
sinkStartOffset, 10, sourceData)));
}
@TestTemplate
@@ -1604,6 +1660,7 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
String producerTopic = "kafka_topic_exactly_batch_once_1";
String consumerTopic = "kafka_topic_exactly_batch_once_2";
String sourceData = "Seatunnel Exactly Once Example";
+ long sinkStartOffset = endOffsetOnP0(consumerTopic);
for (int i = 0; i < 10; i++) {
ProducerRecord<byte[], byte[]> record =
new ProducerRecord<>(producerTopic, null,
sourceData.getBytes());
@@ -1622,20 +1679,21 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
container.executeJob("/kafka/kafka_to_kafka_exactly_once_batch.conf");
Assertions.assertEquals(0, execResult.getExitCode());
// wait for data written to kafka
- Assertions.assertTrue(checkData(consumerTopic, endOffset,
sourceData));
+ Assertions.assertTrue(checkData(consumerTopic, sinkStartOffset,
endOffset, sourceData));
} finally {
closeKafkaConsumer(consumer);
}
}
// Compare the values of data fields obtained from consumers
- private boolean checkData(String topicName, long endOffset, String data) {
- List<String> listData = getKafkaConsumerListData(topicName, endOffset);
- if (listData.isEmpty() || listData.size() != endOffset) {
+ private boolean checkData(String topicName, long startOffset, long
expectedCount, String data) {
+ List<String> listData = getKafkaConsumerListData(topicName,
startOffset, expectedCount);
+ if (listData.isEmpty() || listData.size() != expectedCount) {
log.error(
- "testKafkaToKafkaExactlyOnce get data size is not
expect,get consumer data size {},get end offset {}",
+ "testKafkaToKafkaExactlyOnce get data size is not
expect,get consumer data size {},start offset {},expected count {}",
listData.size(),
- endOffset);
+ startOffset,
+ expectedCount);
return false;
}
for (String value : listData) {
@@ -2152,28 +2210,38 @@ public class KafkaIT extends TestSuiteBase implements
TestResource {
}
}
- private List<String> getKafkaConsumerListData(String topicName, long
endOffset) {
+ private List<String> getKafkaConsumerListData(
+ String topicName, long startOffset, long expectedCount) {
KafkaConsumer<String, String> consumer = null;
try {
List<String> data = new ArrayList<>();
- consumer = new KafkaConsumer<>(kafkaConsumerConfig());
- consumer.subscribe(Arrays.asList(topicName));
- Long lastProcessedOffset = -1L;
+ consumer = new KafkaConsumer<>(kafkaManualConsumerConfig());
+ TopicPartition topicPartition = new TopicPartition(topicName, 0);
+ consumer.assign(Collections.singletonList(topicPartition));
+ consumer.seek(topicPartition, startOffset);
+ long targetOffsetExclusive = startOffset + expectedCount;
+ Long lastProcessedOffset = startOffset - 1;
do {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
- if (lastProcessedOffset < record.offset()) {
+ if (record.offset() >= startOffset && record.offset() <
targetOffsetExclusive) {
data.add(record.value());
}
lastProcessedOffset = record.offset();
}
- } while (lastProcessedOffset < endOffset - 1);
+ } while (lastProcessedOffset < targetOffsetExclusive - 1);
return data;
} finally {
closeKafkaConsumer(consumer);
}
}
+ private Properties kafkaManualConsumerConfig() {
+ Properties props = kafkaConsumerConfig();
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ return props;
+ }
+
private void closeKafkaConsumer(KafkaConsumer<String, String> consumer) {
if (consumer != null) {
try {