This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e92e7b728 [Connector-V2] [Kafka] Fix Kafka Streaming problem (#2759)
e92e7b728 is described below

commit e92e7b728388e799e7fbe860ffc4f523a2f63b66
Author: Hisoka <[email protected]>
AuthorDate: Mon Sep 19 14:14:33 2022 +0800

    [Connector-V2] [Kafka] Fix Kafka Streaming problem (#2759)
    
    * [Connector-V2] [Kafka] Fix Kafka Streaming problem
---
 .../kafka/source/KafkaConsumerThread.java          |  80 ++++++++++
 .../seatunnel/kafka/source/KafkaSourceReader.java  | 168 ++++++++++++---------
 .../seatunnel/kafka/source/KafkaSourceSplit.java   |  21 ++-
 .../kafka/source/KafkaSourceSplitEnumerator.java   |  61 +++++---
 4 files changed, 242 insertions(+), 88 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
new file mode 100644
index 000000000..618854a38
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+public class KafkaConsumerThread implements Runnable {
+
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private static final String CLIENT_ID_PREFIX = "seatunnel";
+    private final ConsumerMetadata metadata;
+
+    private final LinkedBlockingQueue<Consumer<KafkaConsumer<byte[], byte[]>>> 
tasks;
+
+    public KafkaConsumerThread(ConsumerMetadata metadata) {
+        this.metadata = metadata;
+        this.tasks = new LinkedBlockingQueue<>();
+        this.consumer = initConsumer(this.metadata.getBootstrapServers(), 
this.metadata.getConsumerGroup(),
+            this.metadata.getProperties(), 
!this.metadata.isCommitOnCheckpoint());
+    }
+
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                Consumer<KafkaConsumer<byte[], byte[]>> task = tasks.poll(1, 
TimeUnit.SECONDS);
+                if (task != null) {
+                    task.accept(consumer);
+                }
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public LinkedBlockingQueue<Consumer<KafkaConsumer<byte[], byte[]>>> 
getTasks() {
+        return tasks;
+    }
+
+    private KafkaConsumer<byte[], byte[]> initConsumer(String bootstrapServer, 
String consumerGroup,
+                                                       Properties properties, 
boolean autoCommit) {
+        Properties props = new Properties();
+        properties.forEach((key, value) -> 
props.setProperty(String.valueOf(key), String.valueOf(value)));
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServer);
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + 
"-enumerator-consumer-" + this.hashCode());
+
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+            ByteArrayDeserializer.class.getName());
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+            ByteArrayDeserializer.class.getName());
+        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
String.valueOf(autoCommit));
+
+        // Disable auto create topics feature
+        props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, 
"false");
+        return new KafkaConsumer<>(props);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 5a53aae3b..dc9c62093 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -24,43 +24,47 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 
 import com.google.common.collect.Maps;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class KafkaSourceReader implements SourceReader<SeaTunnelRow, 
KafkaSourceSplit> {
 
     private static final long THREAD_WAIT_TIME = 500L;
     private static final long POLL_TIMEOUT = 10000L;
-    private static final String CLIENT_ID_PREFIX = "seatunnel";
+
     private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaSourceReader.class);
 
     private final SourceReader.Context context;
-    private KafkaConsumer<byte[], byte[]> consumer;
     private final ConsumerMetadata metadata;
     private final Set<KafkaSourceSplit> sourceSplits;
-    private final Map<TopicPartition, Long> endOffset;
+    private final Map<Long, Map<TopicPartition, Long>> checkpointOffsetMap;
+    private final ConcurrentMap<TopicPartition, KafkaSourceSplit> 
sourceSplitMap;
+    private final Map<TopicPartition, KafkaConsumerThread> consumerThreadMap;
+    private final ExecutorService executorService;
     // TODO support user custom type
     private SeaTunnelRowType typeInfo;
