hgeraldino commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1215974269


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##########
@@ -832,202 +776,165 @@ private CountDownLatch expectPolls(int minimum, final 
AtomicInteger count) throw
         // Note that we stub these to allow any number of calls because the thread will continue to
         // run. The count passed in + latch returned just makes sure we get *at least* that number of
         // calls
-        EasyMock.expect(sourceTask.poll())
-                .andStubAnswer(() -> {
-                    count.incrementAndGet();
-                    latch.countDown();
-                    Thread.sleep(10);
-                    return RECORDS;
-                });
+        doAnswer((Answer<List<SourceRecord>>) invocation -> {
+            count.incrementAndGet();
+            latch.countDown();
+            Thread.sleep(10);
+            return RECORDS;
+        }).when(sourceTask).poll();
+
         // Fallout of the poll() call
-        expectSendRecordAnyTimes();
+        expectSendRecord();
         return latch;
     }
 
     private CountDownLatch expectPolls(int count) throws InterruptedException {
         return expectPolls(count, new AtomicInteger());
     }
 
-    @SuppressWarnings("unchecked")
-    private void expectSendRecordSyncFailure(Throwable error) {
-        expectConvertHeadersAndKeyValue(false);
-        expectApplyTransformationChain(false);
-
-        EasyMock.expect(
-            producer.send(EasyMock.anyObject(ProducerRecord.class),
-                
EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class)))
-            .andThrow(error);
+    private void expectSendRecord() throws InterruptedException {
+        expectSendRecordTaskCommitRecordSucceed();
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() 
throws InterruptedException {
-        return expectSendRecordTaskCommitRecordSucceed(true);
+    //
+    private void expectSendRecordProducerCallbackFail() throws 
InterruptedException {
+        expectSendRecord(TOPIC, false, false, true, emptyHeaders());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce() 
throws InterruptedException {
-        return expectSendRecordTaskCommitRecordSucceed(false);
+    private void expectSendRecordTaskCommitRecordSucceed() throws 
InterruptedException {
+        expectSendRecord(TOPIC, true, true, true, emptyHeaders());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> 
expectSendRecordProducerCallbackFail() throws InterruptedException {
-        return expectSendRecord(TOPIC, false, false, false, true, 
emptyHeaders());
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> 
expectSendRecordTaskCommitRecordSucceed(boolean anyTimes) throws 
InterruptedException {
-        return expectSendRecord(TOPIC, anyTimes, true, true, true, 
emptyHeaders());
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> 
expectSendRecordTaskCommitRecordFail(boolean anyTimes) throws 
InterruptedException {
-        return expectSendRecord(TOPIC, anyTimes, true, false, true, 
emptyHeaders());
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-        String topic,
-        boolean anyTimes,
-        boolean sendSuccess,
-        boolean commitSuccess,
-        boolean isMockedConverters,
-        Headers headers
+    private void expectSendRecord(
+            String topic,
+            boolean sendSuccess,
+            boolean commitSuccess,
+            boolean isMockedConverters,
+            Headers headers
     ) throws InterruptedException {
         if (isMockedConverters) {
-            expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+            expectConvertHeadersAndKeyValue(topic, headers);
         }
 
-        expectApplyTransformationChain(anyTimes);
-
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-        // 1. Converted data passed to the producer, which will need callbacks 
invoked for flush to work
-        IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-            producer.send(EasyMock.capture(sent),
-                EasyMock.capture(producerCallbacks)));
-        IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-            synchronized (producerCallbacks) {
-                for (org.apache.kafka.clients.producer.Callback cb : 
producerCallbacks.getValues()) {
-                    if (sendSuccess) {
-                        cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0,
-                            0L, 0, 0), null);
-                    } else {
-                        cb.onCompletion(null, new 
TopicAuthorizationException("foo"));
-                    }
-                }
-                producerCallbacks.reset();
-            }
-            return sendFuture;
-        };
-        if (anyTimes)
-            expect.andStubAnswer(expectResponse);
-        else
-            expect.andAnswer(expectResponse);
+        expectApplyTransformationChain();
 
         if (sendSuccess) {
             // 2. As a result of a successful producer send callback, we'll notify the source task of the record commit
-            expectTaskCommitRecordWithOffset(anyTimes, commitSuccess);
-            expectTaskGetTopic(anyTimes);
+            expectTaskCommitRecordWithOffset(commitSuccess);
+            expectTaskGetTopic();
         }
 
-        return sent;
+        doAnswer(producerSendAnswer(sendSuccess))
+                .when(producer).send(any(ProducerRecord.class), 
any(Callback.class));
     }
 
-    private void expectConvertHeadersAndKeyValue(boolean anyTimes) {
-        expectConvertHeadersAndKeyValue(TOPIC, anyTimes, emptyHeaders());
+    private Answer<Future<RecordMetadata>> producerSendAnswer(boolean 
sendSuccess) {
+        return invocation -> {
+            Callback cb = invocation.getArgument(1);
+            if (sendSuccess) {
+                cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 
0), 0, 0, 0L, 0, 0),
+                        null);
+            } else {
+                cb.onCompletion(null, new TopicAuthorizationException("foo"));
+            }
+
+            return null;
+        };
     }
 
-    private void expectConvertHeadersAndKeyValue(String topic, boolean 
anyTimes, Headers headers) {
-        for (Header header : headers) {
-            IExpectationSetters<byte[]> convertHeaderExpect = 
EasyMock.expect(headerConverter.fromConnectHeader(topic, header.key(), 
Schema.STRING_SCHEMA, new String(header.value())));
-            if (anyTimes)
-                convertHeaderExpect.andStubReturn(header.value());
-            else
-                convertHeaderExpect.andReturn(header.value());
+    private void expectConvertHeadersAndKeyValue(String topic, Headers 
headers) {
+        if (headers.iterator().hasNext()) {
+            when(headerConverter.fromConnectHeader(anyString(), anyString(), 
eq(Schema.STRING_SCHEMA),
+                    anyString()))
+                    .thenAnswer((Answer<byte[]>) invocation -> {
+                        String headerValue = invocation.getArgument(3, 
String.class);
+                        return headerValue.getBytes(StandardCharsets.UTF_8);
+                    });
         }
-        IExpectationSetters<byte[]> convertKeyExpect = 
EasyMock.expect(keyConverter.fromConnectData(topic, headers, KEY_SCHEMA, KEY));
-        if (anyTimes)
-            convertKeyExpect.andStubReturn(SERIALIZED_KEY);
-        else
-            convertKeyExpect.andReturn(SERIALIZED_KEY);
-        IExpectationSetters<byte[]> convertValueExpect = 
EasyMock.expect(valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, 
RECORD));
-        if (anyTimes)
-            convertValueExpect.andStubReturn(SERIALIZED_RECORD);
-        else
-            convertValueExpect.andReturn(SERIALIZED_RECORD);
+
+        when(keyConverter.fromConnectData(eq(topic), any(Headers.class), 
eq(KEY_SCHEMA), eq(KEY)))
+                .thenReturn(SERIALIZED_KEY);
+        when(valueConverter.fromConnectData(eq(topic), any(Headers.class), 
eq(RECORD_SCHEMA),
+                eq(RECORD)))
+                .thenReturn(SERIALIZED_RECORD);
     }
 
-    private void expectApplyTransformationChain(boolean anyTimes) {
-        final Capture<SourceRecord> recordCapture = EasyMock.newCapture();
-        IExpectationSetters<SourceRecord> convertKeyExpect = 
EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture)));
-        if (anyTimes)
-            convertKeyExpect.andStubAnswer(recordCapture::getValue);
-        else
-            convertKeyExpect.andAnswer(recordCapture::getValue);
+    private void expectApplyTransformationChain() {
+        when(transformationChain.apply(any(SourceRecord.class)))
+                .thenAnswer(AdditionalAnswers.returnsFirstArg());
     }
 
-    private void expectTaskCommitRecordWithOffset(boolean anyTimes, boolean 
succeed) throws InterruptedException {
-        sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), 
EasyMock.anyObject(RecordMetadata.class));
-        IExpectationSetters<Void> expect = EasyMock.expectLastCall();
+    //
+    private void expectTaskCommitRecordWithOffset(boolean succeed) throws 
InterruptedException {

Review Comment:
   Looks like we don't. Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to