[GitHub] [flink] becketqin commented on a change in pull request #17991: [FLINK-25132][connector/kafka] Move record deserializing from SplitFetcher to RecordEmitter to support object-reusing deseriali
becketqin commented on a change in pull request #17991: URL: https://github.com/apache/flink/pull/17991#discussion_r769321825

## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java ##

@@ -105,84 +91,38 @@ public KafkaPartitionSplitReader(
     }
     @Override
-    public RecordsWithSplitIds> fetch() throws IOException {
-        KafkaPartitionSplitRecords> recordsBySplits =
-                new KafkaPartitionSplitRecords<>();
+    public RecordsWithSplitIds> fetch() throws IOException {
         ConsumerRecords consumerRecords;
         try {
             consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
         } catch (WakeupException we) {
-            recordsBySplits.prepareForRead();
-            return recordsBySplits;
+            return null;

Review comment:
It seems that returning null here might still cause an NPE in `FetchTask.run()`, even though null is only returned when the consumer is woken up.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
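One way to avoid handing null to the caller is to return an empty batch when the consumer is woken up. The sketch below assumes that approach; `SplitRecords`, `EmptySplitRecords`, `SimulatedWakeupException`, `WakeupSafeFetcher`, and `Drainer` are hypothetical stand-ins (for `RecordsWithSplitIds`, Kafka's `WakeupException`, the split reader, and the caller that drains a batch), not Flink or Kafka APIs, so the example compiles without either dependency.

```java
import java.util.Collections;
import java.util.Set;

/** Hypothetical stand-in with the same shape as Flink's RecordsWithSplitIds<E>. */
interface SplitRecords<E> {
    /** Returns the next split id, or null when the batch has no more splits. */
    String nextSplit();

    /** Returns the next record of the current split, or null when the split is exhausted. */
    E nextRecordFromSplit();

    Set<String> finishedSplits();
}

/** An empty batch: no splits, no records, no finished splits. */
final class EmptySplitRecords<E> implements SplitRecords<E> {
    @Override
    public String nextSplit() {
        return null;
    }

    @Override
    public E nextRecordFromSplit() {
        return null;
    }

    @Override
    public Set<String> finishedSplits() {
        return Collections.emptySet();
    }
}

/** Hypothetical stand-in for Kafka's WakeupException. */
final class SimulatedWakeupException extends RuntimeException {}

/** Hypothetical split reader whose fetch() never returns null. */
final class WakeupSafeFetcher {
    interface Poller {
        SplitRecords<String> poll();
    }

    private final Poller poller;

    WakeupSafeFetcher(Poller poller) {
        this.poller = poller;
    }

    SplitRecords<String> fetch() {
        try {
            return poller.poll();
        } catch (SimulatedWakeupException e) {
            // Woken up: hand back an empty batch instead of null.
            return new EmptySplitRecords<>();
        }
    }
}

/** Hypothetical caller that, like a fetch task, uses the batch without a null check. */
final class Drainer {
    static int drain(SplitRecords<String> batch) {
        int count = 0;
        while (batch.nextSplit() != null) {
            while (batch.nextRecordFromSplit() != null) {
                count++;
            }
        }
        return count;
    }
}
```

With an empty batch, the caller can always iterate the result and query `finishedSplits()` without a null check; a wakeup simply yields nothing to emit.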
becketqin commented on a change in pull request #17991: URL: https://github.com/apache/flink/pull/17991#discussion_r766356300

## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java ##

@@ -452,82 +392,72 @@ private void maybeRegisterKafkaConsumerMetrics(
     // private helper class
-    private static class KafkaPartitionSplitRecords implements RecordsWithSplitIds {
-        private final Map> recordsBySplits;
-        private final Set finishedSplits;
-        private Iterator>> splitIterator;
-        private String currentSplitId;
-        private Iterator recordIterator;
+    private static class KafkaPartitionSplitRecords
+            implements RecordsWithSplitIds> {
+
+        private final Set finishedSplits = new HashSet<>();
+        private final Map stoppingOffsets = new HashMap<>();
+        private final ConsumerRecords consumerRecords;
+        private final KafkaSourceReaderMetrics metrics;
+        private Iterator splitIterator;
+        private Iterator> recordIterator;
+        private TopicPartition currentTopicPartition;
+
+        private KafkaPartitionSplitRecords(
+                ConsumerRecords consumerRecords, KafkaSourceReaderMetrics metrics) {
+            this.consumerRecords = consumerRecords;
+            this.metrics = metrics;
+        }
-        private KafkaPartitionSplitRecords() {
-            this.recordsBySplits = new HashMap<>();
-            this.finishedSplits = new HashSet<>();
+        private void prepareForRead() {
+            splitIterator = consumerRecords.partitions().iterator();
         }
-        private Collection recordsForSplit(String splitId) {
-            return recordsBySplits.computeIfAbsent(splitId, id -> new ArrayList<>());
+        private void setPartitionStoppingOffset(
+                TopicPartition topicPartition, long stoppingOffset) {
+            stoppingOffsets.put(topicPartition, stoppingOffset);
         }
         private void addFinishedSplit(String splitId) {
             finishedSplits.add(splitId);
         }
-        private void prepareForRead() {
-            splitIterator = recordsBySplits.entrySet().iterator();
-        }
-
-        @Override
         @Nullable
+        @Override
         public String nextSplit() {
             if (splitIterator.hasNext()) {
-                Map.Entry> entry = splitIterator.next();
-                currentSplitId = entry.getKey();
-                recordIterator = entry.getValue().iterator();
-                return currentSplitId;
+                currentTopicPartition = splitIterator.next();
+                recordIterator = consumerRecords.records(currentTopicPartition).iterator();
+                return currentTopicPartition.toString();
             } else {
-                currentSplitId = null;
+                currentTopicPartition = null;
                 recordIterator = null;
                 return null;
             }
         }
-        @Override
         @Nullable
-        public T nextRecordFromSplit() {
+        @Override
+        public ConsumerRecord nextRecordFromSplit() {
             Preconditions.checkNotNull(
-                    currentSplitId,
+                    currentTopicPartition,
                     "Make sure nextSplit() did not return null before "
                             + "iterate over the records split.");
             if (recordIterator.hasNext()) {
-                return recordIterator.next();
-            } else {
-                return null;
+                final ConsumerRecord record = recordIterator.next();
+                // Only emit records before stopping offset
+                if (record.offset()
+                        < stoppingOffsets.getOrDefault(currentTopicPartition, Long.MAX_VALUE)) {

Review comment:
Can we avoid looking up the stopping offset of the current partition for every record? Ideally, the stopping offset could be cached and updated only when the current split changes.
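The caching suggested in this comment might look roughly like the sketch below. It assumes a simplified model in which partitions are plain strings and each record is represented only by its offset; `CachedStoppingOffsetRecords` is a hypothetical class, not the code in the PR. The map lookup moves into `nextSplit()`, so `nextRecordFromSplit()` only compares against a cached `long`.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified stand-in for KafkaPartitionSplitRecords: partitions are
 * plain strings and each record is represented only by its offset.
 */
final class CachedStoppingOffsetRecords {
    private final Map<String, List<Long>> recordsByPartition;
    private final Map<String, Long> stoppingOffsets = new HashMap<>();
    private Iterator<String> splitIterator;
    private Iterator<Long> recordIterator;
    private String currentPartition;
    // Looked up once per split instead of once per record.
    private long currentStoppingOffset = Long.MAX_VALUE;

    CachedStoppingOffsetRecords(Map<String, List<Long>> recordsByPartition) {
        // Copy into a LinkedHashMap so splits are visited in insertion order.
        this.recordsByPartition = new LinkedHashMap<>(recordsByPartition);
    }

    void setPartitionStoppingOffset(String partition, long stoppingOffset) {
        stoppingOffsets.put(partition, stoppingOffset);
    }

    void prepareForRead() {
        splitIterator = recordsByPartition.keySet().iterator();
    }

    String nextSplit() {
        if (splitIterator.hasNext()) {
            currentPartition = splitIterator.next();
            recordIterator = recordsByPartition.get(currentPartition).iterator();
            // The only map lookup for this split happens here.
            currentStoppingOffset =
                    stoppingOffsets.getOrDefault(currentPartition, Long.MAX_VALUE);
            return currentPartition;
        }
        currentPartition = null;
        recordIterator = null;
        currentStoppingOffset = Long.MAX_VALUE;
        return null;
    }

    /** Returns the next offset below the (exclusive) stopping offset, or null. */
    Long nextRecordFromSplit() {
        if (currentPartition == null) {
            throw new IllegalStateException("nextSplit() returned null or was not called");
        }
        if (recordIterator.hasNext()) {
            long offset = recordIterator.next();
            // Per-record work is a primitive comparison against the cached value.
            if (offset < currentStoppingOffset) {
                return offset;
            }
        }
        return null;
    }
}
```

Resetting the cached value whenever the split changes keeps the per-record path free of hashing while preserving the same "only emit records before the stopping offset" behavior.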
becketqin commented on a change in pull request #17991: URL: https://github.com/apache/flink/pull/17991#discussion_r762673730

## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java ##

@@ -147,33 +133,17 @@ public KafkaPartitionSplitReader(
                             recordsBySplits);
                     break;
                 }
-                // Add the record to the partition collector.
-                try {
-                    deserializationSchema.deserialize(consumerRecord, collector);
-                    collector
-                            .getRecords()
-                            .forEach(
-                                    r ->
-                                            recordsForSplit.add(
-                                                    new Tuple3<>(
-                                                            r,
-                                                            consumerRecord.offset(),
-                                                            consumerRecord.timestamp(;
-                    // Finish the split because there might not be any message after this point.
-                    // Keep polling
-                    // will just block forever.
-                    if (consumerRecord.offset() == stoppingOffset - 1) {
-                        finishSplitAtRecord(
-                                tp,
-                                stoppingOffset,
-                                consumerRecord.offset(),
-                                finishedPartitions,
-                                recordsBySplits);
-                    }
-                } catch (Exception e) {
-                    throw new IOException("Failed to deserialize consumer record due to", e);
-                } finally {
-                    collector.reset();
+                recordsForSplit.add(consumerRecord);

Review comment:
Given that deserialization is no longer performed in the split fetcher, it seems we no longer have to iterate over all the records here. Instead, `KafkaPartitionSplitRecords` could be changed into a wrapper around `ConsumerRecords`.
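With deserialization moved out of the fetcher, the fetch loop may only need to decide which splits are finished. A sketch, assuming records within each partition arrive in ascending offset order and that the stopping offset is exclusive (as the removed `== stoppingOffset - 1` check suggests): only the last record of each partition in the polled batch is inspected, and nothing is copied. `FinishedSplitDetector` and the `Map<String, List<Long>>` stand-in for `ConsumerRecords` are hypothetical.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper; offsetsByPartition stands in for a polled ConsumerRecords batch. */
final class FinishedSplitDetector {
    private FinishedSplitDetector() {}

    /**
     * Returns the partitions whose polled records already reach the stopping offset.
     * Only the last offset per partition is inspected, assuming offsets within a
     * partition are ascending; the records themselves are neither copied nor iterated.
     */
    static Set<String> finishedPartitions(
            Map<String, List<Long>> offsetsByPartition, Map<String, Long> stoppingOffsets) {
        Set<String> finished = new HashSet<>();
        for (Map.Entry<String, List<Long>> entry : offsetsByPartition.entrySet()) {
            List<Long> offsets = entry.getValue();
            if (offsets.isEmpty()) {
                continue;
            }
            long stoppingOffset = stoppingOffsets.getOrDefault(entry.getKey(), Long.MAX_VALUE);
            long lastOffset = offsets.get(offsets.size() - 1);
            // The stopping offset is exclusive, so reaching stoppingOffset - 1 ends the split.
            if (lastOffset >= stoppingOffset - 1) {
                finished.add(entry.getKey());
            }
        }
        return finished;
    }
}
```

This makes the per-poll work proportional to the number of partitions rather than the number of records; the records can then reach the emitter through a wrapper around the polled batch, which filters out anything at or past the stopping offset.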