This is an automated email from the ASF dual-hosted git repository.

asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c020272  Fix KafkaRecordSupplier assign (#7260)
c020272 is described below

commit c020272adda784dd02c8ac1b108a57d6d2045d1c
Author: Jonathan Wei <jon-...@users.noreply.github.com>
AuthorDate: Wed Mar 13 23:36:14 2019 -0700

    Fix KafkaRecordSupplier assign (#7260)
    
    * Fix KafkaRecordSupplier assign
    
    * TeamCity fix
---
 .../druid/indexing/kafka/KafkaRecordSupplier.java  |  1 -
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   | 63 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 1 deletion(-)
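
Context on the one-line fix below: KafkaRecordSupplier.assign() used to call
seekToEarliest(streamPartitions) unconditionally, rewinding every newly
assigned partition to its earliest offset. That made it easy for a task to
end up reading from earliest instead of its configured start offset, which is
what the new test pins down. A minimal sketch of the before/after behaviour,
assuming a thin wrapper around Kafka's consumer (names here are illustrative,
not Druid's actual API):

    import java.util.Set;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative wrapper, loosely modeled on KafkaRecordSupplier.
    class SupplierSketch
    {
      private final KafkaConsumer<byte[], byte[]> consumer;

      SupplierSketch(KafkaConsumer<byte[], byte[]> consumer)
      {
        this.consumer = consumer;
      }

      void assign(Set<TopicPartition> partitions)
      {
        consumer.assign(partitions);
        // Before the fix, assign() also did the equivalent of:
        //   consumer.seekToBeginning(partitions);
        // silently resetting every partition's position to earliest.
        // After the fix, positioning is left to explicit seek() calls.
      }

      void seek(TopicPartition partition, long offset)
      {
        consumer.seek(partition, offset);
      }
    }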

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
index 6c3d053..be25c49 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
@@ -69,7 +69,6 @@ public class KafkaRecordSupplier implements RecordSupplier<Integer, Long>
                         .stream()
                         .map(x -> new TopicPartition(x.getStream(), x.getPartitionId()))
                         .collect(Collectors.toSet()));
-    seekToEarliest(streamPartitions);
   }
 
   @Override
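
With the implicit rewind gone, callers are expected to assign first and then
position each partition explicitly. A hedged sketch of that pattern (the
variable names are hypothetical; the real call sites live in the
seekable-stream task runner), grounded in the RecordSupplier methods visible
in this patch:

    // Assign the partitions, then seek each one to its stored start offset.
    recordSupplier.assign(partitions);
    for (StreamPartition<Integer> partition : partitions) {
      recordSupplier.seek(partition, startOffsets.get(partition.getPartitionId()));
    }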
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 070396f..461ac9e 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2330,6 +2330,69 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(ImmutableList.of("f"), readSegmentColumn("dim1", desc4));
   }
 
+  @Test(timeout = 60_000L)
+  public void testCanStartFromLaterThanEarliestOffset() throws Exception
+  {
+    if (!isIncrementalHandoffSupported) {
+      return;
+    }
+    final String baseSequenceName = "sequence0";
+    maxRowsPerSegment = Integer.MAX_VALUE;
+    maxTotalRows = null;
+
+    // Insert data
+    int numToAdd = records.size();
+
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+      for (ProducerRecord<byte[], byte[]> record : records) {
+        kafkaProducer.send(record).get();
+      }
+      kafkaProducer.commitTransaction();
+    }
+
+    Map<String, Object> consumerProps = kafkaServer.consumerProperties();
+    consumerProps.put("max.poll.records", "1");
+
+    final SeekableStreamPartitions<Integer, Long> startPartitions = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(
+            0,
+            0L,
+            1,
+            1L
+        )
+    );
+
+    final SeekableStreamPartitions<Integer, Long> endPartitions = new SeekableStreamPartitions<>(
+        topic,
+        ImmutableMap.of(
+            0,
+            10L,
+            1,
+            2L
+        )
+    );
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIndexTaskIOConfig(
+            0,
+            baseSequenceName,
+            startPartitions,
+            endPartitions,
+            consumerProps,
+            KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
+            true,
+            null,
+            null
+        )
+    );
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+  }
+
   private List<ScanResultValue> scanData(final Task task, QuerySegmentSpec spec)
   {
     ScanQuery query = new Druids.ScanQueryBuilder().dataSource(

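What the new test pins down: the producer writes all records from the
beginning of each partition, so partition 1's earliest offset is 0, yet
startPartitions asks the task to begin partition 1 at offset 1. With the old
unconditional seekToEarliest in assign(), a partition could be left
positioned at earliest rather than at the requested start offset. The same
property checked against a raw consumer (an illustrative snippet, not part
of the patch):

    TopicPartition tp = new TopicPartition(topic, 1);
    consumer.assign(java.util.Collections.singleton(tp));
    consumer.seek(tp, 1L);
    // position() must report the requested offset, not the earliest one.
    assert consumer.position(tp) == 1L;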

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
