This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
     new b8a3af7  fix kafka index task doesn't resume when receive duplicate request (#6990) (#7055)
b8a3af7 is described below

commit b8a3af78420dc97c3500de4206b8d7ab80160b4f
Author: Clint Wylie <cjwy...@gmail.com>
AuthorDate: Tue Feb 12 16:20:08 2019 -0800

    fix kafka index task doesn't resume when receive duplicate request (#6990) (#7055)
    
    * fix kafka index task doesn't resume when receive duplicate request
    
    * add unit test
---
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 43 ++++++++++++++++++++++
 .../SeekableStreamIndexTaskRunner.java             |  1 +
 2 files changed, 44 insertions(+)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 04d3802..b7b3896 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2127,6 +2127,49 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
   }
 
+  @Test(timeout = 60_000L)
+  public void testRunWithDuplicateRequest() throws Exception
+  {
+    // Insert data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record).get();
+      }
+    }
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            "sequence0",
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 200L)),
+            new SeekableStreamPartitions<>(topic, ImmutableMap.of(0, 500L)),
+            kafkaServer.consumerProperties(),
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null,
+            false
+        )
+    );
+
+    runTask(task);
+
+    while (!task.getRunner().getStatus().equals(Status.READING)) {
+      Thread.sleep(20);
+    }
+
+    // first setEndOffsets request
+    task.getRunner().pause();
+    task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true);
+    Assert.assertEquals(Status.READING, task.getRunner().getStatus());
+
+    // duplicate setEndOffsets request
+    task.getRunner().pause();
+    task.getRunner().setEndOffsets(ImmutableMap.of(0, 500L), true);
+    Assert.assertEquals(Status.READING, task.getRunner().getStatus());
+  }
+
   private ListenableFuture<TaskStatus> runTask(final Task task)
   {
     try {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 8572649..c86a2b5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1403,6 +1403,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
             exclusivePartitions) && !finish) ||
             (latestSequence.getEndOffsets().equals(sequenceNumbers) && finish)) {
           log.warn("Ignoring duplicate request, end sequences already set for sequences [%s]", sequenceNumbers);
+          resume();
           return Response.ok(sequenceNumbers).build();
         } else if (latestSequence.isCheckpointed()) {
           return Response.status(Response.Status.BAD_REQUEST)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to