hgeraldino commented on code in PR #15316:
URL: https://github.com/apache/kafka/pull/15316#discussion_r1494806542


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskMockitoTest.java:
##########
@@ -601,6 +690,567 @@ public void testPartialRevocationAndAssignment() {
         verify(sinkTask, times(4)).put(Collections.emptyList());
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testPreCommitFailureAfterPartialRevocationAndAssignment() {
+        createTask(initialState);
+        expectTaskGetTopic();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        verifyInitializeTask();
+
+        when(consumer.assignment())
+                .thenReturn(INITIAL_ASSIGNMENT, INITIAL_ASSIGNMENT)
+                .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+                .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+                .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2)))
+                .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)))
+                .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)))
+                .thenReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION2, 
TOPIC_PARTITION3)));
+
+        INITIAL_ASSIGNMENT.forEach(tp -> 
when(consumer.position(tp)).thenReturn(FIRST_OFFSET));
+        when(consumer.position(TOPIC_PARTITION3)).thenReturn(FIRST_OFFSET);
+
+        // First poll; assignment is [TP1, TP2]
+        when(consumer.poll(any(Duration.class)))
+                .thenAnswer((Answer<ConsumerRecords<byte[], byte[]>>) 
invocation -> {
+                    
rebalanceListener.getValue().onPartitionsAssigned(INITIAL_ASSIGNMENT);
+                    return ConsumerRecords.empty();
+                })
+                // Second poll; a single record is delivered from TP1
+                .thenAnswer(expectConsumerPoll(1))
+                // Third poll; assignment changes to [TP2]
+                .thenAnswer(invocation -> {
+                    
rebalanceListener.getValue().onPartitionsRevoked(Collections.singleton(TOPIC_PARTITION));
+                    
rebalanceListener.getValue().onPartitionsAssigned(Collections.emptySet());
+                    return ConsumerRecords.empty();
+                })
+                // Fourth poll; assignment changes to [TP2, TP3]
+                .thenAnswer(invocation -> {
+                    
rebalanceListener.getValue().onPartitionsRevoked(Collections.emptySet());
+                    
rebalanceListener.getValue().onPartitionsAssigned(Collections.singleton(TOPIC_PARTITION3));
+                    return ConsumerRecords.empty();
+                })
+                // Fifth poll; an offset commit takes place
+                .thenAnswer(expectConsumerPoll(0));
+
+        expectConversionAndTransformation(null, new RecordHeaders());
+
+        // First iteration--first call to poll, first consumer assignment
+        workerTask.iteration();
+        // Second iteration--second call to poll, delivery of one record
+        workerTask.iteration();
+        // Third iteration--third call to poll, partial consumer revocation
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+        when(sinkTask.preCommit(offsets)).thenReturn(offsets);
+        doAnswer(invocation -> null).when(consumer).commitSync(offsets);

Review Comment:
   Good catch, updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to