mjsax commented on code in PR #19188:
URL: https://github.com/apache/kafka/pull/19188#discussion_r2015184978
##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -274,13 +276,23 @@ public synchronized ConsumerRecords<K, V> poll(final
Duration timeout) {
// update the consumed offset
final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new
HashMap<>();
+
final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata =
new HashMap<>();
+ final Map<TopicPartition, List<ConsumerRecord<K, V>>> nextPoll = new
HashMap<>();
final List<TopicPartition> toClear = new ArrayList<>();
-
+ long numPollRecords = 0L;
for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry :
this.records.entrySet()) {
if (!subscriptions.isPaused(entry.getKey())) {
final List<ConsumerRecord<K, V>> recs = entry.getValue();
+
+ List<ConsumerRecord<K, V>> remaining = new ArrayList<>();
Review Comment:
Not sure why we need `remaining` (similar question as for `nextPoll`)? (cf
other comments)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -294,13 +306,19 @@ public synchronized ConsumerRecords<K, V> poll(final
Duration timeout) {
rec.offset() + 1, rec.leaderEpoch(),
leaderAndEpoch);
subscriptions.position(entry.getKey(), newPosition);
nextOffsetAndMetadata.put(entry.getKey(), new
OffsetAndMetadata(rec.offset() + 1, rec.leaderEpoch(), ""));
+ numPollRecords++;
}
}
- toClear.add(entry.getKey());
+
+ if (remaining.isEmpty()) {
+ toClear.add(entry.getKey());
+ } else {
+ nextPoll.put(entry.getKey(), remaining);
+ }
}
}
-
toClear.forEach(records::remove);
+ records.putAll(nextPoll);
Review Comment:
It seems clumsy to add stuff back; it seems much more straightforward to
remove individual records from `this.records` one by one.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -294,13 +306,19 @@ public synchronized ConsumerRecords<K, V> poll(final
Duration timeout) {
rec.offset() + 1, rec.leaderEpoch(),
leaderAndEpoch);
subscriptions.position(entry.getKey(), newPosition);
nextOffsetAndMetadata.put(entry.getKey(), new
OffsetAndMetadata(rec.offset() + 1, rec.leaderEpoch(), ""));
+ numPollRecords++;
}
}
- toClear.add(entry.getKey());
Review Comment:
Should we remove `toClear` altogether, and instead call `recs.remove(0)`
(we might want to switch from `ArrayList` to `LinkedList`)? Plus add a check
whether a partition becomes empty, and if so remove it entirely.
If we cannot do this here (`ConcurrentModificationException`?) we could
change `toClear` to type `Map<TopicPartition, Integer>` and just count how many
records we removed.
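
To illustrate the in-place removal suggested above, here is a minimal sketch,
not the actual `MockConsumer` code. `DrainSketch` and `drain` are hypothetical
names, and the generic types `P` and `R` stand in for `TopicPartition` and
`ConsumerRecord<K, V>`. It assumes the buffered lists are mutable, and it uses
explicit iterators so that removing while iterating does not throw
`ConcurrentModificationException`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class DrainSketch {
    // Hypothetical helper: moves up to maxPollRecords records out of `buffered`
    // and returns them grouped by partition. Records that are not returned
    // simply stay in `buffered`, so no `nextPoll`/`remaining` copies are needed.
    static <P, R> Map<P, List<R>> drain(final Map<P, List<R>> buffered, final long maxPollRecords) {
        final Map<P, List<R>> results = new HashMap<>();
        long numPollRecords = 0L;

        final Iterator<Map.Entry<P, List<R>>> partitions = buffered.entrySet().iterator();
        while (partitions.hasNext() && numPollRecords < maxPollRecords) {
            final Map.Entry<P, List<R>> entry = partitions.next();
            final Iterator<R> recs = entry.getValue().iterator();

            while (recs.hasNext() && numPollRecords < maxPollRecords) {
                final R rec = recs.next();
                results.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(rec);
                recs.remove(); // safe: removal goes through the iterator
                numPollRecords++;
            }

            // Drop the partition entirely once its buffer is empty.
            if (entry.getValue().isEmpty()) {
                partitions.remove();
            }
        }
        return results;
    }
}
```

With `LinkedList` buffers each `recs.remove()` is constant time; with
`ArrayList` removing from the front shifts the remaining elements, which is
probably acceptable for a mock but is the reason for the `LinkedList` remark.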
##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -274,13 +276,23 @@ public synchronized ConsumerRecords<K, V> poll(final
Duration timeout) {
// update the consumed offset
final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new
HashMap<>();
+
final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata =
new HashMap<>();
+ final Map<TopicPartition, List<ConsumerRecord<K, V>>> nextPoll = new
HashMap<>();
Review Comment:
Not sure why we need `nextPoll`? (cf other comments)
##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -274,13 +276,23 @@ public synchronized ConsumerRecords<K, V> poll(final
Duration timeout) {
// update the consumed offset
final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new
HashMap<>();
+
Review Comment:
nit: avoid unnecessary diffs
##########
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##########
@@ -274,13 +276,23 @@ public synchronized ConsumerRecords<K, V> poll(final
Duration timeout) {
// update the consumed offset
final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new
HashMap<>();
+
final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata =
new HashMap<>();
+ final Map<TopicPartition, List<ConsumerRecord<K, V>>> nextPoll = new
HashMap<>();
final List<TopicPartition> toClear = new ArrayList<>();
-
+ long numPollRecords = 0L;
for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry :
this.records.entrySet()) {
if (!subscriptions.isPaused(entry.getKey())) {
final List<ConsumerRecord<K, V>> recs = entry.getValue();
+
+ List<ConsumerRecord<K, V>> remaining = new ArrayList<>();
for (final ConsumerRecord<K, V> rec : recs) {
+
+ if (numPollRecords >= this.maxPollRecords) {
+ remaining.add(rec);
+ continue;
Review Comment:
Why do we `continue` here, instead of just breaking out of the `for` loop and
no longer consuming from `this.records`?
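
As a rough sketch of the `break` variant for a single partition (hypothetical
names `BreakSketch` and `takeHead`, with `R` standing in for
`ConsumerRecord<K, V>`; this is not code from the PR): once the limit is hit
the loop stops, and the consumed prefix is removed after iteration, so the
unconsumed tail never has to be copied into a second list.

```java
import java.util.ArrayList;
import java.util.List;

final class BreakSketch {
    // Hypothetical helper: takes at most `budget` records from the head of
    // `recs` and leaves the rest in place for a later poll.
    static <R> List<R> takeHead(final List<R> recs, final long budget) {
        final List<R> polled = new ArrayList<>();
        for (final R rec : recs) {
            if (polled.size() >= budget) {
                break; // everything from here on simply stays in `recs`
            }
            polled.add(rec);
        }
        // Remove the consumed prefix only after the loop has finished, which
        // avoids modifying the list while it is being iterated.
        recs.subList(0, polled.size()).clear();
        return polled;
    }
}
```

Counting the consumed records and clearing the prefix afterwards is one way
to realize the "just count how many records we removed" idea without touching
the list during iteration.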