This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new f40b259 KAFKA-9014: Fix AssertionError when SourceTask.poll returns
an empty list (#7491)
f40b259 is described below
commit f40b2597ab64d1c4169c2b7507ba1db0f05b04cf
Author: Konstantine Karantasis <[email protected]>
AuthorDate: Tue Oct 15 14:08:31 2019 -0700
KAFKA-9014: Fix AssertionError when SourceTask.poll returns an empty list
(#7491)
Author: Konstantine Karantasis <[email protected]>
Reviewer: Randall Hauch <[email protected]>
---
.../kafka/connect/runtime/WorkerSourceTask.java | 3 +-
.../connect/runtime/WorkerSourceTaskTest.java | 66 +++++++++++++++++++++-
2 files changed, 67 insertions(+), 2 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index e90a490..467dc39 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -255,7 +255,8 @@ class WorkerSourceTask extends WorkerTask {
private boolean sendRecords() {
int processed = 0;
recordBatch(toSend.size());
- final SourceRecordWriteCounter counter = new
SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup);
+ final SourceRecordWriteCounter counter =
+ toSend.size() > 0 ? new
SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup) : null;
for (final SourceRecord preTransformRecord : toSend) {
maybeThrowProducerSendException();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 2d6794a..6ef7aa6 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -72,7 +72,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-@PowerMockIgnore("javax.management.*")
+@PowerMockIgnore({"javax.management.*",
+ "org.apache.log4j.*"})
@RunWith(PowerMockRunner.class)
public class WorkerSourceTaskTest extends ThreadedTest {
private static final String TOPIC = "topic";
@@ -332,6 +333,51 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
@Test
+ public void testPollReturnsNoRecords() throws Exception {
+ // Test that the task handles SourceTask.poll() returning an empty list of records (KAFKA-9014)
+ createWorkerTask();
+
+ sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+ EasyMock.expectLastCall();
+ sourceTask.start(TASK_PROPS);
+ EasyMock.expectLastCall();
+ statusListener.onStartup(taskId);
+ EasyMock.expectLastCall();
+
+ // We'll wait for at least one empty poll, then trigger a flush
+ final CountDownLatch pollLatch = expectEmptyPolls(1, new
AtomicInteger());
+ expectOffsetFlush(true);
+
+ sourceTask.stop();
+ EasyMock.expectLastCall();
+ expectOffsetFlush(true);
+
+ statusListener.onShutdown(taskId);
+ EasyMock.expectLastCall();
+
+ producer.close(EasyMock.anyInt(), EasyMock.anyObject(TimeUnit.class));
+ EasyMock.expectLastCall();
+
+ transformationChain.close();
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ assertTrue(awaitLatch(pollLatch));
+ assertTrue(workerTask.commitOffsets());
+ workerTask.stop();
+ assertTrue(workerTask.awaitStop(1000));
+
+ taskFuture.get();
+ assertPollMetrics(0);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testCommit() throws Exception {
// Test that the task commits properly when prompted
createWorkerTask();
@@ -674,6 +720,24 @@ public class WorkerSourceTaskTest extends ThreadedTest {
assertEquals(1800.0,
metrics.currentMetricValueAsDouble(group1.metricGroup(),
"source-record-active-count"), 0.001d);
}
+ private CountDownLatch expectEmptyPolls(int minimum, final AtomicInteger
count) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(minimum);
+ // Note that we stub these to allow any number of calls because the
thread will continue to
+ // run. The count passed in + latch returned just makes sure we get
*at least* that number of
+ // calls. Each stubbed poll() sleeps briefly (10 ms) and then returns an empty list.
+ EasyMock.expect(sourceTask.poll())
+ .andStubAnswer(new IAnswer<List<SourceRecord>>() {
+ @Override
+ public List<SourceRecord> answer() throws Throwable {
+ count.incrementAndGet();
+ latch.countDown();
+ Thread.sleep(10);
+ return Collections.emptyList();
+ }
+ });
+ return latch;
+ }
+
private CountDownLatch expectPolls(int minimum, final AtomicInteger count)
throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(minimum);
// Note that we stub these to allow any number of calls because the
thread will continue to