[GitHub] [kafka] C0urante commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-23 Thread via GitHub


C0urante commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1116427030


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -523,34 +513,26 @@ public void testSendRecordsTopicCreateRetriesMidway() {
 SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 SourceRecord record3 = new SourceRecord(PARTITION, OFFSET, 
OTHER_TOPIC, 3, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-// First round
+expectPreliminaryCalls(TOPIC);
 expectPreliminaryCalls(OTHER_TOPIC);
-expectTopicCreation(TOPIC);
-expectSendRecord();
-expectSendRecord();
-
-
EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
-// First call to create the topic times out
-Capture newTopicCapture = EasyMock.newCapture();
-
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
-.andThrow(new RetriableException(new 
TimeoutException("timeout")));
 
-// Second round
-expectTopicCreation(OTHER_TOPIC);
-expectSendRecord(OTHER_TOPIC, false, emptyHeaders());
-
-PowerMock.replayAll();
+
when(admin.describeTopics(anyString())).thenReturn(Collections.emptyMap());
+when(admin.createOrFindTopics(any(NewTopic.class)))
+.thenReturn(createdTopic(TOPIC))
+.thenThrow(new RetriableException(new 
TimeoutException("timeout")))
+.thenReturn(createdTopic(OTHER_TOPIC));
 
 // Try to send 3, make first pass, second fail. Should save last two

Review Comment:
   Nit: This comment was never accurate (not on the current trunk, nor in this 
PR); we actually just save the last one. Can we update it to "Should save last 
record" to match the other comment of its kind?



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -453,19 +444,22 @@ public void testSendRecordsTopicCreateRetries() {
 SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-// First call to describe the topic times out
 expectPreliminaryCalls();
-
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-Capture newTopicCapture = EasyMock.newCapture();
-
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture)))
-.andThrow(new RetriableException(new 
TimeoutException("timeout")));
-
-// Second round
-expectTopicCreation(TOPIC);
-expectSendRecord();
-expectSendRecord();
 
-PowerMock.replayAll();
+when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+when(admin.createOrFindTopics(any(NewTopic.class)))
+.thenAnswer(new Answer() {
+boolean firstCall = true;
+
+@Override
+public TopicAdmin.TopicCreationResponse 
answer(InvocationOnMock invocation) {
+if (firstCall) {
+firstCall = false;
+throw new RetriableException(new 
TimeoutException("timeout"));
+}
+return createdTopic(TOPIC);
+}
+});
 
 workerTask.toSend = Arrays.asList(record1, record2);

Review Comment:
   Nicely done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-15 Thread via GitHub


C0urante commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1107510674


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -235,115 +236,100 @@ public void testMetricsGroup() {
 public void testSendRecordsConvertsData() {
 createWorkerTask();
 
-List records = new ArrayList<>();
 // Can just use the same record for key and value
-records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
-
-Capture> sent = 
expectSendRecordAnyTimes();
+List records = Collections.singletonList(
+new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD)
+);
 
+expectSendRecord(emptyHeaders());
 expectTopicCreation(TOPIC);
 
-PowerMock.replayAll();
-
 workerTask.toSend = records;
 workerTask.sendRecords();
+
+ArgumentCaptor> sent = 
verifySendRecord();
+
 assertEquals(SERIALIZED_KEY, sent.getValue().key());
 assertEquals(SERIALIZED_RECORD, sent.getValue().value());
 
-PowerMock.verifyAll();
+verifyTaskGetTopic();
 }
 
 @Test
 public void testSendRecordsPropagatesTimestamp() {
 final Long timestamp = System.currentTimeMillis();
-
 createWorkerTask();
 
-List records = Collections.singletonList(
-new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD, timestamp)
-);
-
-Capture> sent = 
expectSendRecordAnyTimes();
-
+expectSendRecord(emptyHeaders());
 expectTopicCreation(TOPIC);
 
-PowerMock.replayAll();
-
-workerTask.toSend = records;
+workerTask.toSend = Collections.singletonList(
+new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD, timestamp)
+);
 workerTask.sendRecords();
+
+ArgumentCaptor> sent = 
verifySendRecord();
 assertEquals(timestamp, sent.getValue().timestamp());
 
