hgeraldino commented on code in PR #15316: URL: https://github.com/apache/kafka/pull/15316#discussion_r1494861815
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java: ########## @@ -447,6 +457,85 @@ public void testPollRedelivery() { assertSinkMetricValue("offset-commit-completion-total", 1.0); } + @Test + @SuppressWarnings("unchecked") + public void testPollRedeliveryWithConsumerRebalance() { + createTask(initialState); + expectTaskGetTopic(); + + workerTask.initialize(TASK_CONFIG); + workerTask.initializeAndStart(); + verifyInitializeTask(); + + Set<TopicPartition> newAssignment = new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)); + + when(consumer.assignment()) + .thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT) + .thenReturn(newAssignment, newAssignment, newAssignment) + .thenReturn(Collections.singleton(TOPIC_PARTITION3), + Collections.singleton(TOPIC_PARTITION3), + Collections.singleton(TOPIC_PARTITION3)); + + INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET)); + when(consumer.position(TOPIC_PARTITION3)).thenReturn(FIRST_OFFSET); + + when(consumer.poll(any(Duration.class))) + .thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> { + rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT); + return ConsumerRecords.empty(); + }) + .thenAnswer(expectConsumerPoll(1)) + // Empty consumer poll (all partitions are paused) with rebalance; one new partition is assigned + .thenAnswer(invocation -> { + rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet()); + rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3)); + return ConsumerRecords.empty(); + }) + .thenAnswer(expectConsumerPoll(0)) + // Non-empty consumer poll; all initially-assigned partitions are revoked in rebalance, and new partitions are allowed to resume + .thenAnswer(invocation -> { + ConsumerRecord<byte[], byte[]> newRecord = new ConsumerRecord<>(TOPIC, PARTITION3, FIRST_OFFSET, RAW_KEY, RAW_VALUE); + + rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT); + rebalanceListener.getValue().onPartitionsAssigned(Collections.emptyList()); + return new ConsumerRecords<>(Collections.singletonMap(TOPIC_PARTITION3, Collections.singletonList(newRecord))); Review Comment: You're right, the test was a bit off mainly because the first call to `task.put(..)` shouldn't have thrown an exception. Here's the sequence now (which matches the original WorkerSinkTaskTest): 1. Iteration#1: partitions on INITIAL_ASSIGNMENT are assigned 2. Iteration#2: `task.put(...)` throws, partitions are paused 3. Iteration#3: P3 is assigned, `task.put(...)` throws (task is already paused so it's a noop) 4. Iteration#4: `task.put(...)` throws, noop 5. Iteration#5: initial assignment is revoked (pending messages for P1 are removed); `consumer.poll(...)` returns a record for P3, which is successfully processed. I've added the corresponding verifications after each step. Hope it makes sense now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org