C0urante commented on code in PR #13191: URL: https://github.com/apache/kafka/pull/13191#discussion_r1116427030
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ##########
@@ -523,34 +513,26 @@ public void testSendRecordsTopicCreateRetriesMidway() {
 SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- // First round
+ expectPreliminaryCalls(TOPIC);
 expectPreliminaryCalls(OTHER_TOPIC);
- expectTopicCreation(TOPIC);
- expectSendRecord();
- expectSendRecord();
-
- EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
- // First call to create the topic times out
- Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
- EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
- .andThrow(new RetriableException(new TimeoutException("timeout")));
- // Second round
- expectTopicCreation(OTHER_TOPIC);
- expectSendRecord(OTHER_TOPIC, false, emptyHeaders());
-
- PowerMock.replayAll();
+ when(admin.describeTopics(anyString())).thenReturn(Collections.emptyMap());
+ when(admin.createOrFindTopics(any(NewTopic.class)))
+ .thenReturn(createdTopic(TOPIC))
+ .thenThrow(new RetriableException(new TimeoutException("timeout")))
+ .thenReturn(createdTopic(OTHER_TOPIC));
 // Try to send 3, make first pass, second fail. Should save last two

Review Comment:
Nit: This comment was never accurate (not on the current trunk, nor in this PR); we actually just save the last one. Can we update it to "Should save last record" to match the other comment of its kind?
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ##########
@@ -453,19 +444,22 @@ public void testSendRecordsTopicCreateRetries() {
 SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
- // First call to describe the topic times out
 expectPreliminaryCalls();
- EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
- Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
- EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
- .andThrow(new RetriableException(new TimeoutException("timeout")));
-
- // Second round
- expectTopicCreation(TOPIC);
- expectSendRecord();
- expectSendRecord();
- PowerMock.replayAll();
+ when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+ when(admin.createOrFindTopics(any(NewTopic.class)))
+ .thenAnswer(new Answer<TopicAdmin.TopicCreationResponse>() {
+ boolean firstCall = true;
+
+ @Override
+ public TopicAdmin.TopicCreationResponse answer(InvocationOnMock invocation) {
+ if (firstCall) {
+ firstCall = false;
+ throw new RetriableException(new TimeoutException("timeout"));
+ }
+ return createdTopic(TOPIC);
+ }
+ });
 workerTask.toSend = Arrays.asList(record1, record2);

Review Comment:
Nicely done!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org