-PowerMock.verifyAll();
+verifyTaskGetTopic();
 }
 
 @Test
 public void testSendRecordsCorruptTimestamp() {
 final Long timestamp = -3L;
 createWorkerTask();
 
-List records = Collections.singletonList(
+expectSendRecord(emptyHeaders());
+expectTopicCreation(TOPIC);

Review Comment:
   Why is this added? We're testing a scenario where the task fails on an 
invalid record timestamp, it should never get to the point of attempting to 
create a topic.



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -639,144 +644,112 @@ public void 
testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
 SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-expectPreliminaryCalls();
-
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+expectSendRecord(emptyHeaders());
 
-Capture newTopicCapture = EasyMock.newCapture();
-
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-expectSendRecord();
-expectSendRecord();
-
-PowerMock.replayAll();
+when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
 workerTask.toSend = Arrays.asList(record1, record2);
 workerTask.sendRecords();
+
+ArgumentCaptor> sent = 
verifySendRecord(2);
+
+List> capturedValues = 
sent.getAllValues();
+assertEquals(2, capturedValues.size());
 }
 
-private Capture> expectSendRecord(
-String topic,
-boolean anyTimes,
-Headers headers
-) {
+private void expectSendRecord(Headers headers) {
 if (headers != null)
-expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+expectConvertHeadersAndKeyValue(headers);
 
-expectApplyTransformationChain(anyTimes);
+expectApplyTransformationChain();
 
-Capture> sent = EasyMock.newCapture();
-
-IExpectationSetters> expect = EasyMock.expect(
-producer.send(EasyMock.capture(sent), 
EasyMock.capture(producerCallbacks)));
+expectTaskGetTopic();
+}
 
-IAnswer> expectResponse = () -> {
-synchronized (producerCallbacks) {
-for (Callback cb : producerCallbacks.getValues()) {
-cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-  

[GitHub] [kafka] C0urante commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-06 Thread via GitHub


C0urante commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1097852548


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -129,7 +131,7 @@ public class AbstractWorkerSourceTaskTest {
 @Mock private ConnectorOffsetBackingStore offsetStore;
 @Mock private StatusBackingStore statusBackingStore;
 @Mock private WorkerSourceTaskContext sourceTaskContext;
-@MockStrict private TaskStatus.Listener statusListener;

Review Comment:
   In order to retain the same guarantees we have currently w/r/t interactions 
with this class, can we add a call to 
`verifyNoMoreInteractions(statusListener);` in the `tearDown` method?



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -639,93 +814,25 @@ public void 
testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
 SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-expectPreliminaryCalls();
-
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
-
-Capture newTopicCapture = EasyMock.newCapture();
-
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-expectSendRecord();
-expectSendRecord();
-
-PowerMock.replayAll();
+when(transformationChain.apply(any(SourceRecord.class)))
+.thenAnswer((Answer) invocation -> 
invocation.getArgument(0));
+when(headerConverter.fromConnectHeader(anyString(), anyString(), 
eq(Schema.STRING_SCHEMA),
+anyString()))
+.thenAnswer((Answer) invocation -> {
+String headerValue = invocation.getArgument(3, String.class);
+return headerValue.getBytes(StandardCharsets.UTF_8);
+});
+when(keyConverter.fromConnectData(eq(TOPIC), any(Headers.class), 
eq(KEY_SCHEMA), eq(KEY)))
+.thenReturn(SERIALIZED_KEY);
+when(valueConverter.fromConnectData(eq(TOPIC), any(Headers.class), 
eq(RECORD_SCHEMA),
+eq(RECORD)))
+.thenReturn(SERIALIZED_RECORD);
+when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
 workerTask.toSend = Arrays.asList(record1, record2);
 workerTask.sendRecords();
-}
-
-private Capture> expectSendRecord(
-String topic,
-boolean anyTimes,
-Headers headers
-) {
-if (headers != null)
-expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
-
-expectApplyTransformationChain(anyTimes);
-
-Capture> sent = EasyMock.newCapture();
-
-IExpectationSetters> expect = EasyMock.expect(
-producer.send(EasyMock.capture(sent), 
EasyMock.capture(producerCallbacks)));
-
-IAnswer> expectResponse = () -> {
-synchronized (producerCallbacks) {
-for (Callback cb : producerCallbacks.getValues()) {
-cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-}
-producerCallbacks.reset();
-}
-return null;
-};
-
-if (anyTimes)
-expect.andStubAnswer(expectResponse);
-else
-expect.andAnswer(expectResponse);
-
-expectTaskGetTopic(anyTimes);
-
-return sent;
-}
-
-private Capture> expectSendRecordAnyTimes() 
{
-return expectSendRecord(TOPIC, true, emptyHeaders());
-}
-
-private Capture> expectSendRecord() {
-return expectSendRecord(TOPIC, false, emptyHeaders());
-}
-
-private void expectTaskGetTopic(boolean anyTimes) {
-final Capture connectorCapture = EasyMock.newCapture();
-final Capture topicCapture = EasyMock.newCapture();
-IExpectationSetters expect = 
EasyMock.expect(statusBackingStore.getTopic(
-EasyMock.capture(connectorCapture),
-EasyMock.capture(topicCapture)));
-if (anyTimes) {
-expect.andStubAnswer(() -> new TopicStatus(
-topicCapture.getValue(),
-new ConnectorTaskId(connectorCapture.getValue(), 0),
-Time.SYSTEM.milliseconds()));
-} else {
-expect.andAnswer(() -> new TopicStatus(
-topicCapture.getValue(),
-new ConnectorTaskId(connectorCapture.getValue(), 0),
-Time.SYSTEM.milliseconds()));
-}
-if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
-