reiabreu commented on PR #3679:
URL: https://github.com/apache/storm/pull/3679#issuecomment-2334813805

   `emitter.refreshPartitions(taskPartitions);` assigns the task's partitions 
to the emitter object.
   The emitter then polls all of the assigned partitions and emits only the 
records that belong to `currBatchTp`:
   
   ```
   final List<ConsumerRecord<K, V>> records = consumer.poll(pollTimeoutMs).records(currBatchTp);
   LOG.debug("Polled [{}] records from Kafka.", records.size());

   if (!records.isEmpty()) {
       for (ConsumerRecord<K, V> record : records) {
           emitTuple(collector, record);
       }
   ```
   
   Your changes keep the same logic, with the difference that you emit 
records for all of the partitions instead of just one:
   
   ```
   ConsumerRecords<K, V> poll = consumer.poll(Duration.ofMillis(pollTimeoutMs));
   for (KafkaTridentSpoutTopicPartition partition : partitions) {
       final List<ConsumerRecord<K, V>> records = poll.records(partition.getTopicPartition());
       if (!records.isEmpty()) {
           for (ConsumerRecord<K, V> record : records) {
               emitTuple(collector, record);
           }
   ```
   
   Since the assignment of partitions to the consumer is unchanged, the number 
of records returned will be the same under the same conditions.
   
   Your change is more efficient because it performs fewer polls for the same 
partitions of a task.
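
To make the comparison concrete, here is a toy model of the two approaches. It is only a sketch: `PollSketch`, `FakeConsumer`, `emitPerPartition` and `emitAllPartitions` are hypothetical names, not part of Storm or Kafka, and the fake consumer assumes that a poll for one partition leaves the records of the other partitions untouched.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: nothing here is the real Kafka or Storm API. */
class PollSketch {

    /** Hypothetical in-memory consumer that counts how often poll() is called. */
    static class FakeConsumer {
        private final Map<String, List<String>> pending = new LinkedHashMap<>();
        int pollCount = 0;

        FakeConsumer(Map<String, List<String>> initial) {
            for (Map.Entry<String, List<String>> e : initial.entrySet()) {
                pending.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
        }

        /** Returns and removes the pending records of the requested partitions. */
        Map<String, List<String>> poll(List<String> partitions) {
            pollCount++;
            Map<String, List<String>> result = new LinkedHashMap<>();
            for (String p : partitions) {
                List<String> records = pending.get(p);
                if (records != null && !records.isEmpty()) {
                    result.put(p, new ArrayList<>(records));
                    records.clear();
                }
            }
            return result;
        }
    }

    /** One poll per partition, emitting only that partition's records. */
    static List<String> emitPerPartition(FakeConsumer consumer, List<String> partitions) {
        List<String> emitted = new ArrayList<>();
        for (String partition : partitions) {
            Map<String, List<String>> polled =
                    consumer.poll(Collections.singletonList(partition));
            emitted.addAll(polled.getOrDefault(partition, Collections.emptyList()));
        }
        return emitted;
    }

    /** A single poll whose result is split by partition before emitting. */
    static List<String> emitAllPartitions(FakeConsumer consumer, List<String> partitions) {
        List<String> emitted = new ArrayList<>();
        Map<String, List<String>> polled = consumer.poll(partitions);
        for (String partition : partitions) {
            emitted.addAll(polled.getOrDefault(partition, Collections.emptyList()));
        }
        return emitted;
    }
}
```

In this model, a task with three partitions emits the same records in the same order either way, but `emitPerPartition` calls `poll` three times while `emitAllPartitions` calls it once.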
   
   Is my understanding of this correct?