-    private volatile boolean isRunning;
 
     KafkaSourceReader(ConsumerMetadata metadata, SeaTunnelRowType typeInfo,
                       SourceReader.Context context) {
@@ -68,68 +72,100 @@ public class KafkaSourceReader implements 
SourceReader<SeaTunnelRow, KafkaSource
         this.context = context;
         this.typeInfo = typeInfo;
         this.sourceSplits = new HashSet<>();
-        this.endOffset = new HashMap<>();
+        this.consumerThreadMap = new ConcurrentHashMap<>();
+        this.sourceSplitMap = new ConcurrentHashMap<>();
+        this.checkpointOffsetMap = new ConcurrentHashMap<>();
+        this.executorService = Executors.newCachedThreadPool(
+            r -> new Thread(r, "Kafka Source Data Consumer"));
     }
 
     @Override
     public void open() {
-        this.consumer = initConsumer(this.metadata.getBootstrapServers(), 
this.metadata.getConsumerGroup(),
-                this.metadata.getProperties(), 
!this.metadata.isCommitOnCheckpoint());
-        isRunning = true;
     }
 
     @Override
     public void close() throws IOException {
-        isRunning = false;
-        if (consumer != null) {
-            consumer.close();
+        if (executorService != null) {
+            executorService.shutdownNow();
         }
     }
 
     @Override
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-        if (sourceSplits.isEmpty() || sourceSplits.size() != 
this.endOffset.size()) {
+        if (sourceSplitMap.isEmpty()) {
             Thread.sleep(THREAD_WAIT_TIME);
             return;
         }
-        Set<TopicPartition> partitions = convertToPartition(sourceSplits);
-        StringDeserializer stringDeserializer = new StringDeserializer();
-        
stringDeserializer.configure(Maps.fromProperties(this.metadata.getProperties()),
 false);
-        consumer.assign(partitions);
-        while (isRunning) {
-            ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
-            for (TopicPartition partition : partitions) {
-                for (ConsumerRecord<byte[], byte[]> record : 
records.records(partition)) {
-
-                    String v = 
stringDeserializer.deserialize(partition.topic(), record.value());
-                    String t = partition.topic();
-                    output.collect(new SeaTunnelRow(new Object[]{t, v}));
-
-                    if (Boundedness.BOUNDED.equals(context.getBoundedness()) &&
-                            record.offset() >= endOffset.get(partition)) {
-                        break;
+        sourceSplits.forEach(sourceSplit -> 
consumerThreadMap.computeIfAbsent(sourceSplit.getTopicPartition(), s -> {
+            KafkaConsumerThread thread = new KafkaConsumerThread(metadata);
+            executorService.submit(thread);
+            return thread;
+        }));
+        sourceSplits.forEach(sourceSplit -> {
+            CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
+            try {
+                
consumerThreadMap.get(sourceSplit.getTopicPartition()).getTasks().put(consumer 
-> {
+                    try {
+                        Set<TopicPartition> partitions = 
Sets.newHashSet(sourceSplit.getTopicPartition());
+                        StringDeserializer stringDeserializer = new 
StringDeserializer();
+                        
stringDeserializer.configure(Maps.fromProperties(this.metadata.getProperties()),
 false);
+                        consumer.assign(partitions);
+                        if (sourceSplit.getStartOffset() >= 0) {
+                            consumer.seek(sourceSplit.getTopicPartition(), 
sourceSplit.getStartOffset());
+                        }
+                        ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
+                        for (TopicPartition partition : partitions) {
+                            List<ConsumerRecord<byte[], byte[]>> recordList = 
records.records(partition);
+                            for (ConsumerRecord<byte[], byte[]> record : 
recordList) {
+
+                                String v = 
stringDeserializer.deserialize(partition.topic(), record.value());
+                                String t = partition.topic();
+                                output.collect(new SeaTunnelRow(new 
Object[]{t, v}));
+
+                                if 
(Boundedness.BOUNDED.equals(context.getBoundedness()) &&
+                                    record.offset() >= 
sourceSplit.getEndOffset()) {
+                                    break;
+                                }
+                            }
+                            long lastOffset = -1;
+                            if (!recordList.isEmpty()) {
+                                lastOffset = recordList.get(recordList.size() 
- 1).offset();
+                                sourceSplit.setStartOffset(lastOffset + 1);
+                            }
+
+                            if (lastOffset >= sourceSplit.getEndOffset()) {
+                                sourceSplit.setEndOffset(lastOffset);
+                            }
+                        }
+                    } catch (Exception e) {
+                        completableFuture.completeExceptionally(e);
                     }
-                }
+                    completableFuture.complete(null);
+                });
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
             }
+            completableFuture.join();
+        });
 
-            if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
-                // signal to the source that we have reached the end of the 
data.
-                context.signalNoMoreElement();
-                break;
-            }
+        if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+            // signal to the source that we have reached the end of the data.
+            context.signalNoMoreElement();
         }
     }
 
     @Override
-    public List<KafkaSourceSplit> snapshotState(long checkpointId) throws 
Exception {
-        return new ArrayList<>(sourceSplits);
+    public List<KafkaSourceSplit> snapshotState(long checkpointId) {
+        checkpointOffsetMap.put(checkpointId, sourceSplits.stream()
+            .collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, 
KafkaSourceSplit::getEndOffset)));
+        return 
sourceSplits.stream().map(KafkaSourceSplit::copy).collect(Collectors.toList());
     }
 
     @Override
     public void addSplits(List<KafkaSourceSplit> splits) {
         sourceSplits.addAll(splits);
-        sourceSplits.forEach(partition -> {
-            endOffset.put(partition.getTopicPartition(), 
partition.getEndOffset());
+        sourceSplits.forEach(split -> {
+            sourceSplitMap.put(split.getTopicPartition(), split);
         });
     }
 
@@ -139,33 +175,25 @@ public class KafkaSourceReader implements 
SourceReader<SeaTunnelRow, KafkaSource
     }
 
     @Override
-    public void notifyCheckpointComplete(long checkpointId) throws Exception {
-        if (this.metadata.isCommitOnCheckpoint()) {
-            consumer.commitSync();
+    public void notifyCheckpointComplete(long checkpointId) {
+        if (!checkpointOffsetMap.containsKey(checkpointId)) {
+            log.warn("checkpoint {} do not exist or have already been 
committed.", checkpointId);
+        } else {
+            checkpointOffsetMap.remove(checkpointId).forEach((topicPartition, 
offset) -> {
+                try {
+                    consumerThreadMap.get(topicPartition)
+                        .getTasks().put(consumer -> {
+                            if (this.metadata.isCommitOnCheckpoint()) {
+                                Map<TopicPartition, OffsetAndMetadata> offsets 
= new HashMap<>();
+                                offsets.put(topicPartition, new 
OffsetAndMetadata(offset));
+                                consumer.commitSync(offsets);
+                            }
+                        });
+                } catch (InterruptedException e) {
+                    log.error("commit offset to kafka failed", e);
+                }
+            });
         }
     }
 
-    private KafkaConsumer<byte[], byte[]> initConsumer(String bootstrapServer, 
String consumerGroup,
-                                                       Properties properties, 
boolean autoCommit) {
-        Properties props = new Properties();
-        properties.forEach((key, value) -> 
props.setProperty(String.valueOf(key), String.valueOf(value)));
-        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
-        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServer);
-        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX + 
"-enumerator-consumer-" + this.hashCode());
-
-        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                ByteArrayDeserializer.class.getName());
-        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                ByteArrayDeserializer.class.getName());
-        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
String.valueOf(autoCommit));
-
-        // Disable auto create topics feature
-        props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, 
"false");
-        return new KafkaConsumer<>(props);
-    }
-
-    private Set<TopicPartition> 
convertToPartition(Collection<KafkaSourceSplit> sourceSplits) {
-        return 
sourceSplits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toSet());
-    }
-
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
index be2348e96..67e66ab86 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
@@ -26,12 +26,27 @@ import java.util.Objects;
 public class KafkaSourceSplit implements SourceSplit {
 
     private TopicPartition topicPartition;
+    private long startOffset = -1L;
     private long endOffset = -1L;
 
     public KafkaSourceSplit(TopicPartition topicPartition) {
         this.topicPartition = topicPartition;
     }
 
+    public KafkaSourceSplit(TopicPartition topicPartition, long startOffset, 
long endOffset) {
+        this.topicPartition = topicPartition;
+        this.startOffset = startOffset;
+        this.endOffset = endOffset;
+    }
+
+    public long getStartOffset() {
+        return startOffset;
+    }
+
+    public void setStartOffset(long startOffset) {
+        this.startOffset = startOffset;
+    }
+
     public long getEndOffset() {
         return endOffset;
     }
@@ -67,6 +82,10 @@ public class KafkaSourceSplit implements SourceSplit {
 
     @Override
     public int hashCode() {
-        return Objects.hash(topicPartition, endOffset);
+        return Objects.hash(topicPartition);
+    }
+
+    public KafkaSourceSplit copy() {
+        return new KafkaSourceSplit(this.topicPartition, 
this.getStartOffset(), this.getEndOffset());
     }
 }
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 52caad1fe..0852fe8fd 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
@@ -51,29 +52,32 @@ public class KafkaSourceSplitEnumerator implements 
SourceSplitEnumerator<KafkaSo
     private AdminClient adminClient;
 
     private Set<KafkaSourceSplit> pendingSplit;
-    private Set<KafkaSourceSplit> assignedSplit;
+    private final Set<KafkaSourceSplit> assignedSplit;
 
     KafkaSourceSplitEnumerator(ConsumerMetadata metadata, 
Context<KafkaSourceSplit> context) {
         this.metadata = metadata;
         this.context = context;
+        this.assignedSplit = new HashSet<>();
+        this.pendingSplit = new HashSet<>();
     }
 
     KafkaSourceSplitEnumerator(ConsumerMetadata metadata, 
Context<KafkaSourceSplit> context,
                                KafkaSourceState sourceState) {
         this(metadata, context);
-        this.assignedSplit = sourceState.getAssignedSplit();
     }
 
     @Override
     public void open() {
         this.adminClient = initAdminClient(this.metadata.getProperties());
-        this.assignedSplit = new HashSet<>();
-        this.pendingSplit = new HashSet<>();
     }
 
     @Override
     public void run() throws ExecutionException, InterruptedException {
-        pendingSplit = getTopicInfo();
+        getTopicInfo().forEach(split -> {
+            if (!assignedSplit.contains(split)) {
+                pendingSplit.add(split);
+            }
+        });
         assignSplit();
     }
 
@@ -87,11 +91,25 @@ public class KafkaSourceSplitEnumerator implements 
SourceSplitEnumerator<KafkaSo
     @Override
     public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
-            pendingSplit.addAll(splits);
+            pendingSplit.addAll(convertToNextSplit(splits));
             assignSplit();
         }
     }
 
