This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f42ecc9f256 Fail concurrent replace tasks with finer segment 
granularity than append  (#17265)
f42ecc9f256 is described below

commit f42ecc9f25675d7ac1c5c759749ba9e074f4f40e
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Oct 8 07:35:13 2024 +0530

    Fail concurrent replace tasks with finer segment granularity than append  
(#17265)
---
 .../concurrent/ConcurrentReplaceAndAppendTest.java | 182 +++++++++++++++++++++
 .../IndexerSQLMetadataStorageCoordinator.java      |  25 ++-
 .../IndexerSQLMetadataStorageCoordinatorTest.java  |  67 ++++++++
 3 files changed, 269 insertions(+), 5 deletions(-)

diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 96c72e7130d..c74177e2c38 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -103,6 +103,9 @@ public class ConcurrentReplaceAndAppendTest extends 
IngestionTestBase
   private static final Interval YEAR_23 = Intervals.of("2023/2024");
   private static final Interval JAN_23 = Intervals.of("2023-01/2023-02");
   private static final Interval DEC_23 = Intervals.of("2023-12/2024-01");
+  private static final Interval JAN_FEB_MAR_23 = 
Intervals.of("2023-01-01/2023-04-01");
+  private static final Interval APR_MAY_JUN_23 = 
Intervals.of("2023-04-01/2023-07-01");
+  private static final Interval JUL_AUG_SEP_23 = 
Intervals.of("2023-07-01/2023-10-01");
   private static final Interval OCT_NOV_DEC_23 = 
Intervals.of("2023-10-01/2024-01-01");
   private static final Interval FIRST_OF_JAN_23 = 
Intervals.of("2023-01-01/2023-01-02");
 
@@ -599,6 +602,185 @@ public class ConcurrentReplaceAndAppendTest extends 
IngestionTestBase
     verifyIntervalHasVisibleSegments(JAN_23, segmentV01);
   }
 
+  @Test
+  public void testLockReplaceQuarterAllocateAppendYear()
+  {
+    final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
+    Assert.assertNotNull(replaceLock);
+
+    final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, 
replaceLock.getVersion());
+
+    Assert.assertTrue(
+        replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, 
segmentV1Q3, segmentV1Q4)
+                   .isSuccess()
+    );
+    verifyIntervalHasUsedSegments(YEAR_23, segmentV1Q1, segmentV1Q2, 
segmentV1Q3, segmentV1Q4);
+    verifyIntervalHasVisibleSegments(YEAR_23, segmentV1Q1, segmentV1Q2, 
segmentV1Q3, segmentV1Q4);
+
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), 
Granularities.YEAR);
+    Assert.assertEquals(JAN_FEB_MAR_23, pendingSegment.getInterval());
+    Assert.assertEquals(replaceLock.getVersion(), pendingSegment.getVersion());
+
+    final DataSegment appendedSegment = asSegment(pendingSegment);
+    appendTask.commitAppendSegments(appendedSegment);
+
+    verifyIntervalHasUsedSegments(YEAR_23, appendedSegment, segmentV1Q1, 
segmentV1Q2, segmentV1Q3, segmentV1Q4);
+    verifyIntervalHasVisibleSegments(YEAR_23, appendedSegment, segmentV1Q1, 
segmentV1Q2, segmentV1Q3, segmentV1Q4);
+  }
+
+  @Test
+  public void testLockAllocateAppendYearReplaceQuarter()
+  {
+    final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
+    Assert.assertNotNull(replaceLock);
+
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), 
Granularities.YEAR);
+    Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    appendTask.commitAppendSegments(segmentV01);
+
+    verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
+    verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
+
+    final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, 
replaceLock.getVersion());
+
+    Assert.assertFalse(
+        replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, 
segmentV1Q3, segmentV1Q4)
+                   .isSuccess()
+    );
+
+    verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
+    verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
+  }
+
+  @Test
+  public void testLockAllocateReplaceQuarterAppendYear()
+  {
+    final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
+    Assert.assertNotNull(replaceLock);
+
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), 
Granularities.YEAR);
+    Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, 
replaceLock.getVersion());
+
+    Assert.assertFalse(
+        replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, 
segmentV1Q3, segmentV1Q4)
+                   .isSuccess()
+    );
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    appendTask.commitAppendSegments(segmentV01);
+
+    verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
+    verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
+  }
+
+  @Test
+  public void testAllocateLockReplaceQuarterAppendYear()
+  {
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), 
Granularities.YEAR);
+    Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
+    Assert.assertNotNull(replaceLock);
+
+    final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, 
replaceLock.getVersion());
+
+    Assert.assertFalse(
+        replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, 
segmentV1Q3, segmentV1Q4)
+                   .isSuccess()
+    );
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    appendTask.commitAppendSegments(segmentV01);
+
+    verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
+    verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
+  }
+
+  @Test
+  public void testAllocateLockAppendYearReplaceQuarter()
+  {
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), 
Granularities.YEAR);
+    Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
+    Assert.assertNotNull(replaceLock);
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    appendTask.commitAppendSegments(segmentV01);
+
+    verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
+    verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
+
+    final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, 
replaceLock.getVersion());
+
+    Assert.assertFalse(
+        replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, 
segmentV1Q3, segmentV1Q4)
+                   .isSuccess()
+    );
+
+    verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
+    verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
+  }
+
+  @Test
+  public void testAllocateAppendLockYearReplaceQuarter()
+  {
+    final SegmentIdWithShardSpec pendingSegment
+        = appendTask.allocateSegmentForTimestamp(YEAR_23.getStart(), 
Granularities.YEAR);
+    Assert.assertEquals(YEAR_23, pendingSegment.getInterval());
+    Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion());
+
+    final DataSegment segmentV01 = asSegment(pendingSegment);
+    appendTask.commitAppendSegments(segmentV01);
+
+    verifyIntervalHasUsedSegments(YEAR_23, segmentV01);
+    verifyIntervalHasVisibleSegments(YEAR_23, segmentV01);
+
+    final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(YEAR_23);
+    Assert.assertNotNull(replaceLock);
+
+    final DataSegment segmentV1Q1 = createSegment(JAN_FEB_MAR_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q2 = createSegment(APR_MAY_JUN_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q3 = createSegment(JUL_AUG_SEP_23, 
replaceLock.getVersion());
+    final DataSegment segmentV1Q4 = createSegment(OCT_NOV_DEC_23, 
replaceLock.getVersion());
+
+    Assert.assertTrue(
+        replaceTask.commitReplaceSegments(segmentV1Q1, segmentV1Q2, 
segmentV1Q3, segmentV1Q4)
+                   .isSuccess()
+    );
+
+    verifyIntervalHasUsedSegments(YEAR_23, segmentV01, segmentV1Q1, 
segmentV1Q2, segmentV1Q3, segmentV1Q4);
+    verifyIntervalHasVisibleSegments(YEAR_23, segmentV1Q1, segmentV1Q2, 
segmentV1Q3, segmentV1Q4);
+  }
+
   @Test
   public void testAllocateAppendMonthLockReplaceDay()
   {
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 463232012ed..a512f793574 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -31,6 +31,7 @@ import com.google.common.collect.Lists;
 import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
 import com.google.inject.Inject;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.error.InvalidInput;
 import org.apache.druid.indexing.overlord.DataSourceMetadata;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
@@ -900,7 +901,15 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     } else if (pendingSegment.getId().getVersion().compareTo(replaceVersion) 
>= 0) {
       return false;
     } else if 
(!replaceInterval.contains(pendingSegment.getId().getInterval())) {
-      return false;
+      final SegmentId pendingSegmentId = pendingSegment.getId().asSegmentId();
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.UNSUPPORTED)
+                          .build(
+                              "Replacing with a finer segment granularity than 
a concurrent append is unsupported."
+                              + " Cannot upgrade pendingSegment[%s] to 
version[%s] as the replace interval[%s]"
+                              + " does not fully contain the pendingSegment 
interval[%s].",
+                              pendingSegmentId, replaceVersion, 
replaceInterval, pendingSegmentId.getInterval()
+                          );
     } else {
       // Do not upgrade already upgraded pending segment
       return pendingSegment.getSequenceName() == null
@@ -2200,10 +2209,16 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
           newInterval = replaceInterval;
           break;
         } else if (replaceInterval.overlaps(oldInterval)) {
-          throw new ISE(
-              "Incompatible segment intervals for commit: [%s] and [%s].",
-              oldInterval, replaceInterval
-          );
+          final String conflictingSegmentId = oldSegment.getId().toString();
+          final String upgradeVersion = 
upgradeSegmentToLockVersion.get(conflictingSegmentId);
+          throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                              .ofCategory(DruidException.Category.UNSUPPORTED)
+                              .build(
+                                  "Replacing with a finer segment granularity 
than a concurrent append is unsupported."
+                                  + " Cannot upgrade segment[%s] to 
version[%s] as the replace interval[%s]"
+                                  + " does not fully contain the pending 
segment interval[%s].",
+                                  conflictingSegmentId, upgradeVersion, 
replaceInterval, oldInterval
+                              );
         }
       }
 
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index f82cfbf2a04..4b592e5f40d 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -319,6 +319,73 @@ public class IndexerSQLMetadataStorageCoordinatorTest 
extends IndexerSqlMetadata
     Assert.assertEquals(replaceLock.getVersion(), 
Iterables.getOnlyElement(observedLockVersions));
   }
 
