This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 470c8ed7b01 Make `numCorePartitions` as 0 for tombstones (#15379)
470c8ed7b01 is described below
commit 470c8ed7b01ebea19278c83c390d51bc5c03111c
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Mon Nov 20 09:42:51 2023 -0800
Make `numCorePartitions` as 0 for tombstones (#15379)
* Make numCorePartitions as 0 in the TombstoneShardSpec.
* fix up test
* Add tombstone core partition tests
* review comment
* Need to register the test shard type to make jackson happy
---
.../timeline/partition/TombstoneShardSpec.java | 10 +-
.../timeline/partition/TombstoneShardSpecTest.java | 2 +-
.../IndexerSQLMetadataStorageCoordinatorTest.java | 124 ++++++++++++++++++++-
3 files changed, 129 insertions(+), 7 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/TombstoneShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/TombstoneShardSpec.java
index 62dea7b0fa9..a4b15458997 100644
--- a/processing/src/main/java/org/apache/druid/timeline/partition/TombstoneShardSpec.java
+++ b/processing/src/main/java/org/apache/druid/timeline/partition/TombstoneShardSpec.java
@@ -29,7 +29,9 @@ import java.util.Map;
import java.util.Objects;
/**
- * A shard spec to represent tombstones. Its partition number is always zero and contains 1 core partitions.
+ * A shard spec to represent tombstones. Its partition number is always zero and contains zero core partitions as it
+ * contains no data. This allows other shard types appending to an existing {@link TombstoneShardSpec} to exist independently
+ * in the timeline even if the {@link TombstoneShardSpec} is dropped.
*/
public class TombstoneShardSpec implements ShardSpec
{
@@ -69,7 +71,7 @@ public class TombstoneShardSpec implements ShardSpec
@JsonProperty("partitions")
public int getNumCorePartitions()
{
- return 1;
+ return 0;
}
@Override
@@ -88,8 +90,8 @@ public class TombstoneShardSpec implements ShardSpec
public String toString()
{
return "TombstoneShardSpec{" +
- "partitionNum=" + 0 +
- ", partitions=" + 1 +
+ "partitionNum=" + getPartitionNum() +
+ ", partitions=" + getNumCorePartitions() +
'}';
}
diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/TombstoneShardSpecTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/TombstoneShardSpecTest.java
index 2cdfcdf937a..0a08869dbfc 100644
--- a/processing/src/test/java/org/apache/druid/timeline/partition/TombstoneShardSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/partition/TombstoneShardSpecTest.java
@@ -60,7 +60,7 @@ public class TombstoneShardSpecTest
@Test
public void getNumCorePartitions()
{
- assertEquals(1, tombstoneShardSpec.getNumCorePartitions());
+ assertEquals(0, tombstoneShardSpec.getNumCorePartitions());
}
@Test
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 888c1006717..5d76296d67b 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.metadata;
+import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -42,6 +43,7 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -55,6 +57,7 @@ import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.PartitionIds;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
+import org.apache.druid.timeline.partition.TombstoneShardSpec;
import org.assertj.core.api.Assertions;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -1965,7 +1968,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
* - Later, after the above was dropped, another segment on same interval was created by the stream but this
* time there was an integrity violation in the pending segments table because the
* {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, String, Interval, PartialShardSpec, String)}
- * method returned an segment id that already existed in the pending segments table
+ * method returned a segment id that already existed in the pending segments table
*/
@Test
public void testAllocatePendingSegmentAfterDroppingExistingSegment()
@@ -2907,6 +2910,110 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals(3, resultForEternity.size());
}
+ @Test
+ public void testTimelineVisibilityWith0CorePartitionTombstone() throws
IOException
+ {
+ final Interval interval = Intervals.of("2020/2021");
+ // Create and commit a tombstone segment
+ final DataSegment tombstoneSegment = createSegment(
+ interval,
+ "version",
+ new TombstoneShardSpec()
+ );
+
+ final Set<DataSegment> tombstones = new
HashSet<>(Collections.singleton(tombstoneSegment));
+
Assert.assertTrue(coordinator.commitSegments(tombstones).containsAll(tombstones));
+
+ // Allocate and commit a data segment by appending to the same interval
+ final SegmentIdWithShardSpec identifier =
coordinator.allocatePendingSegment(
+ DS.WIKI,
+ "seq",
+ tombstoneSegment.getVersion(),
+ interval,
+ NumberedPartialShardSpec.instance(),
+ "version",
+ false
+ );
+
+
Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1",
identifier.toString());
+ Assert.assertEquals(0, identifier.getShardSpec().getNumCorePartitions());
+
+ final DataSegment dataSegment = createSegment(
+ interval,
+ "version",
+ identifier.getShardSpec()
+ );
+ final Set<DataSegment> dataSegments = new
HashSet<>(Collections.singleton(dataSegment));
+
Assert.assertTrue(coordinator.commitSegments(dataSegments).containsAll(dataSegments));
+
+ // Mark the tombstone as unused
+ markAllSegmentsUnused(tombstones);
+
+ final Collection<DataSegment> allUsedSegments =
coordinator.retrieveAllUsedSegments(
+ DS.WIKI,
+ Segments.ONLY_VISIBLE
+ );
+
+ // The appended data segment will still be visible in the timeline since the
+ // tombstone contains 0 core partitions
+ SegmentTimeline segmentTimeline =
SegmentTimeline.forSegments(allUsedSegments);
+ Assert.assertEquals(1, segmentTimeline.lookup(interval).size());
+ Assert.assertEquals(dataSegment,
segmentTimeline.lookup(interval).get(0).getObject().getChunk(1).getObject());
+ }
+
+ @Test
+ public void testTimelineWith1CorePartitionTombstone() throws IOException
+ {
+ // Register the old generation tombstone spec for this test.
+ mapper.registerSubtypes(TombstoneShardSpecWith1CorePartition.class);
+
+ final Interval interval = Intervals.of("2020/2021");
+ // Create and commit an old generation tombstone with 1 core partition
+ final DataSegment tombstoneSegment = createSegment(
+ interval,
+ "version",
+ new TombstoneShardSpecWith1CorePartition()
+ );
+
+ final Set<DataSegment> tombstones = new
HashSet<>(Collections.singleton(tombstoneSegment));
+
Assert.assertTrue(coordinator.commitSegments(tombstones).containsAll(tombstones));
+
+ // Allocate and commit a data segment by appending to the same interval
+ final SegmentIdWithShardSpec identifier =
coordinator.allocatePendingSegment(
+ DS.WIKI,
+ "seq",
+ tombstoneSegment.getVersion(),
+ interval,
+ NumberedPartialShardSpec.instance(),
+ "version",
+ false
+ );
+
+
Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1",
identifier.toString());
+ Assert.assertEquals(1, identifier.getShardSpec().getNumCorePartitions());
+
+ final DataSegment dataSegment = createSegment(
+ interval,
+ "version",
+ identifier.getShardSpec()
+ );
+ final Set<DataSegment> dataSegments = new
HashSet<>(Collections.singleton(dataSegment));
+
Assert.assertTrue(coordinator.commitSegments(dataSegments).containsAll(dataSegments));
+
+ // Mark the tombstone as unused
+ markAllSegmentsUnused(tombstones);
+
+ final Collection<DataSegment> allUsedSegments =
coordinator.retrieveAllUsedSegments(
+ DS.WIKI,
+ Segments.ONLY_VISIBLE
+ );
+
+ // The appended data segment will not be visible in the timeline since the old generation
+ // tombstone contains 1 core partition
+ SegmentTimeline segmentTimeline =
SegmentTimeline.forSegments(allUsedSegments);
+ Assert.assertEquals(0, segmentTimeline.lookup(interval).size());
+ }
+
private static class DS
{
static final String WIKI = "wiki";
@@ -2936,7 +3043,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
}
final Set<DataSegment> segmentsSet = new HashSet<>(segments);
final Set<DataSegment> committedSegments =
coordinator.commitSegments(segmentsSet);
- Assert.assertTrue(committedSegments.containsAll(new HashSet<>(segments)));
+ Assert.assertTrue(committedSegments.containsAll(segmentsSet));
return segments;
}
@@ -2961,4 +3068,17 @@ public class IndexerSQLMetadataStorageCoordinatorTest
}
);
}
+
+ /**
+ * This test-only shard type is to test the behavior of "old generation" tombstones with 1 core partition.
+ */
+ private static class TombstoneShardSpecWith1CorePartition extends
TombstoneShardSpec
+ {
+ @Override
+ @JsonProperty("partitions")
+ public int getNumCorePartitions()
+ {
+ return 1;
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]