This is an automated email from the ASF dual-hosted git repository.

capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f3ecfc7fc56 Allow large partition numbers in VersionedIntervalTimeline. (#18777)
f3ecfc7fc56 is described below

commit f3ecfc7fc564784edddba9324d5ae00918e73e25
Author: Gian Merlino <[email protected]>
AuthorDate: Wed Nov 26 08:06:29 2025 -0800

    Allow large partition numbers in VersionedIntervalTimeline. (#18777)
    
    * Allow large partition numbers in VersionedIntervalTimeline.
    
    The experimental segment locking feature (forceTimeChunkLock: false)
    reserves the first 32768 partition numbers for the "root generation",
    and then uses an "atomic update groups" scheme to replace root segment
    ranges with new sets of segments during reindexing operations.
    
    OvershadowableManager, which manages this atomic update scheme, imposes
    a limit of 32768 segments per time chunk. Previously, this limit applied
    even when segment locking was not in use. With this patch, the class is
    only used when segment locking is actually in play, so the limit is no
    longer imposed under normal conditions.
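    
    A simplified sketch of the new behavior in PartitionHolder#add (the actual
    patch below also tracks maxMinorVersion):
    
        public boolean add(PartitionChunk<T> chunk)
        {
          // Without segment locking, every chunk has minorVersion == 0, so the holder
          // keeps a plain partitionId -> chunk map with no 32768 limit.
          if (chunk.getObject().getMinorVersion() != 0 && contents instanceof SimplePartitionHolderContents) {
            // First minor-versioned chunk: switch to OvershadowableManager, which
            // implements the atomic update group scheme used by segment locking.
            contents = OvershadowableManager.fromSimple((SimplePartitionHolderContents<T>) contents);
          }
          return contents.addChunk(chunk);
        }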
    
    * Use the correct annotation.
    
    * Add embedded test, and config property to support it.
    
    * Do HighPartitionNumberTest with a different approach.
    
    * Changes from review.
    
    * Align behavior with tests.
    
    * Remove extraneous newline.
    
    * Fix comment.
---
 .../embedded/msq/HighPartitionNumberTest.java      | 181 +++++++++++++++++++++
 .../timeline/partition/OvershadowableManager.java  |  64 ++++++--
 .../druid/timeline/partition/PartitionHolder.java  |  48 +++---
 .../partition/PartitionHolderContents.java         |  87 ++++++++++
 .../partition/SimplePartitionHolderContents.java   | 160 ++++++++++++++++++
 .../timeline/VersionedIntervalTimelineTest.java    |  34 +++-
 .../partition/OvershadowableManagerTest.java       |  14 +-
 7 files changed, 541 insertions(+), 47 deletions(-)

diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/HighPartitionNumberTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/HighPartitionNumberTest.java
new file mode 100644
index 00000000000..19555bc0438
--- /dev/null
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/HighPartitionNumberTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.msq;
+
+import com.google.common.collect.Iterables;
+import org.apache.druid.indexing.overlord.Segments;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.sql.calcite.planner.Calcites;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
+import org.apache.druid.testing.embedded.indexing.Resources;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.apache.druid.timeline.partition.PartitionIds;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Test to verify that high partition numbers (above the limit of {@link PartitionIds#ROOT_GEN_END_PARTITION_ID})
+ * work correctly when segment locking is not in play.
+ */
+public class HighPartitionNumberTest extends EmbeddedClusterTestBase
+{
+  /**
+   * Expected number of rows for three copies of {@link Resources.DataFile#tinyWiki1Json()}.
+   */
+  private static final int EXPECTED_TOTAL_ROWS = 9;
+
+  private final EmbeddedBroker broker = new EmbeddedBroker();
+  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+  private final EmbeddedIndexer indexer = new EmbeddedIndexer()
+      .setServerMemory(300_000_000L)
+      .addProperty("druid.worker.capacity", "2");
+
+  private EmbeddedMSQApis msqApis;
+
+  @Override
+  protected EmbeddedDruidCluster createCluster()
+  {
+    return EmbeddedDruidCluster
+        .withEmbeddedDerbyAndZookeeper()
+        .useLatchableEmitter()
+        .useDefaultTimeoutForLatchableEmitter(180_000)
+        .addServer(overlord)
+        .addServer(coordinator)
+        .addServer(indexer)
+        .addServer(broker)
+        .addServer(new EmbeddedHistorical());
+  }
+
+  @BeforeAll
+  public void initTestClient()
+  {
+    msqApis = new EmbeddedMSQApis(cluster, overlord);
+  }
+
+  @Test
+  public void testHighPartitionNumbers()
+  {
+    insertFirstSegment();
+    insertSecondSegment();
+    insertLastSegments();
+
+    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+
+    // Verify that we have segments with partition numbers above the old limit
+    final int maxPartitionNum = Integer.parseInt(cluster.runSql(
+        "SELECT MAX(partition_num) FROM sys.segments WHERE datasource=%s",
+        Calcites.escapeStringLiteral(dataSource)
+    ).trim());
+
+    Assertions.assertEquals(32769 /* larger than Short.MAX_VALUE */, maxPartitionNum);
+
+    // Verify that all data is queryable
+    cluster.callApi().verifySqlQuery(
+        "SELECT COUNT(*) FROM %s",
+        dataSource,
+        String.valueOf(EXPECTED_TOTAL_ROWS)
+    );
+  }
+
+  /**
+   * Inserts {@link Resources.DataFile#tinyWiki1Json()} with partition number zero, using SQL.
+   */
+  private void insertFirstSegment()
+  {
+    // Insert tinyWiki1Json in 1 segment (it's a 3 line file).
+    String queryLocal = StringUtils.format(
+        MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
+        dataSource,
+        Resources.DataFile.tinyWiki1Json()
+    );
+
+    Map<String, Object> context = Map.of(
+        MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
+        ClusterStatisticsMergeMode.PARALLEL
+    );
+
+    cluster.callApi().waitForTaskToSucceed(msqApis.submitTaskSql(context, queryLocal).getTaskId(), overlord);
+  }
+
+  /**
+   * Reinserts the segment from {@link #insertFirstSegment()} directly into metadata storage,
+   * with partition number 32766.
+   */
+  private void insertSecondSegment()
+  {
+    // Get the segment and reinsert it with a higher partition number.
+    final DataSegment firstSegment =
+        Iterables.getOnlyElement(
+            overlord.bindings()
+                    .segmentsMetadataStorage()
+                    .retrieveAllUsedSegments(dataSource, Segments.ONLY_VISIBLE)
+        );
+
+    overlord.bindings().segmentsMetadataStorage().commitSegments(
+        Set.of(
+            firstSegment.withShardSpec(
+                new NumberedShardSpec(
+                    Short.MAX_VALUE - 1,
+                    firstSegment.getShardSpec().getNumCorePartitions()
+                )
+            )
+        ),
+        null
+    );
+  }
+
+  /**
+   * Inserts {@link Resources.DataFile#tinyWiki1Json()} with SQL into three segments, starting at
+   * {@link Short#MAX_VALUE} and going to {@link Short#MAX_VALUE} + 2.
+   */
+  private void insertLastSegments()
+  {
+    // Insert tinyWiki1Json in 3 segments (it's a 3 line file).
+    String queryLocal = StringUtils.format(
+        MoreResources.MSQ.INSERT_TINY_WIKI_JSON,
+        dataSource,
+        Resources.DataFile.tinyWiki1Json()
+    );
+
+    Map<String, Object> context = Map.of(
+        MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE,
+        ClusterStatisticsMergeMode.PARALLEL,
+        MultiStageQueryContext.CTX_ROWS_PER_SEGMENT,
+        1
+    );
+
+    cluster.callApi().waitForTaskToSucceed(msqApis.submitTaskSql(context, queryLocal).getTaskId(), overlord);
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java b/processing/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java
index c7376a874ab..a8cd6946cd4 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/OvershadowableManager.java
@@ -37,6 +37,7 @@ import it.unimi.dsi.fastutil.shorts.ShortComparator;
 import it.unimi.dsi.fastutil.shorts.ShortComparators;
 import it.unimi.dsi.fastutil.shorts.ShortSortedSet;
 import it.unimi.dsi.fastutil.shorts.ShortSortedSets;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.timeline.Overshadowable;
@@ -63,9 +64,12 @@ import java.util.TreeMap;
 * In {@link org.apache.druid.timeline.VersionedIntervalTimeline}, this class is used to manage segments in the same
  * timeChunk.
  *
+ * OvershadowableManager is only used when segment locking is in play, and segments with minor version != 0 have
+ * been created. See {@link PartitionHolder#add(PartitionChunk)}.
+ *
  * This class is not thread-safe.
  */
-class OvershadowableManager<T extends Overshadowable<T>>
+public class OvershadowableManager<T extends Overshadowable<T>> implements PartitionHolderContents<T>
 {
   /**
    * There are 3 states for atomicUpdateGroups.
@@ -97,7 +101,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
  private final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> visibleGroupPerRange;
  private final TreeMap<RootPartitionRange, Short2ObjectSortedMap<AtomicUpdateGroup<T>>> overshadowedGroups;
 
-  OvershadowableManager()
+  public OvershadowableManager()
   {
     this.knownPartitionChunks = new HashMap<>();
     this.standbyGroups = new TreeMap<>();
@@ -105,10 +109,23 @@ class OvershadowableManager<T extends Overshadowable<T>>
     this.overshadowedGroups = new TreeMap<>();
   }
 
-  public static <T extends Overshadowable<T>> OvershadowableManager<T> copyVisible(OvershadowableManager<T> original)
+  public static <T extends Overshadowable<T>> OvershadowableManager<T> fromSimple(
+      final SimplePartitionHolderContents<T> contents
+  )
+  {
+    final OvershadowableManager<T> retVal = new OvershadowableManager<>();
+    final Iterator<PartitionChunk<T>> iter = contents.visibleChunksIterator();
+    while (iter.hasNext()) {
+      retVal.addChunk(iter.next());
+    }
+    return retVal;
+  }
+
+  @Override
+  public OvershadowableManager<T> copyVisible()
   {
     final OvershadowableManager<T> copy = new OvershadowableManager<>();
-    original.visibleGroupPerRange.forEach((partitionRange, versionToGroups) -> {
+    visibleGroupPerRange.forEach((partitionRange, versionToGroups) -> {
       // There should be only one group per partition range
      final AtomicUpdateGroup<T> group = versionToGroups.values().iterator().next();
      group.getChunks().forEach(chunk -> copy.knownPartitionChunks.put(chunk.getChunkNumber(), chunk));
@@ -121,10 +138,11 @@ class OvershadowableManager<T extends Overshadowable<T>>
     return copy;
   }
 
-  public static <T extends Overshadowable<T>> OvershadowableManager<T> deepCopy(OvershadowableManager<T> original)
+  @Override
+  public OvershadowableManager<T> deepCopy()
   {
-    final OvershadowableManager<T> copy = copyVisible(original);
-    original.overshadowedGroups.forEach((partitionRange, versionToGroups) -> {
+    final OvershadowableManager<T> copy = copyVisible();
+    overshadowedGroups.forEach((partitionRange, versionToGroups) -> {
       // There should be only one group per partition range
      final AtomicUpdateGroup<T> group = versionToGroups.values().iterator().next();
      group.getChunks().forEach(chunk -> copy.knownPartitionChunks.put(chunk.getChunkNumber(), chunk));
@@ -134,7 +152,7 @@ class OvershadowableManager<T extends Overshadowable<T>>
          new SingleEntryShort2ObjectSortedMap<>(group.getMinorVersion(), AtomicUpdateGroup.copy(group))
       );
     });
-    original.standbyGroups.forEach((partitionRange, versionToGroups) -> {
+    standbyGroups.forEach((partitionRange, versionToGroups) -> {
       // There should be only one group per partition range
      final AtomicUpdateGroup<T> group = versionToGroups.values().iterator().next();
      group.getChunks().forEach(chunk -> copy.knownPartitionChunks.put(chunk.getChunkNumber(), chunk));
@@ -635,13 +653,23 @@ class OvershadowableManager<T extends Overshadowable<T>>
     }
   }
 
-  boolean addChunk(PartitionChunk<T> chunk)
+  @Override
+  public boolean addChunk(PartitionChunk<T> chunk)
   {
+    // Chunks with minor version zero must have partition numbers within the root generation range when managed
+    // by OvershadowableManager.
+    if (chunk.getObject().getMinorVersion() == 0 && chunk.getChunkNumber() >= PartitionIds.ROOT_GEN_END_PARTITION_ID) {
+      throw new ISE(
+          "PartitionId[%d] must be in the range [0, 32767] when using segment locking. "
+          + "Try compacting the interval to reduce the segment count, or use time chunk locking.",
+          chunk.getChunkNumber()
+      );
+    }
+
     // Sanity check. ExistingChunk should be usually null.
    final PartitionChunk<T> existingChunk = knownPartitionChunks.put(chunk.getChunkNumber(), chunk);
     if (existingChunk != null) {
       if (!existingChunk.equals(chunk)) {
-        throw new ISE(
+        throw DruidException.defensive(
             "existingChunk[%s] is different from newChunk[%s] for 
partitionId[%d]",
             existingChunk,
             chunk,
@@ -894,8 +922,9 @@ class OvershadowableManager<T extends Overshadowable<T>>
     }
   }
 
+  @Override
   @Nullable
-  PartitionChunk<T> removeChunk(PartitionChunk<T> partitionChunk)
+  public PartitionChunk<T> removeChunk(PartitionChunk<T> partitionChunk)
   {
    final PartitionChunk<T> knownChunk = knownPartitionChunks.get(partitionChunk.getChunkNumber());
     if (knownChunk == null) {
@@ -926,12 +955,14 @@ class OvershadowableManager<T extends Overshadowable<T>>
     return knownPartitionChunks.remove(partitionChunk.getChunkNumber());
   }
 
+  @Override
   public boolean isEmpty()
   {
     return visibleGroupPerRange.isEmpty();
   }
 
-  public boolean isComplete()
+  @Override
+  public boolean areVisibleChunksConsistent()
   {
     return Iterators.all(
         visibleGroupPerRange.values().iterator(),
@@ -945,7 +976,8 @@ class OvershadowableManager<T extends Overshadowable<T>>
   }
 
   @Nullable
-  PartitionChunk<T> getChunk(int partitionId)
+  @Override
+  public PartitionChunk<T> getChunk(int partitionId)
   {
     final PartitionChunk<T> chunk = knownPartitionChunks.get(partitionId);
     if (chunk == null) {
@@ -964,7 +996,8 @@ class OvershadowableManager<T extends Overshadowable<T>>
     }
   }
 
-  Iterator<PartitionChunk<T>> visibleChunksIterator()
+  @Override
+  public Iterator<PartitionChunk<T>> visibleChunksIterator()
   {
    final FluentIterable<Short2ObjectSortedMap<AtomicUpdateGroup<T>>> versionToGroupIterable = FluentIterable.from(
         visibleGroupPerRange.values()
@@ -978,7 +1011,8 @@ class OvershadowableManager<T extends Overshadowable<T>>
         }).iterator();
   }
 
-  List<PartitionChunk<T>> getOvershadowedChunks()
+  @Override
+  public List<PartitionChunk<T>> getOvershadowedChunks()
   {
     return getAllChunks(overshadowedGroups);
   }
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java b/processing/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
index d2fa88916f8..a09121edc56 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/PartitionHolder.java
@@ -32,7 +32,11 @@ import java.util.Objects;
  */
 public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<PartitionChunk<T>>
 {
-  private final OvershadowableManager<T> overshadowableManager;
+  /**
+   * Contents of this holder. Begins life as a {@link SimplePartitionHolderContents}, then changes to
+   * {@link OvershadowableManager} if any overshadowables with nonzero minor versions are encountered.
+   */
+  private PartitionHolderContents<T> contents = new SimplePartitionHolderContents<>();
 
   private short maxMinorVersion;
 
@@ -41,7 +45,7 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
   )
   {
     return new PartitionHolder<>(
-        OvershadowableManager.copyVisible(partitionHolder.overshadowableManager),
+        partitionHolder.contents.copyVisible(),
         partitionHolder.maxMinorVersion
     );
   }
@@ -49,34 +53,40 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
   public static <T extends Overshadowable<T>> PartitionHolder<T> deepCopy(PartitionHolder<T> partitionHolder)
   {
     return new PartitionHolder<>(
-        OvershadowableManager.deepCopy(partitionHolder.overshadowableManager),
+        partitionHolder.contents.deepCopy(),
         partitionHolder.maxMinorVersion
     );
   }
 
   public PartitionHolder(PartitionChunk<T> initialChunk)
   {
-    this.overshadowableManager = new OvershadowableManager<>();
     add(initialChunk);
   }
 
   public PartitionHolder(List<PartitionChunk<T>> initialChunks)
   {
-    this.overshadowableManager = new OvershadowableManager<>();
     for (PartitionChunk<T> chunk : initialChunks) {
       add(chunk);
     }
   }
 
-  protected PartitionHolder(OvershadowableManager<T> overshadowableManager, short maxMinorVersion)
+  /**
+   * Constructor for situations where the caller already has a {@link PartitionHolderContents}. Generally used for
+   * copying. Also used for tests.
+   */
+  public PartitionHolder(PartitionHolderContents<T> contents, short maxMinorVersion)
   {
-    this.overshadowableManager = overshadowableManager;
+    this.contents = contents;
     this.maxMinorVersion = maxMinorVersion;
   }
 
   public boolean add(PartitionChunk<T> chunk)
   {
-    boolean added = overshadowableManager.addChunk(chunk);
+    if (chunk.getObject().getMinorVersion() != 0 && contents instanceof SimplePartitionHolderContents) {
+      // Swap simple map for an OvershadowableManager when minor versions are encountered.
+      contents = OvershadowableManager.fromSimple((SimplePartitionHolderContents<T>) contents);
+    }
+    boolean added = contents.addChunk(chunk);
     if (added && chunk.getObject().getMinorVersion() > maxMinorVersion) {
       maxMinorVersion = chunk.getObject().getMinorVersion();
     }
@@ -95,17 +105,17 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
   @Nullable
   public PartitionChunk<T> remove(PartitionChunk<T> chunk)
   {
-    return overshadowableManager.removeChunk(chunk);
+    return contents.removeChunk(chunk);
   }
 
   public boolean isEmpty()
   {
-    return overshadowableManager.isEmpty();
+    return contents.isEmpty();
   }
 
   public boolean isComplete()
   {
-    if (overshadowableManager.isEmpty()) {
+    if (contents.isEmpty()) {
       return false;
     }
 
@@ -118,7 +128,7 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
     }
 
     if (curr.isEnd()) {
-      return overshadowableManager.isComplete();
+      return contents.areVisibleChunksConsistent();
     }
 
     while (iter.hasNext()) {
@@ -128,7 +138,7 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
       }
 
       if (next.isEnd()) {
-        return overshadowableManager.isComplete();
+        return contents.areVisibleChunksConsistent();
       }
       curr = next;
     }
@@ -138,18 +148,18 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
 
   public PartitionChunk<T> getChunk(final int partitionNum)
   {
-    return overshadowableManager.getChunk(partitionNum);
+    return contents.getChunk(partitionNum);
   }
 
   @Override
   public Iterator<PartitionChunk<T>> iterator()
   {
-    return overshadowableManager.visibleChunksIterator();
+    return contents.visibleChunksIterator();
   }
 
   public List<PartitionChunk<T>> getOvershadowed()
   {
-    return overshadowableManager.getOvershadowedChunks();
+    return contents.getOvershadowedChunks();
   }
 
   public Iterable<T> payloads()
@@ -167,20 +177,20 @@ public class PartitionHolder<T extends Overshadowable<T>> implements Iterable<Pa
       return false;
     }
     PartitionHolder<?> that = (PartitionHolder<?>) o;
-    return Objects.equals(overshadowableManager, that.overshadowableManager);
+    return Objects.equals(contents, that.contents);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(overshadowableManager);
+    return Objects.hash(contents);
   }
 
   @Override
   public String toString()
   {
     return "PartitionHolder{" +
-           "overshadowableManager=" + overshadowableManager +
+           "contents=" + contents +
            '}';
   }
 
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/PartitionHolderContents.java b/processing/src/main/java/org/apache/druid/timeline/partition/PartitionHolderContents.java
new file mode 100644
index 00000000000..67d2ee7285e
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/PartitionHolderContents.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Contents for {@link PartitionHolder}.
+ *
+ * @see SimplePartitionHolderContents implementation when segment locking is not in play
+ * @see OvershadowableManager implementation when segment locking is in play
+ */
+public interface PartitionHolderContents<T>
+{
+  /**
+   * Whether this holder is empty.
+   */
+  boolean isEmpty();
+
+  /**
+   * Whether all visible chunks are consistent, meaning they can possibly be considered for
+   * {@link PartitionHolder#isComplete()}. When segment locking is not being used, all chunks
+   * are consistent, so this always returns true.
+   */
+  boolean areVisibleChunksConsistent();
+
+  /**
+   * Adds a chunk.
+   *
+   * @return true if no chunk previously existed with these partition boundaries
+   */
+  boolean addChunk(PartitionChunk<T> chunk);
+
+  /**
+   * Removes and returns a chunk with the same partition boundaries as the provided chunk. Returns null if
+   * no such chunk exists.
+   */
+  @Nullable
+  PartitionChunk<T> removeChunk(PartitionChunk<T> partitionChunk);
+
+  /**
+   * Returns the chunk with a given partition ID, or null if none exists.
+   */
+  @Nullable
+  PartitionChunk<T> getChunk(int partitionId);
+
+  /**
+   * Iterates through all visible chunks. When segment locking is not being used, all chunks are visible,
+   * so this returns all chunks.
+   */
+  Iterator<PartitionChunk<T>> visibleChunksIterator();
+
+  /**
+   * Returns chunks that are tracked but not visible. When segment locking is not being used, all chunks are
+   * visible, so this returns nothing.
+   */
+  List<PartitionChunk<T>> getOvershadowedChunks();
+
+  /**
+   * Returns a copy of this holder with only visible chunks.
+   */
+  PartitionHolderContents<T> copyVisible();
+
+  /**
+   * Returns a copy of this holder with all chunks.
+   */
+  PartitionHolderContents<T> deepCopy();
+}
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/SimplePartitionHolderContents.java b/processing/src/main/java/org/apache/druid/timeline/partition/SimplePartitionHolderContents.java
new file mode 100644
index 00000000000..ea24cdb9743
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/SimplePartitionHolderContents.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectAVLTreeMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.timeline.Overshadowable;
+
+import javax.annotation.Nullable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Contents for {@link PartitionHolder} when segment locking is not used, and therefore no chunks have a
+ * nonzero {@link Overshadowable#getMinorVersion()}.
+ */
+public class SimplePartitionHolderContents<T extends Overshadowable<T>> implements PartitionHolderContents<T>
+{
+  /**
+   * Map of {@link PartitionChunk#getChunkNumber()} to the actual {@link PartitionChunk}.
+   */
+  private final Int2ObjectMap<PartitionChunk<T>> chunkForPartition = new Int2ObjectAVLTreeMap<>();
+
+  @Override
+  public boolean isEmpty()
+  {
+    return chunkForPartition.isEmpty();
+  }
+
+  @Override
+  public boolean areVisibleChunksConsistent()
+  {
+    return true;
+  }
+
+  @Override
+  public boolean addChunk(PartitionChunk<T> chunk)
+  {
+    if (chunk.getObject().getMinorVersion() != 0) {
+      throw DruidException.defensive("Cannot handle chunk with 
minorVersion[%d]", chunk.getObject().getMinorVersion());
+    }
+
+    final PartitionChunk<T> existingChunk = chunkForPartition.putIfAbsent(chunk.getChunkNumber(), chunk);
+    if (existingChunk != null) {
+      if (!existingChunk.equals(chunk)) {
+        throw DruidException.defensive(
+            "existingChunk[%s] is different from newChunk[%s] for 
partitionId[%d]",
+            existingChunk,
+            chunk,
+            chunk.getChunkNumber()
+        );
+      } else {
+        // A new chunk of the same major version and partitionId can be added in segment handoff
+        // from stream ingestion tasks to historicals
+        return false;
+      }
+    } else {
+      return true;
+    }
+  }
+
+  @Override
+  @Nullable
+  public PartitionChunk<T> removeChunk(PartitionChunk<T> chunk)
+  {
+    final PartitionChunk<T> knownChunk = chunkForPartition.get(chunk.getChunkNumber());
+    if (knownChunk == null) {
+      return null;
+    }
+
+    if (!knownChunk.equals(chunk)) {
+      throw DruidException.defensive(
+          "Unexpected state: Same partitionId[%d], but known partition[%s] is 
different from the input partition[%s]",
+          chunk.getChunkNumber(),
+          knownChunk,
+          chunk
+      );
+    }
+
+    return chunkForPartition.remove(chunk.getChunkNumber());
+  }
+
+  @Override
+  public PartitionChunk<T> getChunk(int partitionNum)
+  {
+    return chunkForPartition.get(partitionNum);
+  }
+
+  @Override
+  public Iterator<PartitionChunk<T>> visibleChunksIterator()
+  {
+    return chunkForPartition.values().iterator();
+  }
+
+  @Override
+  public List<PartitionChunk<T>> getOvershadowedChunks()
+  {
+    return List.of();
+  }
+
+  @Override
+  public PartitionHolderContents<T> copyVisible()
+  {
+    final SimplePartitionHolderContents<T> retVal = new SimplePartitionHolderContents<>();
+    for (PartitionChunk<T> chunk : chunkForPartition.values()) {
+      retVal.addChunk(chunk);
+    }
+    return retVal;
+  }
+
+  @Override
+  public PartitionHolderContents<T> deepCopy()
+  {
+    // All chunks are always visible, so copyVisible() and deepCopy() are the same thing.
+    return copyVisible();
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SimplePartitionHolderContents<?> that = (SimplePartitionHolderContents<?>) o;
+    return Objects.equals(chunkForPartition, that.chunkForPartition);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hashCode(chunkForPartition);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "SimplePartitionHolderContents{" +
+           "chunkForPartition=" + chunkForPartition +
+           '}';
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java b/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java
index 23f9fa3fd5f..bd2e61e20cb 100644
--- a/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/VersionedIntervalTimelineTest.java
@@ -27,7 +27,9 @@ import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.timeline.partition.IntegerPartitionChunk;
 import org.apache.druid.timeline.partition.OvershadowableInteger;
+import org.apache.druid.timeline.partition.OvershadowableManager;
 import org.apache.druid.timeline.partition.PartitionHolder;
+import org.apache.druid.timeline.partition.PartitionHolderContents;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
@@ -1474,17 +1476,16 @@ public class VersionedIntervalTimelineTest extends VersionedIntervalTimelineTest
 
     final List<TimelineObjectHolder<String, OvershadowableInteger>> holders = timeline.lookup(interval);
 
+    final PartitionHolderContents<OvershadowableInteger> expectedContents = new OvershadowableManager<>();
+    expectedContents.addChunk(makeNumbered("1", 0, 0));
+    expectedContents.addChunk(makeNumbered("1", 1, 0));
+
     Assert.assertEquals(
         ImmutableList.of(
             new TimelineObjectHolder<>(
                 interval,
                 "1",
-                new PartitionHolder<>(
-                    ImmutableList.of(
-                        makeNumbered("1", 0, 0),
-                        makeNumbered("1", 1, 0)
-                    )
-                )
+                new PartitionHolder<>(expectedContents, (short) 0)
             )
         ),
         holders
@@ -1648,4 +1649,25 @@ public class VersionedIntervalTimelineTest extends VersionedIntervalTimelineTest
         timeline.findNonOvershadowedObjectsInInterval(Intervals.of("2019-01-01/2019-01-04"), Partitions.INCOMPLETE_OK)
     );
   }
+
+  @Test
+  public void testLargePartitionNumbers()
+  {
+    add("2011-01-01/2011-01-10", "1", makeNumbered("1", 1, 1));
+    add("2011-01-01/2011-01-10", "1", makeNumbered("1", 100000, 2));
+    add("2011-01-01/2011-01-10", "1", makeNumbered("1", Integer.MAX_VALUE, 3));
+
+    final Iterable<OvershadowableInteger> allObjects = ImmutableList.copyOf(
+        VersionedIntervalTimeline.getAllObjects(timeline.lookup(Intervals.of("2011-01-02T02/2011-01-04")))
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            new OvershadowableInteger("1", 1, 1),
+            new OvershadowableInteger("1", 100000, 2),
+            new OvershadowableInteger("1", Integer.MAX_VALUE, 3)
+        ),
+        allObjects
+    );
+  }
 }
diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/OvershadowableManagerTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/OvershadowableManagerTest.java
index 92f0fed47b4..40c2b70e792 100644
--- a/processing/src/test/java/org/apache/druid/timeline/partition/OvershadowableManagerTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/partition/OvershadowableManagerTest.java
@@ -84,7 +84,7 @@ public class OvershadowableManagerTest
     // standby chunk
     manager.addChunk(newNonRootChunk(2, 4, 1, 3));
 
-    OvershadowableManager<OvershadowableInteger> copy = OvershadowableManager.copyVisible(manager);
+    OvershadowableManager<OvershadowableInteger> copy = manager.copyVisible();
     Assert.assertTrue(copy.getOvershadowedChunks().isEmpty());
     Assert.assertTrue(copy.getStandbyChunks().isEmpty());
     Assert.assertEquals(
@@ -112,7 +112,7 @@ public class OvershadowableManagerTest
     // standby chunk
     manager.addChunk(newNonRootChunk(2, 4, 1, 3));
 
-    OvershadowableManager<OvershadowableInteger> copy = OvershadowableManager.deepCopy(manager);
+    OvershadowableManager<OvershadowableInteger> copy = manager.deepCopy();
     Assert.assertEquals(manager, copy);
   }
 
@@ -223,14 +223,14 @@ public class OvershadowableManagerTest
     PartitionChunk<OvershadowableInteger> chunk = newRootChunk();
     Assert.assertTrue(addVisibleToManager(chunk));
     assertManagerState();
-    Assert.assertTrue(manager.isComplete());
+    Assert.assertTrue(manager.areVisibleChunksConsistent());
     // Add a duplicate
     Assert.assertFalse(manager.addChunk(chunk));
     // Add a new one
     chunk = newRootChunk();
     Assert.assertTrue(addVisibleToManager(chunk));
     assertManagerState();
-    Assert.assertTrue(manager.isComplete());
+    Assert.assertTrue(manager.areVisibleChunksConsistent());
   }
 
   @Test
@@ -241,17 +241,17 @@ public class OvershadowableManagerTest
     PartitionChunk<OvershadowableInteger> chunk = newNonRootChunk(10, 12, 1, 3);
     Assert.assertTrue(addVisibleToManager(chunk));
     assertManagerState();
-    Assert.assertFalse(manager.isComplete());
+    Assert.assertFalse(manager.areVisibleChunksConsistent());
     // Add a new one, atomicUpdateGroup is still not full
     chunk = newNonRootChunk(10, 12, 1, 3);
     Assert.assertTrue(addVisibleToManager(chunk));
     assertManagerState();
-    Assert.assertFalse(manager.isComplete());
+    Assert.assertFalse(manager.areVisibleChunksConsistent());
     // Add a new one, now atomicUpdateGroup is full
     chunk = newNonRootChunk(10, 12, 1, 3);
     Assert.assertTrue(addVisibleToManager(chunk));
     assertManagerState();
-    Assert.assertTrue(manager.isComplete());
+    Assert.assertTrue(manager.areVisibleChunksConsistent());
 
     // Add a new one to the full group
     expectedException.expect(IllegalStateException.class);

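For readers skimming the patch, a rough usage sketch of what the change enables. This is
illustrative rather than code from the patch: the class name is made up, OvershadowableInteger
is a test-only helper (so this would live in test sources), and the NumberedPartitionChunk.make
and VersionedIntervalTimeline constructor signatures are assumed from the existing tests touched
above.

    import org.apache.druid.java.util.common.Intervals;
    import org.apache.druid.timeline.VersionedIntervalTimeline;
    import org.apache.druid.timeline.partition.NumberedPartitionChunk;
    import org.apache.druid.timeline.partition.OvershadowableInteger;

    import java.util.Comparator;

    public class LargePartitionNumberSketch
    {
      public static void main(String[] args)
      {
        // With time chunk locking (the default), no chunk ever has a nonzero minor version, so the
        // timeline keeps a SimplePartitionHolderContents for the time chunk and accepts partition
        // numbers beyond the old 32768 root-generation limit.
        final VersionedIntervalTimeline<String, OvershadowableInteger> timeline =
            new VersionedIntervalTimeline<>(Comparator.naturalOrder());
        timeline.add(
            Intervals.of("2011-01-01/2011-01-10"),
            "1",
            NumberedPartitionChunk.make(100_000, 0, new OvershadowableInteger("1", 100_000, 2))
        );
        // Prints the holder containing the chunk with partition number 100000; previously this add
        // was subject to the 32768 limit even though segment locking was not in use.
        System.out.println(timeline.lookup(Intervals.of("2011-01-01/2011-01-10")));
      }
    }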

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

