This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e92e7b728 [Connector-V2] [Kafka] Fix Kafka Streaming problem (#2759)
e92e7b728 is described below
commit e92e7b728388e799e7fbe860ffc4f523a2f63b66
Author: Hisoka <[email protected]>
AuthorDate: Mon Sep 19 14:14:33 2022 +0800
[Connector-V2] [Kafka] Fix Kafka Streaming problem (#2759)
* [Connector-V2] [Kafka] Fix Kafka Streaming problem
---
.../kafka/source/KafkaConsumerThread.java | 80 ++++++++++
.../seatunnel/kafka/source/KafkaSourceReader.java | 168 ++++++++++++---------
.../seatunnel/kafka/source/KafkaSourceSplit.java | 21 ++-
.../kafka/source/KafkaSourceSplitEnumerator.java | 61 +++++---
4 files changed, 242 insertions(+), 88 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
new file mode 100644
index 000000000..618854a38
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaConsumerThread.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.source;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+public class KafkaConsumerThread implements Runnable {
+
+ private final KafkaConsumer<byte[], byte[]> consumer;
+ private static final String CLIENT_ID_PREFIX = "seatunnel";
+ private final ConsumerMetadata metadata;
+
+ private final LinkedBlockingQueue<Consumer<KafkaConsumer<byte[], byte[]>>>
tasks;
+
+ public KafkaConsumerThread(ConsumerMetadata metadata) {
+ this.metadata = metadata;
+ this.tasks = new LinkedBlockingQueue<>();
+ this.consumer = initConsumer(this.metadata.getBootstrapServers(),
this.metadata.getConsumerGroup(),
+ this.metadata.getProperties(),
!this.metadata.isCommitOnCheckpoint());
+ }
+
+ @Override
+ public void run() {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ Consumer<KafkaConsumer<byte[], byte[]>> task = tasks.poll(1,
TimeUnit.SECONDS);
+ if (task != null) {
+ task.accept(consumer);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public LinkedBlockingQueue<Consumer<KafkaConsumer<byte[], byte[]>>>
getTasks() {
+ return tasks;
+ }
+
+ private KafkaConsumer<byte[], byte[]> initConsumer(String bootstrapServer,
String consumerGroup,
+ Properties properties,
boolean autoCommit) {
+ Properties props = new Properties();
+ properties.forEach((key, value) ->
props.setProperty(String.valueOf(key), String.valueOf(value)));
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
+ props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX +
"-enumerator-consumer-" + this.hashCode());
+
+ props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
String.valueOf(autoCommit));
+
+ // Disable auto create topics feature
+ props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,
"false");
+ return new KafkaConsumer<>(props);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 5a53aae3b..dc9c62093 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -24,43 +24,47 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import com.google.common.collect.Maps;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
+import com.google.common.collect.Sets;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
+@Slf4j
public class KafkaSourceReader implements SourceReader<SeaTunnelRow,
KafkaSourceSplit> {
private static final long THREAD_WAIT_TIME = 500L;
private static final long POLL_TIMEOUT = 10000L;
- private static final String CLIENT_ID_PREFIX = "seatunnel";
+
private static final Logger LOGGER =
LoggerFactory.getLogger(KafkaSourceReader.class);
private final SourceReader.Context context;
- private KafkaConsumer<byte[], byte[]> consumer;
private final ConsumerMetadata metadata;
private final Set<KafkaSourceSplit> sourceSplits;
- private final Map<TopicPartition, Long> endOffset;
+ private final Map<Long, Map<TopicPartition, Long>> checkpointOffsetMap;
+ private final ConcurrentMap<TopicPartition, KafkaSourceSplit>
sourceSplitMap;
+ private final Map<TopicPartition, KafkaConsumerThread> consumerThreadMap;
+ private final ExecutorService executorService;
// TODO support user custom type
private SeaTunnelRowType typeInfo;
- private volatile boolean isRunning;
KafkaSourceReader(ConsumerMetadata metadata, SeaTunnelRowType typeInfo,
SourceReader.Context context) {
@@ -68,68 +72,100 @@ public class KafkaSourceReader implements
SourceReader<SeaTunnelRow, KafkaSource
this.context = context;
this.typeInfo = typeInfo;
this.sourceSplits = new HashSet<>();
- this.endOffset = new HashMap<>();
+ this.consumerThreadMap = new ConcurrentHashMap<>();
+ this.sourceSplitMap = new ConcurrentHashMap<>();
+ this.checkpointOffsetMap = new ConcurrentHashMap<>();
+ this.executorService = Executors.newCachedThreadPool(
+ r -> new Thread(r, "Kafka Source Data Consumer"));
}
@Override
public void open() {
- this.consumer = initConsumer(this.metadata.getBootstrapServers(),
this.metadata.getConsumerGroup(),
- this.metadata.getProperties(),
!this.metadata.isCommitOnCheckpoint());
- isRunning = true;
}
@Override
public void close() throws IOException {
- isRunning = false;
- if (consumer != null) {
- consumer.close();
+ if (executorService != null) {
+ executorService.shutdownNow();
}
}
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
- if (sourceSplits.isEmpty() || sourceSplits.size() !=
this.endOffset.size()) {
+ if (sourceSplitMap.isEmpty()) {
Thread.sleep(THREAD_WAIT_TIME);
return;
}
- Set<TopicPartition> partitions = convertToPartition(sourceSplits);
- StringDeserializer stringDeserializer = new StringDeserializer();
-
stringDeserializer.configure(Maps.fromProperties(this.metadata.getProperties()),
false);
- consumer.assign(partitions);
- while (isRunning) {
- ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
- for (TopicPartition partition : partitions) {
- for (ConsumerRecord<byte[], byte[]> record :
records.records(partition)) {
-
- String v =
stringDeserializer.deserialize(partition.topic(), record.value());
- String t = partition.topic();
- output.collect(new SeaTunnelRow(new Object[]{t, v}));
-
- if (Boundedness.BOUNDED.equals(context.getBoundedness()) &&
- record.offset() >= endOffset.get(partition)) {
- break;
+ sourceSplits.forEach(sourceSplit ->
consumerThreadMap.computeIfAbsent(sourceSplit.getTopicPartition(), s -> {
+ KafkaConsumerThread thread = new KafkaConsumerThread(metadata);
+ executorService.submit(thread);
+ return thread;
+ }));
+ sourceSplits.forEach(sourceSplit -> {
+ CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
+ try {
+
consumerThreadMap.get(sourceSplit.getTopicPartition()).getTasks().put(consumer
-> {
+ try {
+ Set<TopicPartition> partitions =
Sets.newHashSet(sourceSplit.getTopicPartition());
+ StringDeserializer stringDeserializer = new
StringDeserializer();
+
stringDeserializer.configure(Maps.fromProperties(this.metadata.getProperties()),
false);
+ consumer.assign(partitions);
+ if (sourceSplit.getStartOffset() >= 0) {
+ consumer.seek(sourceSplit.getTopicPartition(),
sourceSplit.getStartOffset());
+ }
+ ConsumerRecords<byte[], byte[]> records =
consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
+ for (TopicPartition partition : partitions) {
+ List<ConsumerRecord<byte[], byte[]>> recordList =
records.records(partition);
+ for (ConsumerRecord<byte[], byte[]> record :
recordList) {
+
+ String v =
stringDeserializer.deserialize(partition.topic(), record.value());
+ String t = partition.topic();
+ output.collect(new SeaTunnelRow(new
Object[]{t, v}));
+
+ if
(Boundedness.BOUNDED.equals(context.getBoundedness()) &&
+ record.offset() >=
sourceSplit.getEndOffset()) {
+ break;
+ }
+ }
+ long lastOffset = -1;
+ if (!recordList.isEmpty()) {
+ lastOffset = recordList.get(recordList.size()
- 1).offset();
+ sourceSplit.setStartOffset(lastOffset + 1);
+ }
+
+ if (lastOffset >= sourceSplit.getEndOffset()) {
+ sourceSplit.setEndOffset(lastOffset);
+ }
+ }
+ } catch (Exception e) {
+ completableFuture.completeExceptionally(e);
}
- }
+ completableFuture.complete(null);
+ });
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
+ completableFuture.join();
+ });
- if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
- // signal to the source that we have reached the end of the
data.
- context.signalNoMoreElement();
- break;
- }
+ if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
+ // signal to the source that we have reached the end of the data.
+ context.signalNoMoreElement();
}
}
@Override
- public List<KafkaSourceSplit> snapshotState(long checkpointId) throws
Exception {
- return new ArrayList<>(sourceSplits);
+    /**
+     * Records, per partition, the offset to commit to Kafka once checkpoint {@code checkpointId}
+     * completes, and returns copies of the splits so that later reads do not mutate the state.
+     */
+    public List<KafkaSourceSplit> snapshotState(long checkpointId) {
+        // Kafka's committed offset is the next record to consume, i.e. the split's start offset.
+        // Splits that have not consumed anything yet (start offset < 0) have nothing to commit.
+        checkpointOffsetMap.put(checkpointId, sourceSplits.stream()
+            .filter(split -> split.getStartOffset() >= 0)
+            .collect(Collectors.toMap(KafkaSourceSplit::getTopicPartition, KafkaSourceSplit::getStartOffset)));
+        return sourceSplits.stream().map(KafkaSourceSplit::copy).collect(Collectors.toList());
}
@Override
public void addSplits(List<KafkaSourceSplit> splits) {
sourceSplits.addAll(splits);
- sourceSplits.forEach(partition -> {
- endOffset.put(partition.getTopicPartition(),
partition.getEndOffset());
+ sourceSplits.forEach(split -> {
+ sourceSplitMap.put(split.getTopicPartition(), split);
});
}
@@ -139,33 +175,25 @@ public class KafkaSourceReader implements
SourceReader<SeaTunnelRow, KafkaSource
}
@Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
- if (this.metadata.isCommitOnCheckpoint()) {
- consumer.commitSync();
+ public void notifyCheckpointComplete(long checkpointId) {
+ if (!checkpointOffsetMap.containsKey(checkpointId)) {
+ log.warn("checkpoint {} do not exist or have already been
committed.", checkpointId);
+ } else {
+ checkpointOffsetMap.remove(checkpointId).forEach((topicPartition,
offset) -> {
+ try {
+ consumerThreadMap.get(topicPartition)
+ .getTasks().put(consumer -> {
+ if (this.metadata.isCommitOnCheckpoint()) {
+ Map<TopicPartition, OffsetAndMetadata> offsets
= new HashMap<>();
+ offsets.put(topicPartition, new
OffsetAndMetadata(offset));
+ consumer.commitSync(offsets);
+ }
+ });
+ } catch (InterruptedException e) {
+ log.error("commit offset to kafka failed", e);
+ }
+ });
}
}
- private KafkaConsumer<byte[], byte[]> initConsumer(String bootstrapServer,
String consumerGroup,
- Properties properties,
boolean autoCommit) {
- Properties props = new Properties();
- properties.forEach((key, value) ->
props.setProperty(String.valueOf(key), String.valueOf(value)));
- props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
- props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServer);
- props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID_PREFIX +
"-enumerator-consumer-" + this.hashCode());
-
- props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
- ByteArrayDeserializer.class.getName());
- props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
- ByteArrayDeserializer.class.getName());
- props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
String.valueOf(autoCommit));
-
- // Disable auto create topics feature
- props.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG,
"false");
- return new KafkaConsumer<>(props);
- }
-
- private Set<TopicPartition>
convertToPartition(Collection<KafkaSourceSplit> sourceSplits) {
- return
sourceSplits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toSet());
- }
-
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
index be2348e96..67e66ab86 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplit.java
@@ -26,12 +26,27 @@ import java.util.Objects;
public class KafkaSourceSplit implements SourceSplit {
private TopicPartition topicPartition;
+ private long startOffset = -1L;
private long endOffset = -1L;
public KafkaSourceSplit(TopicPartition topicPartition) {
this.topicPartition = topicPartition;
}
+    /**
+     * Creates a split for {@code topicPartition} with explicit offsets. A start offset of -1
+     * means no read position has been recorded yet.
+     */
+    public KafkaSourceSplit(TopicPartition topicPartition, long startOffset, long endOffset) {
+        this(topicPartition);
+        this.startOffset = startOffset;
+        this.endOffset = endOffset;
+    }
+
+    /** Returns the next offset to read from the partition, or -1 if none has been recorded. */
+    public long getStartOffset() {
+        return this.startOffset;
+    }
+
+    /** Records the next offset to read from the partition. */
+    public void setStartOffset(long startOffset) {
+        this.startOffset = startOffset;
+    }
+
public long getEndOffset() {
return endOffset;
}
@@ -67,6 +82,10 @@ public class KafkaSourceSplit implements SourceSplit {
@Override
public int hashCode() {
- return Objects.hash(topicPartition, endOffset);
+ return Objects.hash(topicPartition);
+ }
+
+ public KafkaSourceSplit copy() {
+ return new KafkaSourceSplit(this.topicPartition,
this.getStartOffset(), this.getEndOffset());
}
}
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 52caad1fe..0852fe8fd 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSourceState;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
@@ -51,29 +52,32 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
private AdminClient adminClient;
private Set<KafkaSourceSplit> pendingSplit;
- private Set<KafkaSourceSplit> assignedSplit;
+ private final Set<KafkaSourceSplit> assignedSplit;
KafkaSourceSplitEnumerator(ConsumerMetadata metadata,
Context<KafkaSourceSplit> context) {
this.metadata = metadata;
this.context = context;
+ this.assignedSplit = new HashSet<>();
+ this.pendingSplit = new HashSet<>();
}
KafkaSourceSplitEnumerator(ConsumerMetadata metadata,
Context<KafkaSourceSplit> context,
KafkaSourceState sourceState) {
this(metadata, context);
- this.assignedSplit = sourceState.getAssignedSplit();
}
@Override
public void open() {
this.adminClient = initAdminClient(this.metadata.getProperties());
- this.assignedSplit = new HashSet<>();
- this.pendingSplit = new HashSet<>();
}
@Override
public void run() throws ExecutionException, InterruptedException {
- pendingSplit = getTopicInfo();
+ getTopicInfo().forEach(split -> {
+ if (!assignedSplit.contains(split)) {
+ pendingSplit.add(split);
+ }
+ });
assignSplit();
}
@@ -87,11 +91,25 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
@Override
public void addSplitsBack(List<KafkaSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
- pendingSplit.addAll(splits);
+ pendingSplit.addAll(convertToNextSplit(splits));
assignSplit();
}
}
+    /**
+     * Prepares splits handed back by a failed reader for reassignment. The end offset is refreshed
+     * to each partition's latest offset. The start offset recorded by the reader (the next offset to
+     * read, or -1 if nothing was consumed) is kept as-is, so no unconsumed record is skipped.
+     */
+    private Collection<? extends KafkaSourceSplit> convertToNextSplit(List<KafkaSourceSplit> splits) {
+        try {
+            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> listOffsets =
+                getKafkaPartitionLatestOffset(splits.stream().map(KafkaSourceSplit::getTopicPartition).collect(Collectors.toList()));
+            splits.forEach(split -> split.setEndOffset(listOffsets.get(split.getTopicPartition()).offset()));
+            return splits;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
@Override
public int currentUnassignedSplitSize() {
return pendingSplit.size();
@@ -131,30 +149,39 @@ public class KafkaSourceSplitEnumerator implements
SourceSplitEnumerator<KafkaSo
if (this.metadata.isPattern()) {
Pattern pattern = Pattern.compile(this.metadata.getTopic());
topics = this.adminClient.listTopics().names().get().stream()
- .filter(t ->
pattern.matcher(t).matches()).collect(Collectors.toSet());
+ .filter(t ->
pattern.matcher(t).matches()).collect(Collectors.toSet());
} else {
topics = Arrays.asList(this.metadata.getTopic().split(","));
}
log.info("Discovered topics: {}", topics);
Collection<TopicPartition> partitions =
-
adminClient.describeTopics(topics).all().get().values().stream().flatMap(t ->
t.partitions().stream()
- .map(p -> new TopicPartition(t.name(),
p.partition()))).collect(Collectors.toSet());
+
adminClient.describeTopics(topics).all().get().values().stream().flatMap(t ->
t.partitions().stream()
+ .map(p -> new TopicPartition(t.name(),
p.partition()))).collect(Collectors.toSet());
+ return
getKafkaPartitionLatestOffset(partitions).entrySet().stream().map(partition -> {
+ KafkaSourceSplit split = new KafkaSourceSplit(partition.getKey());
+ split.setEndOffset(partition.getValue().offset());
+ return split;
+ }).collect(Collectors.toSet());
+ }
+
+ private Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>
getKafkaPartitionLatestOffset(Collection<TopicPartition> partitions) throws
InterruptedException, ExecutionException {
return
adminClient.listOffsets(partitions.stream().collect(Collectors.toMap(p -> p, p
-> OffsetSpec.latest())))
- .all().get().entrySet().stream().map(partition -> {
- KafkaSourceSplit split = new
KafkaSourceSplit(partition.getKey());
- split.setEndOffset(partition.getValue().offset());
- return split;
- }).collect(Collectors.toSet());
+ .all().get();
}
private void assignSplit() {
Map<Integer, List<KafkaSourceSplit>> readySplit = new
HashMap<>(Common.COLLECTION_SIZE);
- for (int taskID = 0; taskID < context.currentParallelism(); taskID++)
{
+ for (int taskID = 0; taskID < context.currentParallelism(); taskID++) {
readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
}
- pendingSplit.forEach(s ->
readySplit.get(getSplitOwner(s.getTopicPartition(),
context.currentParallelism()))
- .add(s));
+
+ pendingSplit.forEach(s -> {
+ if (!assignedSplit.contains(s)) {
+ readySplit.get(getSplitOwner(s.getTopicPartition(),
context.currentParallelism()))
+ .add(s);
+ }
+ });
readySplit.forEach(context::assignSplit);