hgeraldino commented on code in PR #15316:
URL: https://github.com/apache/kafka/pull/15316#discussion_r1494861815


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##########
@@ -447,6 +457,85 @@ public void testPollRedelivery() {
         assertSinkMetricValue("offset-commit-completion-total", 1.0);
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testPollRedeliveryWithConsumerRebalance() {
+        createTask(initialState);
+        expectTaskGetTopic();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        Set<TopicPartition> newAssignment = new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3));
+
+        when(consumer.assignment())
+                .thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT)
+                .thenReturn(newAssignment, newAssignment, newAssignment)
+                .thenReturn(Collections.singleton(TOPIC_PARTITION3),
+                        Collections.singleton(TOPIC_PARTITION3),
+                        Collections.singleton(TOPIC_PARTITION3));
+
+        INITIAL_ASSIGNMENT.forEach(tp -> when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
+        when(consumer.position(TOPIC_PARTITION3)).thenReturn(FIRST_OFFSET);
+
+        when(consumer.poll(any(Duration.class)))
+                .thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) invocation -> {
+                    rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+                    return ConsumerRecords.empty();
+                })
+                .thenAnswer(expectConsumerPoll(1))
+                // Empty consumer poll (all partitions are paused) with rebalance; one new partition is assigned
+                .thenAnswer(invocation -> {
+                    rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet());
+                    rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
+                    return ConsumerRecords.empty();
+                })
+                .thenAnswer(expectConsumerPoll(0))
+                // Non-empty consumer poll; all initially-assigned partitions are revoked in rebalance, and new partitions are allowed to resume
+                .thenAnswer(invocation -> {
+                    ConsumerRecord<byte[], byte[]> newRecord = new ConsumerRecord<>(TOPIC, PARTITION3, FIRST_OFFSET, RAW_KEY, RAW_VALUE);
+
+                    rebalanceListener.getValue().onPartitionsRevoked(INITIAL_ASSIGNMENT);
+                    rebalanceListener.getValue().onPartitionsAssigned(Collections.emptyList());
+                    return new ConsumerRecords<>(Collections.singletonMap(TOPIC_PARTITION3, Collections.singletonList(newRecord)));

Review Comment:
   You're right, the test was a bit off, mainly because the first call to
`task.put(...)` shouldn't have thrown an exception.
   
   Here's the sequence now (which matches the original WorkerSinkTaskTest):
   
   1. Iteration #1: the partitions in `INITIAL_ASSIGNMENT` are assigned.
   2. Iteration #2: `task.put(...)` throws, and the partitions are paused.
   3. Iteration #3: P3 is assigned; `task.put(...)` throws again (the task is already paused, so it's a no-op).
   4. Iteration #4: `task.put(...)` throws again; no-op.
   5. Iteration #5: the initial assignment is revoked (pending messages for P1 are removed); `consumer.poll(...)` returns a record for P3, which is processed successfully.
   
   I've added the corresponding verifications after each step. Hope it makes
sense now.
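
For anyone following along, the five iterations above can be modeled roughly in plain Java. This is only a simplified sketch, not the Kafka Connect implementation and not the Mockito test itself: `RedeliveryModel`, its fields, and the string partition names `P1`..`P3` are hypothetical, and it assumes that the record polled in iteration #2 belongs to P1 and that `INITIAL_ASSIGNMENT` is {P1, P2}.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified model of pause-and-redeliver under rebalance. */
class RedeliveryModel {
    final Set<String> assignment = new HashSet<>();
    final List<String> pending = new ArrayList<>();   // batch awaiting (re)delivery; a record is just its partition name
    final List<String> delivered = new ArrayList<>(); // records that put() accepted
    boolean paused = false;

    void assign(Set<String> partitions) {
        assignment.addAll(partitions);
    }

    void revoke(Set<String> partitions) {
        assignment.removeAll(partitions);
        // Pending records of revoked partitions are dropped, never redelivered.
        pending.removeIf(partitions::contains);
    }

    /** One iteration: append the polled records to the batch, then attempt put(). */
    void iterate(List<String> polled, boolean putThrows) {
        pending.addAll(polled);
        if (!pending.isEmpty() && putThrows) {
            paused = true; // keep the batch for redelivery; pausing twice changes nothing
            return;
        }
        delivered.addAll(pending); // put() succeeded (or there was nothing to deliver)
        pending.clear();
        paused = false;
    }

    /** Replays the five iterations described in the comment above. */
    static RedeliveryModel runScenario() {
        RedeliveryModel m = new RedeliveryModel();
        m.assign(Set.of("P1", "P2"));     // #1: initial assignment, empty poll
        m.iterate(List.of(), false);
        m.iterate(List.of("P1"), true);   // #2: put() throws, task pauses
        m.assign(Set.of("P3"));           // #3: P3 assigned, put() throws again
        m.iterate(List.of(), true);
        m.iterate(List.of(), true);       // #4: put() throws again, still paused
        m.revoke(Set.of("P1", "P2"));     // #5: initial assignment revoked, P1 record dropped
        m.iterate(List.of("P3"), false);  //     P3 record is delivered
        return m;
    }

    public static void main(String[] args) {
        RedeliveryModel m = runScenario();
        System.out.println("delivered=" + m.delivered + " paused=" + m.paused);
    }
}
```

The point of the model is the last step: because the revocation removes the pending P1 record before the next delivery attempt, only the P3 record ever reaches `delivered`.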


