C0urante commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1116427030


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -523,34 +513,26 @@ public void testSendRecordsTopicCreateRetriesMidway() {
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        // First round
+        expectPreliminaryCalls(TOPIC);
         expectPreliminaryCalls(OTHER_TOPIC);
-        expectTopicCreation(TOPIC);
-        expectSendRecord();
-        expectSendRecord();
-
-        EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
-        // First call to create the topic times out
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
-                .andThrow(new RetriableException(new TimeoutException("timeout")));
 
-        // Second round
-        expectTopicCreation(OTHER_TOPIC);
-        expectSendRecord(OTHER_TOPIC, false, emptyHeaders());
-
-        PowerMock.replayAll();
+        when(admin.describeTopics(anyString())).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class)))
+                .thenReturn(createdTopic(TOPIC))
+                .thenThrow(new RetriableException(new TimeoutException("timeout")))
+                .thenReturn(createdTopic(OTHER_TOPIC));
 
         // Try to send 3, make first pass, second fail. Should save last two

Review Comment:
   Nit: This comment was never accurate (neither on the current trunk nor in this PR); we actually save only the last record. Can we update it to "Should save last record" to match the other comment of its kind?
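
   To illustrate why only the last record is retained, here is a minimal plain-Java sketch of the scenario. It is not the Connect implementation: `RetainOnRetriableFailureSketch`, `ScriptedAdmin`, `Rec`, `RetriableFailure`, and the topic names are all hypothetical stand-ins, and the sketch assumes the send loop stops at the first retriable topic-creation failure and keeps the unsent tail. The scripted outcomes mirror the stubbing in the diff (succeed, time out, succeed).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; none of these names are part of Kafka Connect.
class RetainOnRetriableFailureSketch {

    /** Stand-in for a retriable error such as a topic-creation timeout. */
    static class RetriableFailure extends RuntimeException {
        RetriableFailure(String message) {
            super(message);
        }
    }

    /** Minimal record: only the destination topic and an id matter here. */
    static final class Rec {
        final String topic;
        final int id;

        Rec(String topic, int id) {
            this.topic = topic;
            this.id = id;
        }
    }

    /** Fake admin whose create call replays a scripted sequence of outcomes. */
    static final class ScriptedAdmin {
        private final Deque<Boolean> outcomes = new ArrayDeque<>();

        ScriptedAdmin(boolean... succeeds) {
            for (boolean s : succeeds) {
                outcomes.add(s);
            }
        }

        void createTopic(String topic) {
            // remove() consumes the next scripted outcome; false means "time out".
            if (!outcomes.remove()) {
                throw new RetriableFailure("timeout creating " + topic);
            }
        }
    }

    /**
     * Sends records in order, creating unknown topics first. On a retriable
     * failure it stops and returns the unsent tail, starting at the record
     * whose topic could not be created.
     */
    static List<Rec> sendRecords(List<Rec> toSend, ScriptedAdmin admin,
                                 Set<String> knownTopics, List<Integer> sentIds) {
        for (int i = 0; i < toSend.size(); i++) {
            Rec rec = toSend.get(i);
            if (!knownTopics.contains(rec.topic)) {
                try {
                    admin.createTopic(rec.topic);
                } catch (RetriableFailure e) {
                    return new ArrayList<>(toSend.subList(i, toSend.size()));
                }
                knownTopics.add(rec.topic);
            }
            sentIds.add(rec.id);
        }
        return new ArrayList<>();
    }

    /** Runs the first pass of the scenario and returns the ids of the retained records. */
    static List<Integer> retainedIdsAfterFirstPass() {
        List<Rec> records = List.of(
                new Rec("topic", 1), new Rec("topic", 2), new Rec("other-topic", 3));
        // Succeed for the first topic, time out once for the second, then succeed.
        ScriptedAdmin admin = new ScriptedAdmin(true, false, true);
        List<Rec> retained = sendRecords(records, admin, new HashSet<>(), new ArrayList<>());
        List<Integer> ids = new ArrayList<>();
        for (Rec rec : retained) {
            ids.add(rec.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println("retained after first pass: " + retainedIdsAfterFirstPass());
    }
}
```

   Under these assumptions, records 1 and 2 share a topic that is created on the first attempt, so both are sent; only record 3 hits the timeout and is kept for the retry.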



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -453,19 +444,22 @@ public void testSendRecordsTopicCreateRetries() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        // First call to describe the topic times out
         expectPreliminaryCalls();
-        EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
-                .andThrow(new RetriableException(new TimeoutException("timeout")));
-
-        // Second round
-        expectTopicCreation(TOPIC);
-        expectSendRecord();
-        expectSendRecord();
 
-        PowerMock.replayAll();
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        when(admin.createOrFindTopics(any(NewTopic.class)))
+            .thenAnswer(new Answer<TopicAdmin.TopicCreationResponse>() {
+                boolean firstCall = true;
+
+                @Override
+                public TopicAdmin.TopicCreationResponse answer(InvocationOnMock invocation) {
+                    if (firstCall) {
+                        firstCall = false;
+                        throw new RetriableException(new TimeoutException("timeout"));
+                    }
+                    return createdTopic(TOPIC);
+                }
+            });
 
         workerTask.toSend = Arrays.asList(record1, record2);

Review Comment:
   Nicely done!


