hgeraldino commented on code in PR #13383: URL: https://github.com/apache/kafka/pull/13383#discussion_r1215974269
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ########## @@ -832,202 +776,165 @@ private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throw // Note that we stub these to allow any number of calls because the thread will continue to // run. The count passed in + latch returned just makes sure we get *at least* that number of // calls - EasyMock.expect(sourceTask.poll()) - .andStubAnswer(() -> { - count.incrementAndGet(); - latch.countDown(); - Thread.sleep(10); - return RECORDS; - }); + doAnswer((Answer<List<SourceRecord>>) invocation -> { + count.incrementAndGet(); + latch.countDown(); + Thread.sleep(10); + return RECORDS; + }).when(sourceTask).poll(); + // Fallout of the poll() call - expectSendRecordAnyTimes(); + expectSendRecord(); return latch; } private CountDownLatch expectPolls(int count) throws InterruptedException { return expectPolls(count, new AtomicInteger()); } - @SuppressWarnings("unchecked") - private void expectSendRecordSyncFailure(Throwable error) { - expectConvertHeadersAndKeyValue(false); - expectApplyTransformationChain(false); - - EasyMock.expect( - producer.send(EasyMock.anyObject(ProducerRecord.class), - EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class))) - .andThrow(error); + private void expectSendRecord() throws InterruptedException { + expectSendRecordTaskCommitRecordSucceed(); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() throws InterruptedException { - return expectSendRecordTaskCommitRecordSucceed(true); + // + private void expectSendRecordProducerCallbackFail() throws InterruptedException { + expectSendRecord(TOPIC, false, false, true, emptyHeaders()); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce() throws InterruptedException { - return expectSendRecordTaskCommitRecordSucceed(false); + private void expectSendRecordTaskCommitRecordSucceed() throws InterruptedException { + expectSendRecord(TOPIC, true, true, true, emptyHeaders()); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordProducerCallbackFail() throws InterruptedException { - return expectSendRecord(TOPIC, false, false, false, true, emptyHeaders()); - } - - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes) throws InterruptedException { - return expectSendRecord(TOPIC, anyTimes, true, true, true, emptyHeaders()); - } - - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes) throws InterruptedException { - return expectSendRecord(TOPIC, anyTimes, true, false, true, emptyHeaders()); - } - - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord( - String topic, - boolean anyTimes, - boolean sendSuccess, - boolean commitSuccess, - boolean isMockedConverters, - Headers headers + private void expectSendRecord( + String topic, + boolean sendSuccess, + boolean commitSuccess, + boolean isMockedConverters, + Headers headers ) throws InterruptedException { if (isMockedConverters) { - expectConvertHeadersAndKeyValue(topic, anyTimes, headers); + expectConvertHeadersAndKeyValue(topic, headers); } - expectApplyTransformationChain(anyTimes); - - Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture(); - - // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work - IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect( - producer.send(EasyMock.capture(sent), - EasyMock.capture(producerCallbacks))); - IAnswer<Future<RecordMetadata>> expectResponse = () -> { - synchronized (producerCallbacks) { - for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) { - if (sendSuccess) { - cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, - 0L, 0, 0), null); - } else { - cb.onCompletion(null, new TopicAuthorizationException("foo")); - } - } - producerCallbacks.reset(); - } - return sendFuture; - }; - if (anyTimes) - expect.andStubAnswer(expectResponse); - else - expect.andAnswer(expectResponse); + expectApplyTransformationChain(); if (sendSuccess) { // 2. As a result of a successful producer send callback, we'll notify the source task of the record commit - expectTaskCommitRecordWithOffset(anyTimes, commitSuccess); - expectTaskGetTopic(anyTimes); + expectTaskCommitRecordWithOffset(commitSuccess); + expectTaskGetTopic(); } - return sent; + doAnswer(producerSendAnswer(sendSuccess)) + .when(producer).send(any(ProducerRecord.class), any(Callback.class)); } - private void expectConvertHeadersAndKeyValue(boolean anyTimes) { - expectConvertHeadersAndKeyValue(TOPIC, anyTimes, emptyHeaders()); + private Answer<Future<RecordMetadata>> producerSendAnswer(boolean sendSuccess) { + return invocation -> { + Callback cb = invocation.getArgument(1); + if (sendSuccess) { + cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), + null); + } else { + cb.onCompletion(null, new TopicAuthorizationException("foo")); + } + + return null; + }; } - private void expectConvertHeadersAndKeyValue(String topic, boolean anyTimes, Headers headers) { - for (Header header : headers) { - IExpectationSetters<byte[]> convertHeaderExpect = EasyMock.expect(headerConverter.fromConnectHeader(topic, header.key(), Schema.STRING_SCHEMA, new String(header.value()))); - if (anyTimes) - convertHeaderExpect.andStubReturn(header.value()); - else - convertHeaderExpect.andReturn(header.value()); + private void expectConvertHeadersAndKeyValue(String topic, Headers headers) { + if (headers.iterator().hasNext()) { + when(headerConverter.fromConnectHeader(anyString(), anyString(), eq(Schema.STRING_SCHEMA), + anyString())) + .thenAnswer((Answer<byte[]>) invocation -> { + String headerValue = invocation.getArgument(3, String.class); + return headerValue.getBytes(StandardCharsets.UTF_8); + }); } - IExpectationSetters<byte[]> convertKeyExpect = EasyMock.expect(keyConverter.fromConnectData(topic, headers, KEY_SCHEMA, KEY)); - if (anyTimes) - convertKeyExpect.andStubReturn(SERIALIZED_KEY); - else - convertKeyExpect.andReturn(SERIALIZED_KEY); - IExpectationSetters<byte[]> convertValueExpect = EasyMock.expect(valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, RECORD)); - if (anyTimes) - convertValueExpect.andStubReturn(SERIALIZED_RECORD); - else - convertValueExpect.andReturn(SERIALIZED_RECORD); + + when(keyConverter.fromConnectData(eq(topic), any(Headers.class), eq(KEY_SCHEMA), eq(KEY))) + .thenReturn(SERIALIZED_KEY); + when(valueConverter.fromConnectData(eq(topic), any(Headers.class), eq(RECORD_SCHEMA), + eq(RECORD))) + .thenReturn(SERIALIZED_RECORD); } - private void expectApplyTransformationChain(boolean anyTimes) { - final Capture<SourceRecord> recordCapture = EasyMock.newCapture(); - IExpectationSetters<SourceRecord> convertKeyExpect = EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))); - if (anyTimes) - convertKeyExpect.andStubAnswer(recordCapture::getValue); - else - convertKeyExpect.andAnswer(recordCapture::getValue); + private void expectApplyTransformationChain() { + when(transformationChain.apply(any(SourceRecord.class))) + .thenAnswer(AdditionalAnswers.returnsFirstArg()); } - private void expectTaskCommitRecordWithOffset(boolean anyTimes, boolean succeed) throws InterruptedException { - sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.anyObject(RecordMetadata.class)); - IExpectationSetters<Void> expect = EasyMock.expectLastCall(); + // + private void expectTaskCommitRecordWithOffset(boolean succeed) throws InterruptedException { Review Comment: Looks like we don't. Removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org