hgeraldino commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1112474934


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##########
@@ -639,144 +644,112 @@ public void 
testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
         SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
         SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-        expectPreliminaryCalls();
-        
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+        expectSendRecord(emptyHeaders());
 
-        Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-        
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-        expectSendRecord();
-        expectSendRecord();
-
-        PowerMock.replayAll();
+        when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+        
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
         workerTask.toSend = Arrays.asList(record1, record2);
         workerTask.sendRecords();
+
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = 
verifySendRecord(2);
+
+        List<ProducerRecord<byte[], byte[]>> capturedValues = 
sent.getAllValues();
+        assertEquals(2, capturedValues.size());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-            String topic,
-            boolean anyTimes,
-            Headers headers
-    ) {
+    private void expectSendRecord(Headers headers) {
         if (headers != null)
-            expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+            expectConvertHeadersAndKeyValue(headers);
 
-        expectApplyTransformationChain(anyTimes);
+        expectApplyTransformationChain();
 
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-        IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-                producer.send(EasyMock.capture(sent), 
EasyMock.capture(producerCallbacks)));
+        expectTaskGetTopic();
+    }
 
-        IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-            synchronized (producerCallbacks) {
-                for (Callback cb : producerCallbacks.getValues()) {
-                    cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-                }
-                producerCallbacks.reset();
-            }
-            return null;
-        };
+    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> verifySendRecord() {
+        return verifySendRecord(1);
+    }
 
-        if (anyTimes)
-            expect.andStubAnswer(expectResponse);
-        else
-            expect.andAnswer(expectResponse);
+    private ArgumentCaptor<ProducerRecord<byte[], byte[]>> 
verifySendRecord(int times) {
+        ArgumentCaptor<ProducerRecord<byte[], byte[]>> sent = 
ArgumentCaptor.forClass(ProducerRecord.class);
+        ArgumentCaptor<Callback> producerCallbacks = 
ArgumentCaptor.forClass(Callback.class);
+        verify(producer, times(times)).send(sent.capture(), 
producerCallbacks.capture());
 
-        expectTaskGetTopic(anyTimes);
+        for (Callback cb : producerCallbacks.getAllValues()) {
+            cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 
0, 0, 0L, 0, 0),
+                    null);
+        }
 
         return sent;
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() 
{
-        return expectSendRecord(TOPIC, true, emptyHeaders());
+    private void expectTaskGetTopic() {
+        when(statusBackingStore.getTopic(anyString(), 
anyString())).thenAnswer((Answer<TopicStatus>) invocation -> {
+            String connector = invocation.getArgument(0, String.class);
+            String topic = invocation.getArgument(1, String.class);
+            return new TopicStatus(topic, new ConnectorTaskId(connector, 0), 
Time.SYSTEM.milliseconds());
+        });
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord() {
-        return expectSendRecord(TOPIC, false, emptyHeaders());
-    }
+    private void verifyTaskGetTopic() {
+        ArgumentCaptor<String> connectorCapture = 
ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<String> topicCapture = 
ArgumentCaptor.forClass(String.class);
+        ArgumentCaptor<NewTopic> newTopicCapture = 
ArgumentCaptor.forClass(NewTopic.class);
+        verify(statusBackingStore).getTopic(connectorCapture.capture(), 
topicCapture.capture());
 
-    private void expectTaskGetTopic(boolean anyTimes) {
-        final Capture<String> connectorCapture = EasyMock.newCapture();
-        final Capture<String> topicCapture = EasyMock.newCapture();
-        IExpectationSetters<TopicStatus> expect = 
EasyMock.expect(statusBackingStore.getTopic(
-                EasyMock.capture(connectorCapture),
-                EasyMock.capture(topicCapture)));
-        if (anyTimes) {
-            expect.andStubAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        } else {
-            expect.andAnswer(() -> new TopicStatus(
-                    topicCapture.getValue(),
-                    new ConnectorTaskId(connectorCapture.getValue(), 0),
-                    Time.SYSTEM.milliseconds()));
-        }
-        if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-            assertEquals("job", connectorCapture.getValue());
-            assertEquals(TOPIC, topicCapture.getValue());
-        }
+        assertEquals("job", connectorCapture.getValue());
+        assertEquals(TOPIC, topicCapture.getValue());
+
+        verify(admin).createOrFindTopics(newTopicCapture.capture());
+        assertEquals(TOPIC, newTopicCapture.getValue().name());
     }
 
+    @SuppressWarnings("SameParameterValue")
     private void expectTopicCreation(String topic) {
-        if (config.topicCreationEnable()) {
-            
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
-            Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
-            
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(topic));
-        }
+        when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap());
+        
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(topic));
     }
 
+    @SuppressWarnings("SameParameterValue")
     private TopicAdmin.TopicCreationResponse createdTopic(String topic) {
         Set<String> created = Collections.singleton(topic);
         Set<String> existing = Collections.emptySet();
         return new TopicAdmin.TopicCreationResponse(created, existing);
     }
 
+    @SuppressWarnings("SameParameterValue")
     private TopicAdmin.TopicCreationResponse foundTopic(String topic) {
         Set<String> created = Collections.emptySet();
         Set<String> existing = Collections.singleton(topic);
         return new TopicAdmin.TopicCreationResponse(created, existing);
     }
 
     private void expectPreliminaryCalls() {
-        expectPreliminaryCalls(TOPIC);
-    }
+        expectConvertHeadersAndKeyValue(emptyHeaders());
+        expectApplyTransformationChain();
+    }
+
+    private void expectConvertHeadersAndKeyValue(Headers headers) {

Review Comment:
   Makes sense. IJ noticed that the parameter was always the same and suggested 
to just use the constant, but I'll change it back



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to