+    private Collection<? extends KafkaSourceSplit> 
convertToNextSplit(List<KafkaSourceSplit> splits) {
+        try {
+            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> 
listOffsets =
+                
getKafkaPartitionLatestOffset(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()));
+            splits.forEach(split -> {
+                split.setStartOffset(split.getEndOffset() + 1);
+                
split.setEndOffset(listOffsets.get(split.getTopicPartition()).offset());
+            });
+            return splits;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public int currentUnassignedSplitSize() {
         return pendingSplit.size();
@@ -131,30 +149,39 @@ public class KafkaSourceSplitEnumerator implements 
SourceSplitEnumerator<KafkaSo
         if (this.metadata.isPattern()) {
             Pattern pattern = Pattern.compile(this.metadata.getTopic());
             topics = this.adminClient.listTopics().names().get().stream()
-                    .filter(t -> 
pattern.matcher(t).matches()).collect(Collectors.toSet());
+                .filter(t -> 
pattern.matcher(t).matches()).collect(Collectors.toSet());
         } else {
             topics = Arrays.asList(this.metadata.getTopic().split(","));
         }
         log.info("Discovered topics: {}", topics);
 
         Collection<TopicPartition> partitions =
-                
adminClient.describeTopics(topics).all().get().values().stream().flatMap(t -> 
t.partitions().stream()
-                        .map(p -> new TopicPartition(t.name(), 
p.partition()))).collect(Collectors.toSet());
+            
adminClient.describeTopics(topics).all().get().values().stream().flatMap(t -> 
t.partitions().stream()
+                .map(p -> new TopicPartition(t.name(), 
p.partition()))).collect(Collectors.toSet());
+        return 
getKafkaPartitionLatestOffset(partitions).entrySet().stream().map(partition -> {
+            KafkaSourceSplit split = new KafkaSourceSplit(partition.getKey());
+            split.setEndOffset(partition.getValue().offset());
+            return split;
+        }).collect(Collectors.toSet());
+    }
+
+    private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> 
getKafkaPartitionLatestOffset(Collection<TopicPartition> partitions) throws 
InterruptedException, ExecutionException {
         return 
adminClient.listOffsets(partitions.stream().collect(Collectors.toMap(p -> p, p 
-> OffsetSpec.latest())))
-                .all().get().entrySet().stream().map(partition -> {
-                    KafkaSourceSplit split = new 
KafkaSourceSplit(partition.getKey());
-                    split.setEndOffset(partition.getValue().offset());
-                    return split;
-                }).collect(Collectors.toSet());
+            .all().get();
     }
 
     private void assignSplit() {
         Map<Integer, List<KafkaSourceSplit>> readySplit = new 
HashMap<>(Common.COLLECTION_SIZE);
-        for (int taskID = 0;  taskID < context.currentParallelism(); taskID++) 
{
+        for (int taskID = 0; taskID < context.currentParallelism(); taskID++) {
             readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
         }
-        pendingSplit.forEach(s -> 
readySplit.get(getSplitOwner(s.getTopicPartition(), 
context.currentParallelism()))
-                .add(s));
+
+        pendingSplit.forEach(s -> {
+            if (!assignedSplit.contains(s)) {
+                readySplit.get(getSplitOwner(s.getTopicPartition(), 
context.currentParallelism()))
+                    .add(s);
+            }
+        });
 
         readySplit.forEach(context::assignSplit);
 

Reply via email to