This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 05fa34b KAFKA-9014: Fix AssertionError when SourceTask.poll returns
an empty list (#7491)
05fa34b is described below
commit 05fa34bf802a241a6e33249f2eac5e138706ae0b
Author: Konstantine Karantasis <[email protected]>
AuthorDate: Tue Oct 15 14:08:31 2019 -0700
KAFKA-9014: Fix AssertionError when SourceTask.poll returns an empty list
(#7491)
Author: Konstantine Karantasis <[email protected]>
Reviewer: Randall Hauch <[email protected]>
---
.../kafka/connect/runtime/WorkerSourceTask.java | 3 +-
.../connect/runtime/WorkerSourceTaskTest.java | 66 +++++++++++++++++++++-
2 files changed, 67 insertions(+), 2 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 2646906..ed49f2a 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -262,7 +262,8 @@ class WorkerSourceTask extends WorkerTask {
private boolean sendRecords() {
int processed = 0;
recordBatch(toSend.size());
- final SourceRecordWriteCounter counter = new
SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup);
+ final SourceRecordWriteCounter counter =
+ toSend.size() > 0 ? new
SourceRecordWriteCounter(toSend.size(), sourceTaskMetricsGroup) : null;
for (final SourceRecord preTransformRecord : toSend) {
maybeThrowProducerSendException();
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 708e0a5..378029c 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -73,7 +73,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-@PowerMockIgnore("javax.management.*")
+@PowerMockIgnore({"javax.management.*",
+ "org.apache.log4j.*"})
@RunWith(PowerMockRunner.class)
public class WorkerSourceTaskTest extends ThreadedTest {
private static final String TOPIC = "topic";
@@ -334,6 +335,51 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
@Test
+ public void testPollReturnsNoRecords() throws Exception {
+ // Test that the task handles an empty list of records
+ createWorkerTask();
+
+ sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+ EasyMock.expectLastCall();
+ sourceTask.start(TASK_PROPS);
+ EasyMock.expectLastCall();
+ statusListener.onStartup(taskId);
+ EasyMock.expectLastCall();
+
+ // We'll wait for at least one poll that returns no records, then trigger a flush
+ final CountDownLatch pollLatch = expectEmptyPolls(1, new
AtomicInteger());
+ expectOffsetFlush(true);
+
+ sourceTask.stop();
+ EasyMock.expectLastCall();
+ expectOffsetFlush(true);
+
+ statusListener.onShutdown(taskId);
+ EasyMock.expectLastCall();
+
+ producer.close(EasyMock.anyInt(), EasyMock.anyObject(TimeUnit.class));
+ EasyMock.expectLastCall();
+
+ transformationChain.close();
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ assertTrue(awaitLatch(pollLatch));
+ assertTrue(workerTask.commitOffsets());
+ workerTask.stop();
+ assertTrue(workerTask.awaitStop(1000));
+
+ taskFuture.get();
+ assertPollMetrics(0);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testCommit() throws Exception {
// Test that the task commits properly when prompted
createWorkerTask();
@@ -676,6 +722,24 @@ public class WorkerSourceTaskTest extends ThreadedTest {
assertEquals(1800.0,
metrics.currentMetricValueAsDouble(group1.metricGroup(),
"source-record-active-count"), 0.001d);
}
+ private CountDownLatch expectEmptyPolls(int minimum, final AtomicInteger
count) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(minimum);
+ // Note that we stub these to allow any number of calls because the
thread will continue to
+ // run. The count passed in + latch returned just makes sure we get
*at least* that number of
+ // calls
+ EasyMock.expect(sourceTask.poll())
+ .andStubAnswer(new IAnswer<List<SourceRecord>>() {
+ @Override
+ public List<SourceRecord> answer() throws Throwable {
+ count.incrementAndGet();
+ latch.countDown();
+ Thread.sleep(10);
+ return Collections.emptyList();
+ }
+ });
+ return latch;
+ }
+
private CountDownLatch expectPolls(int minimum, final AtomicInteger count)
throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(minimum);
// Note that we stub these to allow any number of calls because the
thread will continue to