This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
     new a8ae715  index_parallel: support !appendToExisting with no explicit intervals (#7046) (#7113)
a8ae715 is described below

commit a8ae7156b1751fdc2ce2a978f9728cbf79b0ed2c
Author: Jihoon Son <jihoon...@apache.org>
AuthorDate: Fri Feb 22 16:39:09 2019 -0800

    index_parallel: support !appendToExisting with no explicit intervals (#7046) (#7113)
    
    * index_parallel: support !appendToExisting with no explicit intervals
    
    This enables ParallelIndexSupervisorTask to dynamically request locks at runtime
    if it is run without explicit intervals in the granularity spec and with
    appendToExisting set to false.  Previously, it behaved as if appendToExisting
    was set to true, which was undocumented and inconsistent with IndexTask and
    Hadoop indexing.
    
    Also, when ParallelIndexSupervisorTask allocates segments in the explicit
    interval case, fail if its locks on the interval have been revoked.
    
    Also make a few other additions/clarifications to native ingestion docs.
    
    Fixes #6989.
    
    * Review feedback.
    
    PR description on GitHub updated to match.
    
    * Make native batch ingestion partitions start at 0
    
    * Fix to previous commit
    
    * Unit test. Verified to fail without the other commits on this branch.
    
    * Another round of review
    
    * Slightly scarier warning
---
 docs/content/ingestion/ingestion-spec.md           |  4 +-
 docs/content/ingestion/native_tasks.md             | 27 +++++++++
 .../org/apache/druid/indexing/common/Counters.java |  4 +-
 .../task/batch/parallel/ParallelIndexSubTask.java  |  3 +-
 .../parallel/ParallelIndexSupervisorTask.java      | 70 +++++++++++++++++-----
 .../indexing/common/task/IngestionTestBase.java    |  5 ++
 .../parallel/ParallelIndexSupervisorTaskTest.java  | 28 ++++++++-
 .../clients/CoordinatorResourceTestClient.java     | 27 +++++++++
 .../tests/indexer/AbstractITBatchIndexTest.java    | 25 ++++++--
 .../apache/druid/tests/indexer/ITIndexerTest.java  |  3 +-
 .../druid/tests/indexer/ITParallelIndexTest.java   | 14 ++++-
 .../wikipedia_parallel_reindex_queries.json        | 18 ++++++
 .../indexer/wikipedia_parallel_reindex_task.json   | 65 ++++++++++++++++++++
 13 files changed, 262 insertions(+), 31 deletions(-)
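
Note on "Make native batch ingestion partitions start at 0": the rename from incrementAndGetInt to getAndIncrementInt in Counters.java changes which value the first caller sees for a given key. The following is a minimal standalone sketch of that difference, not the Druid class itself; the class name PartitionCounterSketch and the string keys standing in for intervals are assumptions made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical standalone sketch; it mirrors the shape of the helper in the diff.
public final class PartitionCounterSketch
{
  // Returns the counter value *before* incrementing, so the first call per key yields 0.
  static <K> int getAndIncrementInt(ConcurrentHashMap<K, AtomicInteger> counters, K key)
  {
    // get() first avoids the locking in computeIfAbsent() when the key already exists.
    AtomicInteger counter = counters.get(key);
    if (counter == null) {
      counter = counters.computeIfAbsent(key, k -> new AtomicInteger());
    }
    return counter.getAndIncrement();
  }

  // The previous behavior: returns the value *after* incrementing, so the first call yields 1.
  static <K> int incrementAndGetInt(ConcurrentHashMap<K, AtomicInteger> counters, K key)
  {
    return counters.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
  }

  public static void main(String[] args)
  {
    ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();
    // String keys stand in for Joda Interval objects (an assumption of this sketch).
    System.out.println(getAndIncrementInt(counters, "2017-12-24/P1D")); // 0
    System.out.println(getAndIncrementInt(counters, "2017-12-24/P1D")); // 1
    System.out.println(getAndIncrementInt(counters, "2017-12-25/P1D")); // 0
  }
}
```

With the post-increment variant, the first partition allocated in each interval is numbered 0, which is what the new "Expected partitionNum to be 0" check in ParallelIndexSupervisorTask relies on.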

diff --git a/docs/content/ingestion/ingestion-spec.md b/docs/content/ingestion/ingestion-spec.md
index 46e7334..b578b54 100644
--- a/docs/content/ingestion/ingestion-spec.md
+++ b/docs/content/ingestion/ingestion-spec.md
@@ -286,7 +286,7 @@ This spec is used to generated segments with uniform intervals.
 | segmentGranularity | string | The granularity to create time chunks at. Multiple segments can be created per time chunk. For example, with 'DAY' `segmentGranularity`, the events of the same day fall into the same time chunk which can be optionally further partitioned into multiple segments based on other configurations and input size. See [Granularity](../querying/granularities.html) for supported granularities.| no (default == 'DAY') |
 | queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. A granularity of 'NONE' means millisecond granularity. See [Granularity](../querying/granularit [...]
 | rollup | boolean | rollup or not | no (default == true) |
-| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | no. If specified, batch ingestion tasks may skip determining partitions phase which results in faster ingestion. |
+| intervals | JSON string array | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | no. If specified, Hadoop and native non-parallel batch ingestion tasks may skip determining partitions phase which results in faster ingestion; native parallel ingestion tasks can request all their locks up-front instead of one by one. Batch ingestion will thrown away any data not in the specified intervals. |
 
 ### Arbitrary Granularity Spec
 
@@ -296,7 +296,7 @@ This spec is used to generate segments with arbitrary intervals (it tries to cre
 |-------|------|-------------|----------|
 | queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. A granularity of 'NONE' means millisecond granularity. See [Granularity](../querying/granularit [...]
 | rollup | boolean | rollup or not | no (default == true) |
-| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | no. If specified, batch ingestion tasks may skip determining partitions phase which results in faster ingestion. |
+| intervals | JSON string array | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | no. If specified, Hadoop and native non-parallel batch ingestion tasks may skip determining partitions phase which results in faster ingestion; native parallel ingestion tasks can request all their locks up-front instead of one by one. Batch ingestion will thrown away any data not in the specified intervals. |
 
 # Transform Spec
 
diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md
index 963adea..5d7ffa1 100644
--- a/docs/content/ingestion/native_tasks.md
+++ b/docs/content/ingestion/native_tasks.md
@@ -30,6 +30,9 @@ MiddleManager.
 
 Please check [Hadoop-based Batch Ingestion VS Native Batch Ingestion](./hadoop-vs-native-batch.html) for differences between native batch ingestion and Hadoop-based ingestion.
 
+To run either kind of native batch indexing task, write an ingestion spec as specified below. Then POST it to the
+[`/druid/indexer/v1/task` endpoint on the Overlord](../operations/api-reference.html#tasks), or use the `post-index-task` script included with Druid.
+
 Parallel Index Task
 --------------------------------
 
@@ -124,6 +127,11 @@ An example ingestion spec is:
 }
 ```
 
+By default, batch ingestion replaces all data in any segment that it writes to. If you'd like to add to the segment
+instead, set the appendToExisting flag in ioConfig. Note that it only replaces data in segments where it actively adds
+data: if there are segments in your granularitySpec's intervals that have no data written by this task, they will be
+left alone.
+
 #### Task Properties
 
 |property|description|required?|
@@ -139,6 +147,14 @@ This field is required.
 
 See [Ingestion Spec DataSchema](../ingestion/ingestion-spec.html#dataschema)
 
+If you specify `intervals` explicitly in your dataSchema's granularitySpec, batch ingestion will lock the full intervals
+specified when it starts up, and you will learn quickly if the specified interval overlaps with locks held by other
+tasks (eg, Kafka ingestion). Otherwise, batch ingestion will lock each interval as it is discovered, so you may only
+learn that the task overlaps with a higher-priority task later in ingestion.  If you specify `intervals` explicitly, any
+rows outside the specified intervals will be thrown away. We recommend setting `intervals` explicitly if you know the
+time range of the data so that locking failure happens faster, and so that you don't accidentally replace data outside
+that range if there's some stray data with unexpected timestamps.
+
 #### IOConfig
 
 |property|description|default|required?|
@@ -463,6 +479,11 @@ The Local Index Task is designed to be used for smaller data sets. The task exec
 }
 ```
 
+By default, batch ingestion replaces all data in any segment that it writes to. If you'd like to add to the segment
+instead, set the appendToExisting flag in ioConfig. Note that it only replaces data in segments where it actively adds
+data: if there are segments in your granularitySpec's intervals that have no data written by this task, they will be
+left alone.
+
 #### Task Properties
 
 |property|description|required?|
@@ -478,6 +499,12 @@ This field is required.
 
 See [Ingestion Spec DataSchema](../ingestion/ingestion-spec.html#dataschema)
 
+If you do not specify `intervals` explicitly in your dataSchema's granularitySpec, the Local Index Task will do an extra
+pass over the data to determine the range to lock when it starts up.  If you specify `intervals` explicitly, any rows
+outside the specified intervals will be thrown away. We recommend setting `intervals` explicitly if you know the time
+range of the data because it allows the task to skip the extra pass, and so that you don't accidentally replace data outside
+that range if there's some stray data with unexpected timestamps.
+
 #### IOConfig
 
 |property|description|default|required?|
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/Counters.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/Counters.java
index e463890..a57f419 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/Counters.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/Counters.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 public final class Counters
 {
-  public static <K> int incrementAndGetInt(ConcurrentHashMap<K, AtomicInteger> counters, K key)
+  public static <K> int getAndIncrementInt(ConcurrentHashMap<K, AtomicInteger> counters, K key)
   {
     // get() before computeIfAbsent() is an optimization to avoid locking in computeIfAbsent() if not needed.
     // See https://github.com/apache/incubator-druid/pull/6898#discussion_r251384586.
@@ -33,7 +33,7 @@ public final class Counters
     if (counter == null) {
       counter = counters.computeIfAbsent(key, k -> new AtomicInteger());
     }
-    return counter.incrementAndGet();
+    return counter.getAndIncrement();
   }
 
   public static <K> long incrementAndGetLong(ConcurrentHashMap<K, AtomicLong> counters, K key)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
index 18e87d7..8004243 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSubTask.java
@@ -235,9 +235,8 @@ public class ParallelIndexSubTask extends AbstractTask
   )
   {
     final DataSchema dataSchema = ingestionSchema.getDataSchema();
-    final boolean explicitIntervals = dataSchema.getGranularitySpec().bucketIntervals().isPresent();
     final ParallelIndexIOConfig ioConfig = ingestionSchema.getIOConfig();
-    if (ioConfig.isAppendToExisting() || !explicitIntervals) {
+    if (ioConfig.isAppendToExisting()) {
       return new ActionBasedSegmentAllocator(
           toolbox.getTaskActionClient(),
           dataSchema,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index 385797f..61f4517 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -33,8 +33,10 @@ import org.apache.druid.data.input.FirehoseFactory;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.Counters;
 import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.LockListAction;
+import org.apache.druid.indexing.common.actions.LockTryAcquireAction;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.AbstractTask;
@@ -360,43 +362,79 @@ public class ParallelIndexSupervisorTask extends AbstractTask implements ChatHan
   {
     final String dataSource = getDataSource();
     final GranularitySpec granularitySpec = getIngestionSchema().getDataSchema().getGranularitySpec();
-    final SortedSet<Interval> bucketIntervals = Preconditions.checkNotNull(
-        granularitySpec.bucketIntervals().orNull(),
-        "bucketIntervals"
-    );
+    final Optional<SortedSet<Interval>> bucketIntervals = granularitySpec.bucketIntervals();
+
     // List locks whenever allocating a new segment because locks might be revoked and no longer valid.
-    final Map<Interval, String> versions = toolbox
+    final List<TaskLock> locks = toolbox
         .getTaskActionClient()
-        .submit(new LockListAction())
+        .submit(new LockListAction());
+    final TaskLock revokedLock = locks.stream().filter(TaskLock::isRevoked).findAny().orElse(null);
+    if (revokedLock != null) {
+      throw new ISE("Lock revoked: [%s]", revokedLock);
+    }
+    final Map<Interval, String> versions = locks
         .stream()
         .collect(Collectors.toMap(TaskLock::getInterval, TaskLock::getVersion));
 
-    final Optional<Interval> maybeInterval = granularitySpec.bucketInterval(timestamp);
-    if (!maybeInterval.isPresent()) {
-      throw new IAE("Could not find interval for timestamp [%s]", timestamp);
-    }
+    Interval interval;
+    String version;
+    boolean justLockedInterval = false;
+    if (bucketIntervals.isPresent()) {
+      // If the granularity spec has explicit intervals, we just need to find the interval (of the segment
+      // granularity); we already tried to lock it at task startup.
+      final Optional<Interval> maybeInterval = granularitySpec.bucketInterval(timestamp);
+      if (!maybeInterval.isPresent()) {
+        throw new IAE("Could not find interval for timestamp [%s]", timestamp);
+      }
+
+      interval = maybeInterval.get();
+      if (!bucketIntervals.get().contains(interval)) {
+        throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec);
+      }
 
-    final Interval interval = maybeInterval.get();
-    if (!bucketIntervals.contains(interval)) {
-      throw new ISE("Unspecified interval[%s] in granularitySpec[%s]", interval, granularitySpec);
+      version = findVersion(versions, interval);
+      if (version == null) {
+        throw new ISE("Cannot find a version for interval[%s]", interval);
+      }
+    } else {
+      // We don't have explicit intervals. We can use the segment granularity to figure out what
+      // interval we need, but we might not have already locked it.
+      interval = granularitySpec.getSegmentGranularity().bucket(timestamp);
+      version = findVersion(versions, interval);
+      if (version == null) {
+        // We don't have a lock for this interval, so we should lock it now.
+        final TaskLock lock = Preconditions.checkNotNull(
+            toolbox.getTaskActionClient().submit(new LockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)),
+            "Cannot acquire a lock for interval[%s]", interval
+        );
+        version = lock.getVersion();
+        justLockedInterval = true;
+      }
     }
 
-    final int partitionNum = Counters.incrementAndGetInt(partitionNumCountersPerInterval, interval);
+    final int partitionNum = Counters.getAndIncrementInt(partitionNumCountersPerInterval, interval);
+    if (justLockedInterval && partitionNum != 0) {
+      throw new ISE(
+          "Expected partitionNum to be 0 for interval [%s] right after locking, but got [%s]",
+          interval, partitionNum
+      );
+    }
     return new SegmentIdWithShardSpec(
         dataSource,
         interval,
-        findVersion(versions, interval),
+        version,
         new NumberedShardSpec(partitionNum, 0)
     );
   }
 
+  @Nullable
   private static String findVersion(Map<Interval, String> versions, Interval interval)
   {
     return versions.entrySet().stream()
                    .filter(entry -> entry.getKey().contains(interval))
                    .map(Entry::getValue)
                    .findFirst()
-                   .orElseThrow(() -> new ISE("Cannot find a version for interval[%s]", interval));
+                   .orElse(null);
   }
 
   /**
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
index 8406adb..c70a71a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java
@@ -113,6 +113,11 @@ public abstract class IngestionTestBase
     return lockbox;
   }
 
+  public IndexerSQLMetadataStorageCoordinator getStorageCoordinator()
+  {
+    return storageCoordinator;
+  }
+
   public TaskActionToolbox createTaskActionToolbox()
   {
     storageCoordinator.start();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index 1d25f73..241e9f5 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
+import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
@@ -51,6 +52,7 @@ import java.nio.file.Files;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSupervisorTaskTest
@@ -126,8 +128,7 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
     }
   }
 
-  @Test
-  public void testWithoutInterval() throws Exception
+  private void runTestWithoutIntervalTask() throws Exception
   {
     final ParallelIndexSupervisorTask task = newTask(
         null,
@@ -142,6 +143,29 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
     prepareTaskForLocking(task);
     Assert.assertTrue(task.isReady(actionClient));
     Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode());
+    shutdownTask(task);
+  }
+
+  @Test
+  public void testWithoutInterval() throws Exception
+  {
+    // Ingest all data.
+    runTestWithoutIntervalTask();
+
+    // Read the segments for one day.
+    final Interval interval = Intervals.of("2017-12-24/P1D");
+    final List<DataSegment> oldSegments =
+        getStorageCoordinator().getUsedSegmentsForInterval("dataSource", interval);
+    Assert.assertEquals(1, oldSegments.size());
+
+    // Reingest the same data. Each segment should get replaced by a segment with a newer version.
+    runTestWithoutIntervalTask();
+
+    // Verify that the segment has been replaced.
+    final List<DataSegment> newSegments =
+        getStorageCoordinator().getUsedSegmentsForInterval("dataSource", interval);
+    Assert.assertEquals(1, newSegments.size());
+    Assert.assertTrue(oldSegments.get(0).getVersion().compareTo(newSegments.get(0).getVersion()) < 0);
   }
 
   @Test()
diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
index df6ad20..04147b0 100644
--- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
+++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java
@@ -32,6 +32,7 @@ import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
 import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
 import org.apache.druid.testing.IntegrationTestingConfig;
 import org.apache.druid.testing.guice.TestClient;
+import org.apache.druid.timeline.DataSegment;
 import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.Interval;
@@ -41,6 +42,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 public class CoordinatorResourceTestClient
 {
@@ -80,6 +83,11 @@ public class CoordinatorResourceTestClient
     return StringUtils.format("%sdatasources/%s/intervals", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
   }
 
+  private String getFullSegmentsURL(String dataSource)
+  {
+    return StringUtils.format("%sdatasources/%s/segments?full", getCoordinatorURL(), StringUtils.urlEncode(dataSource));
+  }
+
   private String getLoadStatusURL()
   {
     return StringUtils.format("%s%s", getCoordinatorURL(), "loadstatus");
@@ -123,6 +131,25 @@ public class CoordinatorResourceTestClient
     return segments;
   }
 
+  // return a set of the segment versions for the specified datasource
+  public Set<String> getSegmentVersions(final String dataSource)
+  {
+    ArrayList<DataSegment> segments;
+    try {
+      StatusResponseHolder response = makeRequest(HttpMethod.GET, getFullSegmentsURL(dataSource));
+
+      segments = jsonMapper.readValue(
+          response.getContent(), new TypeReference<List<DataSegment>>()
+          {
+          }
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return segments.stream().map(s -> s.getVersion()).collect(Collectors.toSet());
+  }
+
   private Map<String, Integer> getLoadStatus()
   {
     Map<String, Integer> status;
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index 3555ef8..2034b60 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -33,6 +33,7 @@ import org.junit.Assert;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
+import java.util.Set;
 
 public class AbstractITBatchIndexTest extends AbstractIndexerTest
 {
@@ -49,7 +50,8 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
   void doIndexTestTest(
       String dataSource,
       String indexTaskFilePath,
-      String queryFilePath
+      String queryFilePath,
+      boolean waitForNewVersion
   ) throws IOException
   {
     final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix();
@@ -59,7 +61,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
         fullDatasourceName
     );
 
-    submitTaskAndWait(taskSpec, fullDatasourceName);
+    submitTaskAndWait(taskSpec, fullDatasourceName, waitForNewVersion);
     try {
 
       String queryResponseTemplate;
@@ -107,7 +109,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
         fullReindexDatasourceName
     );
 
-    submitTaskAndWait(taskSpec, fullReindexDatasourceName);
+    submitTaskAndWait(taskSpec, fullReindexDatasourceName, false);
     try {
       String queryResponseTemplate;
       try {
@@ -144,7 +146,7 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
       String queryFilePath
   )
   {
-    submitTaskAndWait(indexTaskFilePath, dataSource);
+    submitTaskAndWait(indexTaskFilePath, dataSource, false);
     try {
       sqlQueryHelper.testQueriesFromFile(queryFilePath, 2);
     }
@@ -154,12 +156,25 @@ public class AbstractITBatchIndexTest extends AbstractIndexerTest
     }
   }
 
-  private void submitTaskAndWait(String taskSpec, String dataSourceName)
+  private void submitTaskAndWait(String taskSpec, String dataSourceName, boolean waitForNewVersion)
   {
+    final Set<String> oldVersions = waitForNewVersion ? coordinator.getSegmentVersions(dataSourceName) : null;
+
     final String taskID = indexer.submitTask(taskSpec);
     LOG.info("TaskID for loading index task %s", taskID);
     indexer.waitUntilTaskCompletes(taskID);
 
+    // ITParallelIndexTest does a second round of ingestion to replace segements in an existing
+    // data source. For that second round we need to make sure the coordinator actually learned
+    // about the new segments befor waiting for it to report that all segments are loaded; otherwise
+    // this method could return too early because the coordinator is merely reporting that all the
+    // original segments have loaded.
+    if (waitForNewVersion) {
+      RetryUtil.retryUntilTrue(
+          () -> !oldVersions.containsAll(coordinator.getSegmentVersions(dataSourceName)), "See a new version"
+      );
+    }
+
     RetryUtil.retryUntilTrue(
         () -> coordinator.areSegmentsLoaded(dataSourceName), "Segment Load"
     );
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index 8412e49..245c3dd 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -45,7 +45,8 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
       doIndexTestTest(
           INDEX_DATASOURCE,
           INDEX_TASK,
-          INDEX_QUERIES_RESOURCE
+          INDEX_QUERIES_RESOURCE,
+          false
       );
       doReindexTest(
           INDEX_DATASOURCE,
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
index 80ca6e1..e457a1c 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java
@@ -30,6 +30,8 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest
 {
   private static String INDEX_TASK = "/indexer/wikipedia_parallel_index_task.json";
   private static String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_parallel_index_queries.json";
+  private static String REINDEX_TASK = "/indexer/wikipedia_parallel_reindex_task.json";
+  private static String REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_parallel_reindex_queries.json";
   private static String INDEX_DATASOURCE = "wikipedia_parallel_index_test";
 
   @Test
@@ -39,7 +41,17 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest
       doIndexTestTest(
           INDEX_DATASOURCE,
           INDEX_TASK,
-          INDEX_QUERIES_RESOURCE
+          INDEX_QUERIES_RESOURCE,
+          false
+      );
+
+      // Index again, this time only choosing the second data file, and without explicit intervals chosen.
+      // The second datafile covers both day segments, so this should replace them, as reflected in the queries.
+      doIndexTestTest(
+          INDEX_DATASOURCE,
+          REINDEX_TASK,
+          REINDEX_QUERIES_RESOURCE,
+          true
       );
     }
   }
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_queries.json
new file mode 100644
index 0000000..bbbeca9
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_queries.json
@@ -0,0 +1,18 @@
+[
+    {
+        "description": "timeseries, 1 agg, all should only show data2",
+        "query":{
+            "queryType" : "timeBoundary",
+            "dataSource": "%%DATASOURCE%%"
+        },
+        "expectedResults":[
+            {
+                "timestamp" : "2013-08-31T11:58:39.000Z",
+                "result" : {
+                    "minTime" : "2013-08-31T11:58:39.000Z",
+                    "maxTime" : "2013-09-01T01:02:33.000Z"
+                }
+            }
+        ]
+    }
+]
\ No newline at end of file
diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json
new file mode 100644
index 0000000..c06890b
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_reindex_task.json
@@ -0,0 +1,65 @@
+{
+    "type": "index_parallel",
+    "spec": {
+        "dataSchema": {
+            "dataSource": "%%DATASOURCE%%",
+            "metricsSpec": [
+                {
+                    "type": "count",
+                    "name": "count"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "added",
+                    "fieldName": "added"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "deleted",
+                    "fieldName": "deleted"
+                },
+                {
+                    "type": "doubleSum",
+                    "name": "delta",
+                    "fieldName": "delta"
+                }
+            ],
+            "granularitySpec": {
+                "segmentGranularity": "DAY",
+                "queryGranularity": "second"
+            },
+            "parser": {
+                "parseSpec": {
+                    "format" : "json",
+                    "timestampSpec": {
+                        "column": "timestamp"
+                    },
+                    "dimensionsSpec": {
+                        "dimensions": [
+                            "page",
+                            {"type": "string", "name": "language", "createBitmapIndex": false},
+                            "user",
+                            "unpatrolled",
+                            "newPage",
+                            "robot",
+                            "anonymous",
+                            "namespace",
+                            "continent",
+                            "country",
+                            "region",
+                            "city"
+                        ]
+                    }
+                }
+            }
+        },
+        "ioConfig": {
+            "type": "index_parallel",
+            "firehose": {
+                "type": "local",
+                "baseDir": "/resources/data/batch_index",
+                "filter": "wikipedia_index_data2*"
+            }
+        }
+    }
+}
\ No newline at end of file
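
The allocation logic in the ParallelIndexSupervisorTask hunk above can be read as two branches: with explicit intervals the lock must already exist, and without them the lock is acquired on first use. The following is a simplified model of that control flow only, under loud assumptions: the class AllocateSketch, its string-keyed intervals, and the made-up "v0", "v1" version strings are all hypothetical, and no Druid classes, task actions, or revocation checks are involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical model of the two allocation branches; not the Druid implementation.
public final class AllocateSketch
{
  private final Optional<Set<String>> explicitIntervals;
  private final Map<String, String> lockVersions = new HashMap<>();
  private final Map<String, Integer> allocatedPerInterval = new HashMap<>();
  private int versionSeq = 0;

  AllocateSketch(Optional<Set<String>> explicitIntervals)
  {
    this.explicitIntervals = explicitIntervals;
    // With explicit intervals, every interval is locked up front (modeled as assigning a version).
    explicitIntervals.ifPresent(set -> set.forEach(i -> lockVersions.put(i, "v" + versionSeq++)));
  }

  // Returns a segment id of the form interval_version_partitionNum.
  String allocate(String interval)
  {
    String version = lockVersions.get(interval);
    boolean justLocked = false;
    if (explicitIntervals.isPresent()) {
      if (!explicitIntervals.get().contains(interval)) {
        throw new IllegalStateException("Unspecified interval " + interval);
      }
      if (version == null) {
        throw new IllegalStateException("Cannot find a version for interval " + interval);
      }
    } else if (version == null) {
      // No explicit intervals and no lock yet: acquire one now.
      version = "v" + versionSeq++;
      lockVersions.put(interval, version);
      justLocked = true;
    }
    // merge() returns the updated count, so subtracting 1 gives a 0-based partition number.
    int partitionNum = allocatedPerInterval.merge(interval, 1, Integer::sum) - 1;
    if (justLocked && partitionNum != 0) {
      throw new IllegalStateException("Expected partitionNum 0 right after locking " + interval);
    }
    return interval + "_" + version + "_" + partitionNum;
  }

  public static void main(String[] args)
  {
    AllocateSketch dynamic = new AllocateSketch(Optional.empty());
    System.out.println(dynamic.allocate("2013-08-31")); // 2013-08-31_v0_0
    System.out.println(dynamic.allocate("2013-08-31")); // 2013-08-31_v0_1
    System.out.println(dynamic.allocate("2013-09-01")); // 2013-09-01_v1_0
  }
}
```

In this model, a task constructed with explicit intervals rejects any interval outside the set, which corresponds to the "Unspecified interval" failure in the diff.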


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org
