This is an automated email from the ASF dual-hosted git repository. jihoonson pushed a commit to branch 0.14.0-incubating in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.14.0-incubating by this push: new b8a3af7 fix kafka index task doesn't resume when recieve duplicate request (#6990) (#7055) b8a3af7 is described below commit b8a3af78420dc97c3500de4206b8d7ab80160b4f Author: Clint Wylie <cjwy...@gmail.com> AuthorDate: Tue Feb 12 16:20:08 2019 -0800 fix kafka index task doesn't resume when recieve duplicate request (#6990) (#7055) * fix kafka index task doesn't resume when recieve duplicate request * add unit test --- .../druid/indexing/kafka/KafkaIndexTaskTest.java | 43 ++++++++++++++++++++++ .../SeekableStreamIndexTaskRunner.java | 1 + 2 files changed, 44 insertions(+) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 04d3802..b7b3896 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2127,6 +2127,49 @@ public class KafkaIndexTaskTest Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } + @Test(timeout = 60_000L) + public void testRunWithDuplicateRequest() throws Exception + { + // Insert data + try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord<byte[], byte[]> record : records) { + kafkaProducer.send(record).get(); + } + } + + final KafkaIndexTask task = createTask( + null, + new KafkaIndexTaskIOConfig( + 0, + "sequence0", + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 200L)), + new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 500L)), + kafkaServer.consumerProperties(), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + true, + null, + null, + false + ) + ); + + runTask(task); + + while (!task.getRunner().getStatus().equals(Status.READING)) { + Thread.sleep(20); + } + + // first setEndOffsets request + task.getRunner().pause(); + task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true); + Assert.assertEquals(Status.READING, task.getRunner().getStatus()); + + // duplicate setEndOffsets request + task.getRunner().pause(); + task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true); + Assert.assertEquals(Status.READING, task.getRunner().getStatus()); + } + private ListenableFuture<TaskStatus> runTask(final Task task) { try { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 8572649..c86a2b5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1403,6 +1403,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff exclusivePartitions) && !finish) || (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) { log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers); + resume(); return Response.ok(sequenceNumbers).build(); } else if (latestSequence.isCheckpointed()) { return Response.status(Response.Status.BAD_REQUEST) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org