hgeraldino commented on code in PR #13383:
URL: https://github.com/apache/kafka/pull/13383#discussion_r1217174321


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java:
##########
@@ -832,202 +776,165 @@ private CountDownLatch expectPolls(int minimum, final 
AtomicInteger count) throw
         // Note that we stub these to allow any number of calls because the 
thread will continue to
         // run. The count passed in + latch returned just makes sure we get 
*at least* that number of
         // calls
-        EasyMock.expect(sourceTask.poll())
-                .andStubAnswer(() -> {
-                    count.incrementAndGet();
-                    latch.countDown();
-                    Thread.sleep(10);
-                    return RECORDS;
-                });
+        doAnswer((Answer<List<SourceRecord>>) invocation -> {
+            count.incrementAndGet();
+            latch.countDown();
+            Thread.sleep(10);
+            return RECORDS;
+        }).when(sourceTask).poll();
+
         // Fallout of the poll() call
-        expectSendRecordAnyTimes();
+        expectSendRecord();
         return latch;
     }
 
     private CountDownLatch expectPolls(int count) throws InterruptedException {
         return expectPolls(count, new AtomicInteger());
     }
 
-    @SuppressWarnings("unchecked")
-    private void expectSendRecordSyncFailure(Throwable error) {
-        expectConvertHeadersAndKeyValue(false);
-        expectApplyTransformationChain(false);
-
-        EasyMock.expect(
-            producer.send(EasyMock.anyObject(ProducerRecord.class),
-                
EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class)))
-            .andThrow(error);
+    private void expectSendRecord() throws InterruptedException {
+        expectSendRecordTaskCommitRecordSucceed();
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() 
throws InterruptedException {
-        return expectSendRecordTaskCommitRecordSucceed(true);
+    //
+    private void expectSendRecordProducerCallbackFail() throws 
InterruptedException {
+        expectSendRecord(TOPIC, false, false, true, emptyHeaders());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce() 
throws InterruptedException {
-        return expectSendRecordTaskCommitRecordSucceed(false);
+    private void expectSendRecordTaskCommitRecordSucceed() throws 
InterruptedException {
+        expectSendRecord(TOPIC, true, true, true, emptyHeaders());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> 
expectSendRecordProducerCallbackFail() throws InterruptedException {
-        return expectSendRecord(TOPIC, false, false, false, true, 
emptyHeaders());
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> 
expectSendRecordTaskCommitRecordSucceed(boolean anyTimes) throws 
InterruptedException {
-        return expectSendRecord(TOPIC, anyTimes, true, true, true, 
emptyHeaders());
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> 
expectSendRecordTaskCommitRecordFail(boolean anyTimes) throws 
InterruptedException {
-        return expectSendRecord(TOPIC, anyTimes, true, false, true, 
emptyHeaders());
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
-        String topic,
-        boolean anyTimes,
-        boolean sendSuccess,
-        boolean commitSuccess,
-        boolean isMockedConverters,
-        Headers headers
+    private void expectSendRecord(
+            String topic,
+            boolean sendSuccess,
+            boolean commitSuccess,
+            boolean isMockedConverters,
+            Headers headers
     ) throws InterruptedException {
         if (isMockedConverters) {
-            expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+            expectConvertHeadersAndKeyValue(topic, headers);
         }
 
-        expectApplyTransformationChain(anyTimes);
-
-        Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
-
-        // 1. Converted data passed to the producer, which will need callbacks 
invoked for flush to work
-        IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
-            producer.send(EasyMock.capture(sent),
-                EasyMock.capture(producerCallbacks)));
-        IAnswer<Future<RecordMetadata>> expectResponse = () -> {
-            synchronized (producerCallbacks) {
-                for (org.apache.kafka.clients.producer.Callback cb : 
producerCallbacks.getValues()) {
-                    if (sendSuccess) {
-                        cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0,
-                            0L, 0, 0), null);
-                    } else {
-                        cb.onCompletion(null, new 
TopicAuthorizationException("foo"));
-                    }
-                }
-                producerCallbacks.reset();
-            }
-            return sendFuture;
-        };
-        if (anyTimes)
-            expect.andStubAnswer(expectResponse);
-        else
-            expect.andAnswer(expectResponse);
+        expectApplyTransformationChain();
 
         if (sendSuccess) {
             // 2. As a result of a successful producer send callback, we'll 
notify the source task of the record commit
-            expectTaskCommitRecordWithOffset(anyTimes, commitSuccess);
-            expectTaskGetTopic(anyTimes);
+            expectTaskCommitRecordWithOffset(commitSuccess);

Review Comment:
   It was.
   
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to