+  @Test
+  public void 
testCommitReplaceSegments_partiallyOverlappingPendingSegmentUnsupported()
+  {
+    final ReplaceTaskLock replaceLock = new ReplaceTaskLock("g1", 
Intervals.of("2023-01-01/2023-02-01"), "2023-02-01");
+    final Set<DataSegment> segmentsAppendedWithReplaceLock = new HashSet<>();
+    final Map<DataSegment, ReplaceTaskLock> appendedSegmentToReplaceLockMap = 
new HashMap<>();
+    final PendingSegmentRecord pendingSegmentForInterval = new 
PendingSegmentRecord(
+        new SegmentIdWithShardSpec(
+            "foo",
+            Intervals.of("2023-01-01/2024-01-01"),
+            "2023-01-02",
+            new NumberedShardSpec(100, 0)
+        ),
+        "",
+        "",
+        null,
+        "append"
+    );
+    for (int i = 1; i < 9; i++) {
+      final DataSegment segment = new DataSegment(
+          "foo",
+          Intervals.of("2023-01-0" + i + "/2023-01-0" + (i + 1)),
+          "2023-01-0" + i,
+          ImmutableMap.of("path", "a-" + i),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new LinearShardSpec(0),
+          9,
+          100
+      );
+      segmentsAppendedWithReplaceLock.add(segment);
+      appendedSegmentToReplaceLockMap.put(segment, replaceLock);
+    }
+
+    segmentSchemaTestUtils.insertUsedSegments(segmentsAppendedWithReplaceLock, 
Collections.emptyMap());
+    derbyConnector.retryWithHandle(
+        handle -> coordinator.insertPendingSegmentsIntoMetastore(
+            handle,
+            ImmutableList.of(pendingSegmentForInterval),
+            "foo",
+            true
+        )
+    );
+    insertIntoUpgradeSegmentsTable(appendedSegmentToReplaceLockMap, 
derbyConnectorRule.metadataTablesConfigSupplier().get());
+
+    final Set<DataSegment> replacingSegments = new HashSet<>();
+    for (int i = 1; i < 9; i++) {
+      final DataSegment segment = new DataSegment(
+          "foo",
+          Intervals.of("2023-01-01/2023-02-01"),
+          "2023-02-01",
+          ImmutableMap.of("path", "b-" + i),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new NumberedShardSpec(i, 9),
+          9,
+          100
+      );
+      replacingSegments.add(segment);
+    }
+
+    Assert.assertFalse(
+        coordinator.commitReplaceSegments(replacingSegments, 
ImmutableSet.of(replaceLock), null)
+                   .isSuccess()
+    );
+  }
+
   @Test
   public void testCommitReplaceSegments()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to