This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 470c8ed7b01 Make `numCorePartitions` as 0 for tombstones (#15379)
470c8ed7b01 is described below

commit 470c8ed7b01ebea19278c83c390d51bc5c03111c
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Mon Nov 20 09:42:51 2023 -0800

    Make `numCorePartitions` as 0 for tombstones (#15379)
    
    * Make numCorePartitions as 0 in the TombstoneShardSpec.
    
    * fix up test
    
    * Add tombstone core partition tests
    
    * review comment
    
    * Need to register the test shard type to make jackson happy
---
 .../timeline/partition/TombstoneShardSpec.java     |  10 +-
 .../timeline/partition/TombstoneShardSpecTest.java |   2 +-
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 124 ++++++++++++++++++++-
 3 files changed, 129 insertions(+), 7 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/TombstoneShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/TombstoneShardSpec.java
index 62dea7b0fa9..a4b15458997 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/TombstoneShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/TombstoneShardSpec.java
@@ -29,7 +29,9 @@ import java.util.Map;
 import java.util.Objects;
 
 /**
- * A shard spec to represent tombstones. Its partition number is always zero and contains 1 core partitions.
+ * A shard spec to represent tombstones. Its partition number is always zero and contains zero core partitions as it
+ * contains no data. This allows other shard types appending to an existing {@link TombstoneShardSpec} to exist independently
+ * in the timeline even if the {@link TombstoneShardSpec} is dropped.
  */
 public class TombstoneShardSpec implements ShardSpec
 {
@@ -69,7 +71,7 @@ public class TombstoneShardSpec implements ShardSpec
   @JsonProperty("partitions")
   public int getNumCorePartitions()
   {
-    return 1;
+    return 0;
   }
 
   @Override
@@ -88,8 +90,8 @@ public class TombstoneShardSpec implements ShardSpec
   public String toString()
   {
     return "TombstoneShardSpec{" +
-           "partitionNum=" + 0 +
-           ", partitions=" + 1 +
+           "partitionNum=" + getPartitionNum() +
+           ", partitions=" + getNumCorePartitions() +
            '}';
   }
 
diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/TombstoneShardSpecTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/TombstoneShardSpecTest.java
index 2cdfcdf937a..0a08869dbfc 100644
--- a/processing/src/test/java/org/apache/druid/timeline/partition/TombstoneShardSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/partition/TombstoneShardSpecTest.java
@@ -60,7 +60,7 @@ public class TombstoneShardSpecTest
   @Test
   public void getNumCorePartitions()
   {
-    assertEquals(1, tombstoneShardSpec.getNumCorePartitions());
+    assertEquals(0, tombstoneShardSpec.getNumCorePartitions());
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 888c1006717..5d76296d67b 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.metadata;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -42,6 +43,7 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
 import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -55,6 +57,7 @@ import org.apache.druid.timeline.partition.PartialShardSpec;
 import org.apache.druid.timeline.partition.PartitionIds;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
+import org.apache.druid.timeline.partition.TombstoneShardSpec;
 import org.assertj.core.api.Assertions;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -1965,7 +1968,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
    * - Later, after the above was dropped, another segment on same interval was created by the stream but this
    * time there was an integrity violation in the pending segments table because the
    * {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, String, Interval, PartialShardSpec, String)}
-   * method returned an segment id that already existed in the pending segments table
+   * method returned a segment id that already existed in the pending segments table
    */
   @Test
   public void testAllocatePendingSegmentAfterDroppingExistingSegment()
@@ -2907,6 +2910,110 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     Assert.assertEquals(3, resultForEternity.size());
   }
 
+  @Test
+  public void testTimelineVisibilityWith0CorePartitionTombstone() throws IOException
+  {
+    final Interval interval = Intervals.of("2020/2021");
+    // Create and commit a tombstone segment
+    final DataSegment tombstoneSegment = createSegment(
+        interval,
+        "version",
+        new TombstoneShardSpec()
+    );
+
+    final Set<DataSegment> tombstones = new HashSet<>(Collections.singleton(tombstoneSegment));
+    Assert.assertTrue(coordinator.commitSegments(tombstones).containsAll(tombstones));
+
+    // Allocate and commit a data segment by appending to the same interval
+    final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment(
+        DS.WIKI,
+        "seq",
+        tombstoneSegment.getVersion(),
+        interval,
+        NumberedPartialShardSpec.instance(),
+        "version",
+        false
+    );
+
+    Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1", identifier.toString());
+    Assert.assertEquals(0, identifier.getShardSpec().getNumCorePartitions());
+
+    final DataSegment dataSegment = createSegment(
+        interval,
+        "version",
+        identifier.getShardSpec()
+    );
+    final Set<DataSegment> dataSegments = new HashSet<>(Collections.singleton(dataSegment));
+    Assert.assertTrue(coordinator.commitSegments(dataSegments).containsAll(dataSegments));
+
+    // Mark the tombstone as unused
+    markAllSegmentsUnused(tombstones);
+
+    final Collection<DataSegment> allUsedSegments = coordinator.retrieveAllUsedSegments(
+        DS.WIKI,
+        Segments.ONLY_VISIBLE
+    );
+
+    // The appended data segment will still be visible in the timeline since the
+    // tombstone contains 0 core partitions
+    SegmentTimeline segmentTimeline = SegmentTimeline.forSegments(allUsedSegments);
+    Assert.assertEquals(1, segmentTimeline.lookup(interval).size());
+    Assert.assertEquals(dataSegment, segmentTimeline.lookup(interval).get(0).getObject().getChunk(1).getObject());
+  }
+
+  @Test
+  public void testTimelineWith1CorePartitionTombstone() throws IOException
+  {
+    // Register the old generation tombstone spec for this test.
+    mapper.registerSubtypes(TombstoneShardSpecWith1CorePartition.class);
+
+    final Interval interval = Intervals.of("2020/2021");
+    // Create and commit an old generation tombstone with 1 core partition
+    final DataSegment tombstoneSegment = createSegment(
+        interval,
+        "version",
+        new TombstoneShardSpecWith1CorePartition()
+    );
+
+    final Set<DataSegment> tombstones = new HashSet<>(Collections.singleton(tombstoneSegment));
+    Assert.assertTrue(coordinator.commitSegments(tombstones).containsAll(tombstones));
+
+    // Allocate and commit a data segment by appending to the same interval
+    final SegmentIdWithShardSpec identifier = coordinator.allocatePendingSegment(
+        DS.WIKI,
+        "seq",
+        tombstoneSegment.getVersion(),
+        interval,
+        NumberedPartialShardSpec.instance(),
+        "version",
+        false
+    );
+
+    Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1", identifier.toString());
+    Assert.assertEquals(1, identifier.getShardSpec().getNumCorePartitions());
+
+    final DataSegment dataSegment = createSegment(
+        interval,
+        "version",
+        identifier.getShardSpec()
+    );
+    final Set<DataSegment> dataSegments = new HashSet<>(Collections.singleton(dataSegment));
+    Assert.assertTrue(coordinator.commitSegments(dataSegments).containsAll(dataSegments));
+
+    // Mark the tombstone as unused
+    markAllSegmentsUnused(tombstones);
+
+    final Collection<DataSegment> allUsedSegments = coordinator.retrieveAllUsedSegments(
+        DS.WIKI,
+        Segments.ONLY_VISIBLE
+    );
+
+    // The appended data segment will not be visible in the timeline since the old generation
+    // tombstone contains 1 core partition
+    SegmentTimeline segmentTimeline = SegmentTimeline.forSegments(allUsedSegments);
+    Assert.assertEquals(0, segmentTimeline.lookup(interval).size());
+  }
+
   private static class DS
   {
     static final String WIKI = "wiki";
@@ -2936,7 +3043,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     }
     final Set<DataSegment> segmentsSet = new HashSet<>(segments);
     final Set<DataSegment> committedSegments = coordinator.commitSegments(segmentsSet);
-    Assert.assertTrue(committedSegments.containsAll(new HashSet<>(segments)));
+    Assert.assertTrue(committedSegments.containsAll(segmentsSet));
 
     return segments;
   }
@@ -2961,4 +3068,17 @@ public class IndexerSQLMetadataStorageCoordinatorTest
         }
     );
   }
+
+  /**
+   * This test-only shard type is to test the behavior of "old generation" tombstones with 1 core partition.
+   */
+  private static class TombstoneShardSpecWith1CorePartition extends TombstoneShardSpec
+  {
+    @Override
+    @JsonProperty("partitions")
+    public int getNumCorePartitions()
+    {
+      return 1;
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to