reiabreu commented on PR #3679:
URL: https://github.com/apache/storm/pull/3679#issuecomment-2334813805
`emitter.refreshPartitions(taskPartitions);` assigns the task's
partitions to the emitter object.
The emitter then polls all of the assigned partitions but only forwards
the records that belong to `currBatchTp`:
```
// poll() may return records for several assigned partitions;
// only the records of currBatchTp are kept and emitted
final List<ConsumerRecord<K, V>> records =
        consumer.poll(pollTimeoutMs).records(currBatchTp);
LOG.debug("Polled [{}] records from Kafka.", records.size());
if (!records.isEmpty()) {
    for (ConsumerRecord<K, V> record : records) {
        emitTuple(collector, record);
    }
}
```
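A minimal in-memory sketch of that filtering, assuming a poll result can be modelled as a map from partition name to record values. `SingleBatchEmit` and `emitForBatch` are hypothetical names; plain strings stand in for `TopicPartition` and `ConsumerRecord`, and adding to a list stands in for `emitTuple`. None of this is Storm or Kafka API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: one poll result, grouped by partition name.
public class SingleBatchEmit {

    /** Mimics the old emitter: emit only the records of the batch's partition. */
    public static List<String> emitForBatch(Map<String, List<String>> polled, String currBatchTp) {
        List<String> emitted = new ArrayList<>();
        // Plays the role of poll.records(currBatchTp): other partitions' records are not forwarded
        List<String> records = polled.getOrDefault(currBatchTp, List.of());
        if (!records.isEmpty()) {
            for (String record : records) {
                emitted.add(record); // stands in for emitTuple(collector, record)
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> polled = new LinkedHashMap<>();
        polled.put("topic-0", List.of("a0", "a1"));
        polled.put("topic-1", List.of("b0"));
        System.out.println(emitForBatch(polled, "topic-0")); // [a0, a1]
    }
}
```

Here `b0` is present in the poll result but is not emitted for the `topic-0` batch.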
Your change keeps the same logic, except that it emits records for all
of the task's partitions instead of just one:
```
ConsumerRecords<K, V> poll =
        consumer.poll(Duration.ofMillis(pollTimeoutMs));
// Emit the polled records of every partition assigned to this task
for (KafkaTridentSpoutTopicPartition partition : partitions) {
    final List<ConsumerRecord<K, V>> records =
            poll.records(partition.getTopicPartition());
    if (!records.isEmpty()) {
        for (ConsumerRecord<K, V> record : records) {
            emitTuple(collector, record);
        }
    }
}
```
Since the assignment of partitions to the consumer is unchanged, the number
of returned records will be the same under the same conditions.
Your change is more efficient in that it performs fewer polls for the
same partitions of a task.
Is my understanding of this correct?