divijvaidya commented on code in PR #12735: URL: https://github.com/apache/kafka/pull/12735#discussion_r994015093
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java: ########## @@ -374,30 +344,37 @@ public void testErrorHandlingInSourceTasks() throws Exception { Struct struct2 = new Struct(valSchema).put("val", 6789); SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct2); - EasyMock.expect(workerSourceTask.isStopping()).andReturn(false); - EasyMock.expect(workerSourceTask.isStopping()).andReturn(false); - EasyMock.expect(workerSourceTask.isStopping()).andReturn(true); + when(workerSourceTask.isStopping()).thenReturn(false); + when(workerSourceTask.isStopping()).thenReturn(false); + when(workerSourceTask.isStopping()).thenReturn(false); - EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true); + doReturn(true).when(workerSourceTask).commitOffsets(); - offsetStore.start(); - EasyMock.expectLastCall(); - sourceTask.initialize(EasyMock.anyObject()); - EasyMock.expectLastCall(); - sourceTask.start(EasyMock.anyObject()); - EasyMock.expectLastCall(); + when(sourceTask.poll()).thenReturn(singletonList(record1)); + when(sourceTask.poll()).thenReturn(singletonList(record2)); - EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1)); - EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2)); expectTopicCreation(TOPIC); - EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null).times(2); Review Comment: ``` when(producer.send(any(), any())) .thenReturn(null) .thenReturn(null); ``` is missing here. 
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java: ########## @@ -374,30 +344,37 @@ public void testErrorHandlingInSourceTasks() throws Exception { Struct struct2 = new Struct(valSchema).put("val", 6789); SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct2); - EasyMock.expect(workerSourceTask.isStopping()).andReturn(false); - EasyMock.expect(workerSourceTask.isStopping()).andReturn(false); - EasyMock.expect(workerSourceTask.isStopping()).andReturn(true); + when(workerSourceTask.isStopping()).thenReturn(false); + when(workerSourceTask.isStopping()).thenReturn(false); + when(workerSourceTask.isStopping()).thenReturn(false); - EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true); + doReturn(true).when(workerSourceTask).commitOffsets(); - offsetStore.start(); - EasyMock.expectLastCall(); - sourceTask.initialize(EasyMock.anyObject()); - EasyMock.expectLastCall(); - sourceTask.start(EasyMock.anyObject()); - EasyMock.expectLastCall(); + when(sourceTask.poll()).thenReturn(singletonList(record1)); Review Comment: Please modify to `when(sourceTask.poll()).thenReturn(singletonList(record1)).thenReturn(singletonList(record2));` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java: ########## @@ -571,13 +511,15 @@ private void createSinkTask(TargetState initialState, RetryWithToleranceOperator oo.put("schemas.enable", "false"); converter.configure(oo); - TransformationChain<SinkRecord> sinkTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator); + TransformationChain<SinkRecord> sinkTransforms = + new TransformationChain<>(singletonList(new FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator); - workerSinkTask = new WorkerSinkTask( + workerSinkTask = spy(new WorkerSinkTask( Review Comment: Please help me understand why we are spying on 
this class. If not required, perhaps we can remove this? (Note that Spy adds a lot of latency to test execution and hence might be the reason for slow execution as you pointed out) ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java: ########## @@ -438,30 +414,28 @@ public void testErrorHandlingInSourceTasksWthBadConverter() throws Exception { Struct struct2 = new Struct(valSchema).put("val", 6789); SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct2); - EasyMock.expect(workerSourceTask.isStopping()).andReturn(false); - EasyMock.expect(workerSourceTask.isStopping()).andReturn(false); - EasyMock.expect(workerSourceTask.isStopping()).andReturn(true); + when(workerSourceTask.isStopping()) + .thenReturn(false) + .thenReturn(false) + .thenReturn(true); - EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true); + doReturn(true).when(workerSourceTask).commitOffsets(); - offsetStore.start(); - EasyMock.expectLastCall(); - sourceTask.initialize(EasyMock.anyObject()); - EasyMock.expectLastCall(); - sourceTask.start(EasyMock.anyObject()); - EasyMock.expectLastCall(); - - EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record1)); - EasyMock.expect(sourceTask.poll()).andReturn(singletonList(record2)); + when(sourceTask.poll()) + .thenReturn(singletonList(record1)) + .thenReturn(singletonList(record2)); expectTopicCreation(TOPIC); - EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null).times(2); - PowerMock.replayAll(); + when(producer.send(any(), any())) + .thenReturn(null) + .thenReturn(null); workerSourceTask.initialize(TASK_CONFIG); workerSourceTask.initializeAndStart(); workerSourceTask.execute(); + + assertEquals(null, producer.send(any())); Review Comment: Please help me understand why this is required. 
It would invoke `producer.send()` again after the code under test and lead to an extra invocation of `producer.send` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java: ########## @@ -374,30 +344,37 @@ public void testErrorHandlingInSourceTasks() throws Exception { Struct struct2 = new Struct(valSchema).put("val", 6789); SourceRecord record2 = new SourceRecord(emptyMap(), emptyMap(), TOPIC, PARTITION1, valSchema, struct2); - EasyMock.expect(workerSourceTask.isStopping()).andReturn(false); - EasyMock.expect(workerSourceTask.isStopping()).andReturn(false); - EasyMock.expect(workerSourceTask.isStopping()).andReturn(true); + when(workerSourceTask.isStopping()).thenReturn(false); + when(workerSourceTask.isStopping()).thenReturn(false); + when(workerSourceTask.isStopping()).thenReturn(false); Review Comment: Please modify this to `when(workerSourceTask.isStopping()).thenReturn(false).thenReturn(false).thenReturn(true)`. Currently it incorrectly sets the last invocation to `false`. Also, I might be wrong but re-defining invocation behaviour picks up the latest value and discards the earlier ones. Hence the right way is to chain them as I mentioned above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org