hgeraldino commented on code in PR #13191: URL: https://github.com/apache/kafka/pull/13191#discussion_r1112485394
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ########## @@ -453,19 +444,22 @@ public void testSendRecordsTopicCreateRetries() { SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); - // First call to describe the topic times out expectPreliminaryCalls(); - EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); - Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))) - .andThrow(new RetriableException(new TimeoutException("timeout"))); - - // Second round - expectTopicCreation(TOPIC); - expectSendRecord(); - expectSendRecord(); - PowerMock.replayAll(); + when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap()); + when(admin.createOrFindTopics(any(NewTopic.class))) + .thenAnswer(new Answer<TopicAdmin.TopicCreationResponse>() { + boolean firstCall = true; + + @Override + public TopicAdmin.TopicCreationResponse answer(InvocationOnMock invocation) { + if (firstCall) { + firstCall = false; + throw new RetriableException(new TimeoutException("timeout")); + } + return createdTopic(TOPIC); + } + }); workerTask.toSend = Arrays.asList(record1, record2); Review Comment: This one is a little bit trickier, as we cannot do partial verification without resetting the mock. What I ended up doing was checking that calls to `createOrFindTopics` happen twice midway, and verify the arguments, then check once again at the end of the test - this last verification is a cumulative of all 3 calls. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org