reiabreu commented on PR #3679: URL: https://github.com/apache/storm/pull/3679#issuecomment-2334813805
`emitter.refreshPartitions(taskPartitions);` assigns the task's partitions to the emitter object. The emitter then polls all of the assigned partitions and emits only the records that belong to `currBatchTp`:

```
final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
LOG.debug("Polled [{}] records from Kafka.", records.size());
if (!records.isEmpty()) {
    for (ConsumerRecord<K, V> record : records) {
        emitTuple(collector, record);
    }
    // ...
}
```

The changes keep the same logic, except that records are now emitted for all of the task's partitions instead of just one:

```
ConsumerRecords<K, V> poll = consumer.poll(Duration.ofMillis(pollTimeoutMs));
for (KafkaTridentSpoutTopicPartition partition : partitions) {
    final List<ConsumerRecord<K, V>> records = poll.records(partition.getTopicPartition());
    if (!records.isEmpty()) {
        for (ConsumerRecord<K, V> record : records) {
            emitTuple(collector, record);
        }
        // ...
    }
}
```

Since the assignment of partitions to the consumer is unchanged, the number of records returned will be the same under the same conditions. Your change is more efficient in that it performs fewer polls for the same partitions of a task.

Is my understanding correct?
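To make sure we are describing the same thing, here is a minimal, Kafka-free sketch of the two shapes as I understand them. It is only an illustration, not the PR's code: the poll result is modelled as a plain `Map` from a partition name to its records, and `PollSplitSketch`, `emitForOne` and `emitForAll` are hypothetical names I made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PollSplitSketch {

    // Old shape (assumed): from one poll result, keep only the records
    // of the current batch partition.
    static List<String> emitForOne(Map<String, List<String>> pollResult, String currBatchTp) {
        return new ArrayList<>(pollResult.getOrDefault(currBatchTp, List.of()));
    }

    // New shape (assumed): walk every partition of the task over the same
    // poll result; partitions without records contribute nothing.
    static List<String> emitForAll(Map<String, List<String>> pollResult, List<String> partitions) {
        List<String> emitted = new ArrayList<>();
        for (String partition : partitions) {
            emitted.addAll(pollResult.getOrDefault(partition, List.of()));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Stand-in for a single poll that returned records for two partitions.
        Map<String, List<String>> pollResult = new LinkedHashMap<>();
        pollResult.put("topic-0", List.of("a", "b"));
        pollResult.put("topic-1", List.of("c"));

        System.out.println(emitForOne(pollResult, "topic-0"));
        System.out.println(emitForAll(pollResult, List.of("topic-0", "topic-1", "topic-2")));
    }
}
```

In this sketch the first call yields `[a, b]` and the second `[a, b, c]`: the same poll result is reused for every partition rather than being filtered down to one.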