nicktelford opened a new pull request, #21631:
URL: https://github.com/apache/kafka/pull/21631
In `poll()`, `MockConsumer` iterates over the added `records`, but only
checks that they belong to an assigned partition quite late in the
process.
This can cause an error if records were added for a partition that was
later removed from the assignment:
```java
consumer.assign(Arrays.asList(new TopicPartition("t1", 0), new TopicPartition("t2", 0)));
consumer.addRecord(new ConsumerRecord<>("t1", 0, 0, "a", 123));
consumer.addRecord(new ConsumerRecord<>("t2", 0, 0, "b", 123));
// shrink the assignment; the record for t2-0 is still buffered
consumer.assign(Collections.singleton(new TopicPartition("t1", 0)));
// throws IllegalStateException
consumer.poll(Duration.ofSeconds(1));
```
Moving this check earlier in the process avoids the error: `poll()` now
discards records that do not belong to the current assignment instead of
throwing.
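As a rough illustration of the idea (not the actual `MockConsumer` code), the following self-contained sketch filters buffered records by assignment before doing anything else with them. The class `AssignmentFilterSketch`, the method `pollable`, and the string partition keys such as `"t1-0"` are hypothetical names chosen for this example only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the mock's record buffer; this is NOT the real MockConsumer.
public class AssignmentFilterSketch {

    /** Returns only the buffered records whose partition is currently assigned. */
    public static <P, R> Map<P, List<R>> pollable(Map<P, List<R>> buffered, Set<P> assignment) {
        Map<P, List<R>> result = new LinkedHashMap<>();
        for (Map.Entry<P, List<R>> entry : buffered.entrySet()) {
            // Check the assignment first, so unassigned partitions are skipped
            // before any further per-partition work is attempted.
            if (!assignment.contains(entry.getKey())) {
                continue; // discard records for partitions no longer assigned
            }
            result.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> buffered = new LinkedHashMap<>();
        buffered.put("t1-0", List.of("a"));
        buffered.put("t2-0", List.of("b"));
        // Only t1-0 remains assigned, so the record for t2-0 is dropped.
        System.out.println(pollable(buffered, Set.of("t1-0"))); // prints {t1-0=[a]}
    }
}
```

The point of the sketch is only the ordering: the membership test comes before any other handling of the partition, so a stale buffered record is skipped rather than causing a failure.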
This enables tests to proactively add all their records and then modify
the assignment multiple times, if necessary, to test specific
combinations of partitions.
This is particularly useful in some of the `streams` tests, where we
need to test with multiple state store changelogs, but the assignment
gets changed internally by Kafka Streams (notably,
`GlobalStateManagerImplTest`).