[GitHub] [flink] becketqin commented on a change in pull request #17991: [FLINK-25132][connector/kafka] Move record deserializing from SplitFetcher to RecordEmitter to support object-reusing deseriali

2021-12-14 Thread GitBox


becketqin commented on a change in pull request #17991:
URL: https://github.com/apache/flink/pull/17991#discussion_r769321825



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
##
@@ -105,84 +91,38 @@ public KafkaPartitionSplitReader(
     }
 
     @Override
-    public RecordsWithSplitIds<Tuple3<T, Long, Long>> fetch() throws IOException {
-        KafkaPartitionSplitRecords<Tuple3<T, Long, Long>> recordsBySplits =
-                new KafkaPartitionSplitRecords<>();
+    public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException {
         ConsumerRecords<byte[], byte[]> consumerRecords;
         try {
             consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
         } catch (WakeupException we) {
-            recordsBySplits.prepareForRead();
-            return recordsBySplits;
+            return null;

Review comment:
   It seems that returning null here might still cause an NPE in `FetchTask.run()`, even though null is only returned when the consumer is woken up.
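   For illustration, a minimal sketch of the concern, assuming the fetch task dereferences the fetch result without a null check (the loop body below is a simplification, not the actual `FetchTask.run()` source; names are illustrative):

       // Simplified fetch-task loop over the split reader's results.
       RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> lastRecords = splitReader.fetch();
       // If fetch() returned null after a wakeup, both calls below would
       // throw a NullPointerException.
       if (!lastRecords.finishedSplits().isEmpty()) {
           onSplitFinished(lastRecords.finishedSplits());
       }
       elementsQueue.put(fetcherIndex, lastRecords);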








[GitHub] [flink] becketqin commented on a change in pull request #17991: [FLINK-25132][connector/kafka] Move record deserializing from SplitFetcher to RecordEmitter to support object-reusing deseriali

2021-12-09 Thread GitBox


becketqin commented on a change in pull request #17991:
URL: https://github.com/apache/flink/pull/17991#discussion_r766356300



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
##
@@ -452,82 +392,72 @@ private void maybeRegisterKafkaConsumerMetrics(
 
     //  private helper class 
 
-    private static class KafkaPartitionSplitRecords<T> implements RecordsWithSplitIds<T> {
-        private final Map<String, Collection<T>> recordsBySplits;
-        private final Set<String> finishedSplits;
-        private Iterator<Map.Entry<String, Collection<T>>> splitIterator;
-        private String currentSplitId;
-        private Iterator<T> recordIterator;
+    private static class KafkaPartitionSplitRecords
+            implements RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> {
+
+        private final Set<String> finishedSplits = new HashSet<>();
+        private final Map<TopicPartition, Long> stoppingOffsets = new HashMap<>();
+        private final ConsumerRecords<byte[], byte[]> consumerRecords;
+        private final KafkaSourceReaderMetrics metrics;
+        private Iterator<TopicPartition> splitIterator;
+        private Iterator<ConsumerRecord<byte[], byte[]>> recordIterator;
+        private TopicPartition currentTopicPartition;
+
+        private KafkaPartitionSplitRecords(
+                ConsumerRecords<byte[], byte[]> consumerRecords, KafkaSourceReaderMetrics metrics) {
+            this.consumerRecords = consumerRecords;
+            this.metrics = metrics;
+        }
 
-        private KafkaPartitionSplitRecords() {
-            this.recordsBySplits = new HashMap<>();
-            this.finishedSplits = new HashSet<>();
+        private void prepareForRead() {
+            splitIterator = consumerRecords.partitions().iterator();
         }
 
-        private Collection<T> recordsForSplit(String splitId) {
-            return recordsBySplits.computeIfAbsent(splitId, id -> new ArrayList<>());
+        private void setPartitionStoppingOffset(
+                TopicPartition topicPartition, long stoppingOffset) {
+            stoppingOffsets.put(topicPartition, stoppingOffset);
         }
 
         private void addFinishedSplit(String splitId) {
             finishedSplits.add(splitId);
         }
 
-        private void prepareForRead() {
-            splitIterator = recordsBySplits.entrySet().iterator();
-        }
-
-        @Override
         @Nullable
+        @Override
         public String nextSplit() {
             if (splitIterator.hasNext()) {
-                Map.Entry<String, Collection<T>> entry = splitIterator.next();
-                currentSplitId = entry.getKey();
-                recordIterator = entry.getValue().iterator();
-                return currentSplitId;
+                currentTopicPartition = splitIterator.next();
+                recordIterator = consumerRecords.records(currentTopicPartition).iterator();
+                return currentTopicPartition.toString();
             } else {
-                currentSplitId = null;
+                currentTopicPartition = null;
                 recordIterator = null;
                 return null;
             }
         }
 
-        @Override
         @Nullable
-        public T nextRecordFromSplit() {
+        @Override
+        public ConsumerRecord<byte[], byte[]> nextRecordFromSplit() {
             Preconditions.checkNotNull(
-                    currentSplitId,
+                    currentTopicPartition,
                     "Make sure nextSplit() did not return null before "
                             + "iterate over the records split.");
             if (recordIterator.hasNext()) {
-                return recordIterator.next();
-            } else {
-                return null;
+                final ConsumerRecord<byte[], byte[]> record = recordIterator.next();
+                // Only emit records before stopping offset
+                if (record.offset()
+                        < stoppingOffsets.getOrDefault(currentTopicPartition, Long.MAX_VALUE)) {

Review comment:
   Can we avoid retrieving the stopping offset for the current partition on each record? Ideally, the stopping offset can be cached and updated when the current split changes.
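   A minimal sketch of that caching, assuming a new `currentStoppingOffset` field on `KafkaPartitionSplitRecords` (the field name is illustrative; finished-split bookkeeping is omitted):

       private long currentStoppingOffset = Long.MAX_VALUE;

       @Nullable
       @Override
       public String nextSplit() {
           if (splitIterator.hasNext()) {
               currentTopicPartition = splitIterator.next();
               recordIterator = consumerRecords.records(currentTopicPartition).iterator();
               // Look up the stopping offset once per split instead of once per record.
               currentStoppingOffset =
                       stoppingOffsets.getOrDefault(currentTopicPartition, Long.MAX_VALUE);
               return currentTopicPartition.toString();
           } else {
               currentTopicPartition = null;
               recordIterator = null;
               return null;
           }
       }

       @Nullable
       @Override
       public ConsumerRecord<byte[], byte[]> nextRecordFromSplit() {
           if (recordIterator != null && recordIterator.hasNext()) {
               final ConsumerRecord<byte[], byte[]> record = recordIterator.next();
               // Compare against the cached value instead of a per-record map lookup.
               if (record.offset() < currentStoppingOffset) {
                   return record;
               }
           }
           return null;
       }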








[GitHub] [flink] becketqin commented on a change in pull request #17991: [FLINK-25132][connector/kafka] Move record deserializing from SplitFetcher to RecordEmitter to support object-reusing deseriali

2021-12-05 Thread GitBox


becketqin commented on a change in pull request #17991:
URL: https://github.com/apache/flink/pull/17991#discussion_r762673730



##
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
##
@@ -147,33 +133,17 @@ public KafkaPartitionSplitReader(
                             recordsBySplits);
                     break;
                 }
-                // Add the record to the partition collector.
-                try {
-                    deserializationSchema.deserialize(consumerRecord, collector);
-                    collector
-                            .getRecords()
-                            .forEach(
-                                    r ->
-                                            recordsForSplit.add(
-                                                    new Tuple3<>(
-                                                            r,
-                                                            consumerRecord.offset(),
-                                                            consumerRecord.timestamp())));
-                    // Finish the split because there might not be any message after this point.
-                    // Keep polling
-                    // will just block forever.
-                    if (consumerRecord.offset() == stoppingOffset - 1) {
-                        finishSplitAtRecord(
-                                tp,
-                                stoppingOffset,
-                                consumerRecord.offset(),
-                                finishedPartitions,
-                                recordsBySplits);
-                    }
-                } catch (Exception e) {
-                    throw new IOException("Failed to deserialize consumer record due to", e);
-                } finally {
-                    collector.reset();
+                recordsForSplit.add(consumerRecord);

Review comment:
   Given that the deserialization is no longer performed in the split fetcher, it seems that we don't have to iterate over all the records here anymore. Instead, `KafkaPartitionSplitRecords` can be changed to a wrapper around `ConsumerRecords`.
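   A rough sketch of such a wrapper, assuming the raw `ConsumerRecords` from `poll()` is handed straight through (finished-split and stopping-offset bookkeeping omitted; the constructor shape is illustrative):

       private static class KafkaPartitionSplitRecords
               implements RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> {

           private final ConsumerRecords<byte[], byte[]> consumerRecords;
           private Iterator<TopicPartition> splitIterator;
           private Iterator<ConsumerRecord<byte[], byte[]>> recordIterator;

           KafkaPartitionSplitRecords(ConsumerRecords<byte[], byte[]> consumerRecords) {
               this.consumerRecords = consumerRecords;
               this.splitIterator = consumerRecords.partitions().iterator();
           }

           @Nullable
           @Override
           public String nextSplit() {
               if (!splitIterator.hasNext()) {
                   return null;
               }
               // Iterate the partitions of the poll() result directly instead of
               // copying records into per-split collections.
               TopicPartition tp = splitIterator.next();
               recordIterator = consumerRecords.records(tp).iterator();
               return tp.toString();
           }

           @Nullable
           @Override
           public ConsumerRecord<byte[], byte[]> nextRecordFromSplit() {
               return recordIterator.hasNext() ? recordIterator.next() : null;
           }

           @Override
           public Set<String> finishedSplits() {
               return Collections.emptySet();
           }
       }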



