This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new feeb4f0fb03 Allocate pending segments at latest committed version (#15459)
feeb4f0fb03 is described below

commit feeb4f0fb03fce90e523c7e1c10e71a19478400c
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Dec 14 16:18:39 2023 +0530

    Allocate pending segments at latest committed version (#15459)
    
    The segment allocation algorithm reuses an already allocated pending segment if the new allocation request is made with the same parameters:
    
    - datasource
    - sequence name
    - interval
    - value of skipSegmentLineageCheck (false for batch append, true for streaming append)
    - previous segment id (used only when skipSegmentLineageCheck = false)

    The above parameters thus uniquely identify a pending segment. This is enforced by the UNIQUE constraint on the sequence_name_prev_id_sha1 column in the druid_pendingSegments metadata table.
    
    This reuse is done in order to:

    - allow replica tasks (in the case of streaming ingestion) to use the same set of segment IDs;
    - allow a re-run of a failed batch task to use the same segment ID, preventing unnecessary allocations.
---
 .../common/actions/SegmentAllocateActionTest.java  |  71 ++-
 .../IndexerSQLMetadataStorageCoordinator.java      | 522 ++++++++++++---------
 .../IndexerSQLMetadataStorageCoordinatorTest.java  |   2 -
 3 files changed, 365 insertions(+), 230 deletions(-)

diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
index 13c499e47e2..4ccb8707750 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -55,11 +55,11 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -70,9 +70,6 @@ import java.util.stream.Collectors;
 @RunWith(Parameterized.class)
 public class SegmentAllocateActionTest
 {
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
   @Rule
   public TaskActionTestKit taskActionTestKit = new TaskActionTestKit();
 
@@ -403,6 +400,72 @@ public class SegmentAllocateActionTest
     assertSameIdentifier(id2, id7);
   }
 
+  @Test
+  public void testSegmentIsAllocatedForLatestUsedSegmentVersion() throws 
IOException
+  {
+    final Task task = NoopTask.create();
+    taskActionTestKit.getTaskLockbox().add(task);
+
+    final String sequenceName = "sequence_1";
+
+    // Allocate segments when there are no committed segments
+    final SegmentIdWithShardSpec pendingSegmentV01 =
+        allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, 
sequenceName, null);
+    final SegmentIdWithShardSpec pendingSegmentV02 =
+        allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, 
sequenceName, null);
+
+    assertSameIdentifier(pendingSegmentV01, pendingSegmentV02);
+
+    // Commit a segment for version V1
+    final DataSegment segmentV1
+        = DataSegment.builder()
+                     .dataSource(DATA_SOURCE)
+                     .interval(Granularities.HOUR.bucket(PARTY_TIME))
+                     .version(PARTY_TIME.plusDays(1).toString())
+                     .shardSpec(new LinearShardSpec(0))
+                     .size(100)
+                     .build();
+    taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
+        Collections.singleton(segmentV1)
+    );
+
+    // Verify that new allocations use version V1
+    final SegmentIdWithShardSpec pendingSegmentV11 =
+        allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, 
sequenceName, null);
+    final SegmentIdWithShardSpec pendingSegmentV12 =
+        allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, 
sequenceName, null);
+
+    assertSameIdentifier(pendingSegmentV11, pendingSegmentV12);
+    Assert.assertEquals(segmentV1.getVersion(), 
pendingSegmentV11.getVersion());
+
+    Assert.assertNotEquals(pendingSegmentV01, pendingSegmentV11);
+
+    // Commit a segment for version V2 to overshadow V1
+    final DataSegment segmentV2
+        = DataSegment.builder()
+                     .dataSource(DATA_SOURCE)
+                     .interval(Granularities.HOUR.bucket(PARTY_TIME))
+                     .version(PARTY_TIME.plusDays(2).toString())
+                     .shardSpec(new LinearShardSpec(0))
+                     .size(100)
+                     .build();
+    taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
+        Collections.singleton(segmentV2)
+    );
+    Assert.assertTrue(segmentV2.getVersion().compareTo(segmentV1.getVersion()) 
> 0);
+
+    // Verify that new segment allocations use version V2
+    final SegmentIdWithShardSpec pendingSegmentV21 =
+        allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, 
sequenceName, null);
+    final SegmentIdWithShardSpec pendingSegmentV22 =
+        allocate(task, PARTY_TIME, Granularities.NONE, Granularities.HOUR, 
sequenceName, null);
+    assertSameIdentifier(pendingSegmentV21, pendingSegmentV22);
+    Assert.assertEquals(segmentV2.getVersion(), 
pendingSegmentV21.getVersion());
+
+    Assert.assertNotEquals(pendingSegmentV21, pendingSegmentV01);
+    Assert.assertNotEquals(pendingSegmentV21, pendingSegmentV11);
+  }
+
   @Test
   public void testMultipleSequences()
   {
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index c62e59c0b25..9e4fc578eda 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -645,10 +645,23 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     Preconditions.checkNotNull(sequenceName, "sequenceName");
     Preconditions.checkNotNull(interval, "interval");
     Preconditions.checkNotNull(maxVersion, "version");
-    Interval allocateInterval = 
interval.withChronology(ISOChronology.getInstanceUTC());
+    final Interval allocateInterval = 
interval.withChronology(ISOChronology.getInstanceUTC());
 
     return connector.retryWithHandle(
         handle -> {
+          // Get the time chunk and associated data segments for the given 
interval, if any
+          final List<TimelineObjectHolder<String, DataSegment>> existingChunks 
=
+              getTimelineForIntervalsWithHandle(handle, dataSource, 
ImmutableList.of(interval))
+                  .lookup(interval);
+          if (existingChunks.size() > 1) {
+            // Not possible to expand more than one chunk with a single 
segment.
+            log.warn(
+                "Cannot allocate new segment for dataSource[%s], interval[%s] 
as it already has [%,d] versions.",
+                dataSource, interval, existingChunks.size()
+            );
+            return null;
+          }
+
           if (skipSegmentLineageCheck) {
             return allocatePendingSegment(
                 handle,
@@ -656,7 +669,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                 sequenceName,
                 allocateInterval,
                 partialShardSpec,
-                maxVersion
+                maxVersion,
+                existingChunks
             );
           } else {
             return allocatePendingSegmentWithSegmentLineageCheck(
@@ -666,7 +680,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                 previousSegmentId,
                 allocateInterval,
                 partialShardSpec,
-                maxVersion
+                maxVersion,
+                existingChunks
             );
           }
         }
@@ -803,26 +818,32 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       @Nullable final String previousSegmentId,
       final Interval interval,
       final PartialShardSpec partialShardSpec,
-      final String maxVersion
+      final String maxVersion,
+      final List<TimelineObjectHolder<String, DataSegment>> existingChunks
   ) throws IOException
   {
     final String previousSegmentIdNotNull = previousSegmentId == null ? "" : 
previousSegmentId;
-    final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
-        handle.createQuery(
-            StringUtils.format(
-                "SELECT payload FROM %s WHERE "
-                + "dataSource = :dataSource AND "
-                + "sequence_name = :sequence_name AND "
-                + "sequence_prev_id = :sequence_prev_id",
-                dbTables.getPendingSegmentsTable()
-            )
-        ),
+
+    final String sql = StringUtils.format(
+        "SELECT payload FROM %s WHERE "
+        + "dataSource = :dataSource AND "
+        + "sequence_name = :sequence_name AND "
+        + "sequence_prev_id = :sequence_prev_id",
+        dbTables.getPendingSegmentsTable()
+    );
+    final Query<Map<String, Object>> query
+        = handle.createQuery(sql)
+                .bind("dataSource", dataSource)
+                .bind("sequence_name", sequenceName)
+                .bind("sequence_prev_id", previousSegmentIdNotNull);
+
+    final String usedSegmentVersion = existingChunks.isEmpty() ? null : 
existingChunks.get(0).getVersion();
+    final CheckExistingSegmentIdResult result = findExistingPendingSegment(
+        query,
         interval,
         sequenceName,
         previousSegmentIdNotNull,
-        Pair.of("dataSource", dataSource),
-        Pair.of("sequence_name", sequenceName),
-        Pair.of("sequence_prev_id", previousSegmentIdNotNull)
+        usedSegmentVersion
     );
 
     if (result.found) {
@@ -835,7 +856,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         dataSource,
         interval,
         partialShardSpec,
-        maxVersion
+        maxVersion,
+        existingChunks
     );
     if (newIdentifier == null) {
       return null;
@@ -854,6 +876,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                .putBytes(StringUtils.toUtf8(sequenceName))
                .putByte((byte) 0xff)
                .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull))
+               .putByte((byte) 0xff)
+               .putBytes(StringUtils.toUtf8(newIdentifier.getVersion()))
                .hash()
                .asBytes()
     );
@@ -878,11 +902,26 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       final List<SegmentCreateRequest> requests
   ) throws IOException
   {
+    // Get the time chunk and associated data segments for the given interval, 
if any
+    final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
+        getTimelineForIntervalsWithHandle(handle, dataSource, 
Collections.singletonList(interval))
+            .lookup(interval);
+    if (existingChunks.size() > 1) {
+      log.warn(
+          "Cannot allocate new segments for dataSource[%s], interval[%s] as 
interval already has [%,d] chunks.",
+          dataSource, interval, existingChunks.size()
+      );
+      return Collections.emptyMap();
+    }
+
+    final String existingVersion = existingChunks.isEmpty() ? null : 
existingChunks.get(0).getVersion();
     final Map<SegmentCreateRequest, CheckExistingSegmentIdResult> 
existingSegmentIds;
     if (skipSegmentLineageCheck) {
-      existingSegmentIds = getExistingSegmentIdsSkipLineageCheck(handle, 
dataSource, interval, requests);
+      existingSegmentIds =
+          getExistingSegmentIdsSkipLineageCheck(handle, dataSource, interval, 
existingVersion, requests);
     } else {
-      existingSegmentIds = getExistingSegmentIdsWithLineageCheck(handle, 
dataSource, interval, requests);
+      existingSegmentIds =
+          getExistingSegmentIdsWithLineageCheck(handle, dataSource, interval, 
existingVersion, requests);
     }
 
     // For every request see if a segment id already exists
@@ -901,8 +940,14 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     }
 
     // For each of the remaining requests, create a new segment
-    final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments =
-        createNewSegments(handle, dataSource, interval, 
skipSegmentLineageCheck, requestsForNewSegments);
+    final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = 
createNewSegments(
+        handle,
+        dataSource,
+        interval,
+        skipSegmentLineageCheck,
+        existingChunks,
+        requestsForNewSegments
+    );
 
     // SELECT -> INSERT can fail due to races; callers must be prepared to 
retry.
     // Avoiding ON DUPLICATE KEY since it's not portable.
@@ -925,14 +970,16 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   @SuppressWarnings("UnstableApiUsage")
   private String getSequenceNameAndPrevIdSha(
       SegmentCreateRequest request,
-      Interval interval,
+      SegmentIdWithShardSpec pendingSegmentId,
       boolean skipSegmentLineageCheck
   )
   {
     final Hasher hasher = Hashing.sha1().newHasher()
                                  
.putBytes(StringUtils.toUtf8(request.getSequenceName()))
                                  .putByte((byte) 0xff);
+
     if (skipSegmentLineageCheck) {
+      final Interval interval = pendingSegmentId.getInterval();
       hasher
           .putLong(interval.getStartMillis())
           .putLong(interval.getEndMillis());
@@ -941,6 +988,9 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
           .putBytes(StringUtils.toUtf8(request.getPreviousSegmentId()));
     }
 
+    hasher.putByte((byte) 0xff);
+    hasher.putBytes(StringUtils.toUtf8(pendingSegmentId.getVersion()));
+
     return BaseEncoding.base16().encode(hasher.hash().asBytes());
   }
 
@@ -951,28 +1001,32 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       final String sequenceName,
       final Interval interval,
       final PartialShardSpec partialShardSpec,
-      final String maxVersion
+      final String maxVersion,
+      final List<TimelineObjectHolder<String, DataSegment>> existingChunks
   ) throws IOException
   {
-    final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
-        handle.createQuery(
-            StringUtils.format(
-                "SELECT payload FROM %s WHERE "
-                + "dataSource = :dataSource AND "
-                + "sequence_name = :sequence_name AND "
-                + "start = :start AND "
-                + "%2$send%2$s = :end",
-                dbTables.getPendingSegmentsTable(),
-                connector.getQuoteString()
-            )
-        ),
+    final String sql = StringUtils.format(
+        "SELECT payload FROM %s WHERE "
+        + "dataSource = :dataSource AND "
+        + "sequence_name = :sequence_name AND "
+        + "start = :start AND "
+        + "%2$send%2$s = :end",
+        dbTables.getPendingSegmentsTable(),
+        connector.getQuoteString()
+    );
+    final Query<Map<String, Object>> query
+        = handle.createQuery(sql)
+                .bind("dataSource", dataSource)
+                .bind("sequence_name", sequenceName)
+                .bind("start", interval.getStart().toString())
+                .bind("end", interval.getEnd().toString());
+
+    final CheckExistingSegmentIdResult result = findExistingPendingSegment(
+        query,
         interval,
         sequenceName,
         null,
-        Pair.of("dataSource", dataSource),
-        Pair.of("sequence_name", sequenceName),
-        Pair.of("start", interval.getStart().toString()),
-        Pair.of("end", interval.getEnd().toString())
+        existingChunks.isEmpty() ? null : existingChunks.get(0).getVersion()
     );
 
     if (result.found) {
@@ -984,7 +1038,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         dataSource,
         interval,
         partialShardSpec,
-        maxVersion
+        maxVersion,
+        existingChunks
     );
     if (newIdentifier == null) {
       return null;
@@ -1004,6 +1059,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                .putByte((byte) 0xff)
                .putLong(interval.getStartMillis())
                .putLong(interval.getEndMillis())
+               .putByte((byte) 0xff)
+               .putBytes(StringUtils.toUtf8(newIdentifier.getVersion()))
                .hash()
                .asBytes()
     );
@@ -1011,7 +1068,10 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     // always insert empty previous sequence id
     insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, 
interval, "", sequenceName, sequenceNamePrevIdSha1);
 
-    log.info("Allocated pending segment [%s] for sequence[%s] in DB", 
newIdentifier, sequenceName);
+    log.info(
+        "Created new pending segment[%s] for datasource[%s], sequence[%s], 
interval[%s].",
+        newIdentifier, dataSource, sequenceName, interval
+    );
 
     return newIdentifier;
   }
@@ -1023,6 +1083,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       Handle handle,
       String dataSource,
       Interval interval,
+      String usedSegmentVersion,
       List<SegmentCreateRequest> requests
   ) throws IOException
   {
@@ -1052,7 +1113,11 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       final PendingSegmentsRecord record = dbSegments.next();
       final SegmentIdWithShardSpec segmentId =
           jsonMapper.readValue(record.getPayload(), 
SegmentIdWithShardSpec.class);
-      sequenceToSegmentId.put(record.getSequenceName(), segmentId);
+
+      // Consider only the pending segments allocated for the latest used 
segment version
+      if (usedSegmentVersion == null || 
segmentId.getVersion().equals(usedSegmentVersion)) {
+        sequenceToSegmentId.put(record.getSequenceName(), segmentId);
+      }
     }
 
     final Map<SegmentCreateRequest, CheckExistingSegmentIdResult> 
requestToResult = new HashMap<>();
@@ -1071,6 +1136,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       Handle handle,
       String dataSource,
       Interval interval,
+      String usedSegmentVersion,
       List<SegmentCreateRequest> requests
   ) throws IOException
   {
@@ -1090,14 +1156,15 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
 
     final Map<SegmentCreateRequest, CheckExistingSegmentIdResult> 
requestToResult = new HashMap<>();
     for (SegmentCreateRequest request : requests) {
-      CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
+      CheckExistingSegmentIdResult result = findExistingPendingSegment(
           handle.createQuery(sql)
                 .bind("dataSource", dataSource)
                 .bind("sequence_name", request.getSequenceName())
                 .bind("sequence_prev_id", request.getPreviousSegmentId()),
           interval,
           request.getSequenceName(),
-          request.getPreviousSegmentId()
+          request.getPreviousSegmentId(),
+          usedSegmentVersion
       );
       requestToResult.put(request, result);
     }
@@ -1105,50 +1172,43 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     return requestToResult;
   }
 
-  private CheckExistingSegmentIdResult checkAndGetExistingSegmentId(
+  private CheckExistingSegmentIdResult findExistingPendingSegment(
       final Query<Map<String, Object>> query,
       final Interval interval,
       final String sequenceName,
       final @Nullable String previousSegmentId,
-      final Pair<String, String>... queryVars
+      final @Nullable String usedSegmentVersion
   ) throws IOException
   {
-    Query<Map<String, Object>> boundQuery = query;
-    for (Pair<String, String> var : queryVars) {
-      boundQuery = boundQuery.bind(var.lhs, var.rhs);
-    }
-    final List<byte[]> existingBytes = 
boundQuery.map(ByteArrayMapper.FIRST).list();
-
-    if (existingBytes.isEmpty()) {
+    final List<byte[]> records = query.map(ByteArrayMapper.FIRST).list();
+    if (records.isEmpty()) {
       return new CheckExistingSegmentIdResult(false, null);
-    } else {
-      final SegmentIdWithShardSpec existingIdentifier = jsonMapper.readValue(
-          Iterables.getOnlyElement(existingBytes),
-          SegmentIdWithShardSpec.class
-      );
-
-      if (existingIdentifier.getInterval().isEqual(interval)) {
-        log.info(
-            "Found existing pending segment [%s] for sequence[%s] (previous = 
[%s]) in DB",
-            existingIdentifier,
-            sequenceName,
-            previousSegmentId
-        );
+    }
 
-        return new CheckExistingSegmentIdResult(true, existingIdentifier);
-      } else {
-        log.warn(
-            "Cannot use existing pending segment [%s] for sequence[%s] 
(previous = [%s]) in DB, "
-            + "does not match requested interval[%s]",
-            existingIdentifier,
-            sequenceName,
-            previousSegmentId,
-            interval
-        );
+    for (byte[] record : records) {
+      final SegmentIdWithShardSpec pendingSegment
+          = jsonMapper.readValue(record, SegmentIdWithShardSpec.class);
 
-        return new CheckExistingSegmentIdResult(true, null);
+      // Consider only pending segments matching the expected version
+      if (usedSegmentVersion == null || 
pendingSegment.getVersion().equals(usedSegmentVersion)) {
+        if (pendingSegment.getInterval().isEqual(interval)) {
+          log.info(
+              "Found existing pending segment[%s] for sequence[%s], previous 
segment[%s], version[%s] in DB",
+              pendingSegment, sequenceName, previousSegmentId, 
usedSegmentVersion
+          );
+          return new CheckExistingSegmentIdResult(true, pendingSegment);
+        } else {
+          log.warn(
+              "Cannot use existing pending segment [%s] for sequence[%s], 
previous segment[%s] in DB"
+              + " as it does not match requested interval[%s], version[%s].",
+              pendingSegment, sequenceName, previousSegmentId, interval, 
usedSegmentVersion
+          );
+          return new CheckExistingSegmentIdResult(true, null);
+        }
       }
     }
+
+    return new CheckExistingSegmentIdResult(false, null);
   }
 
   private static class CheckExistingSegmentIdResult
@@ -1164,6 +1224,52 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     }
   }
 
+  private static class UniqueAllocateRequest
+  {
+    private final Interval interval;
+    private final String previousSegmentId;
+    private final String sequenceName;
+    private final boolean skipSegmentLineageCheck;
+
+    private final int hashCode;
+
+    public UniqueAllocateRequest(
+        Interval interval,
+        SegmentCreateRequest request,
+        boolean skipSegmentLineageCheck
+    )
+    {
+      this.interval = interval;
+      this.sequenceName = request.getSequenceName();
+      this.previousSegmentId = request.getPreviousSegmentId();
+      this.skipSegmentLineageCheck = skipSegmentLineageCheck;
+
+      this.hashCode = Objects.hash(interval, sequenceName, previousSegmentId, 
skipSegmentLineageCheck);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      UniqueAllocateRequest that = (UniqueAllocateRequest) o;
+      return skipSegmentLineageCheck == that.skipSegmentLineageCheck
+             && Objects.equals(interval, that.interval)
+             && Objects.equals(sequenceName, that.sequenceName)
+             && Objects.equals(previousSegmentId, that.previousSegmentId);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return hashCode;
+    }
+  }
+
   private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction(
       Set<DataSegment> appendSegments,
       Map<DataSegment, ReplaceTaskLock> appendSegmentToReplaceLock,
@@ -1264,7 +1370,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
                  .bind("sequence_prev_id", request.getPreviousSegmentId())
                  .bind(
                      "sequence_name_prev_id_sha1",
-                     getSequenceNameAndPrevIdSha(request, interval, 
skipSegmentLineageCheck)
+                     getSequenceNameAndPrevIdSha(request, segmentId, 
skipSegmentLineageCheck)
                  )
                  .bind("payload", jsonMapper.writeValueAsBytes(segmentId));
     }
@@ -1480,6 +1586,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       String dataSource,
       Interval interval,
       boolean skipSegmentLineageCheck,
+      List<TimelineObjectHolder<String, DataSegment>> existingChunks,
       List<SegmentCreateRequest> requests
   ) throws IOException
   {
@@ -1487,22 +1594,6 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       return Collections.emptyMap();
     }
 
-    // Get the time chunk and associated data segments for the given interval, 
if any
-    final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
-        getTimelineForIntervalsWithHandle(handle, dataSource, 
Collections.singletonList(interval))
-            .lookup(interval);
-
-    if (existingChunks.size() > 1) {
-      // Not possible to expand more than one chunk with a single segment.
-      log.warn(
-          "Cannot allocate new segments for dataSource[%s], interval[%s]: 
already have [%,d] chunks.",
-          dataSource,
-          interval,
-          existingChunks.size()
-      );
-      return Collections.emptyMap();
-    }
-
     // Shard spec of any of the requests (as they are all compatible) can be 
used to
     // identify existing shard specs that share partition space with the 
requested ones.
     final PartialShardSpec partialShardSpec = 
requests.get(0).getPartialShardSpec();
@@ -1542,15 +1633,16 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         new HashSet<>(getPendingSegmentsForIntervalWithHandle(handle, 
dataSource, interval).keySet());
 
     final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = 
new HashMap<>();
-    final Map<String, SegmentIdWithShardSpec> sequenceHashToSegment = new 
HashMap<>();
+    final Map<UniqueAllocateRequest, SegmentIdWithShardSpec> 
uniqueRequestToSegment = new HashMap<>();
 
     for (SegmentCreateRequest request : requests) {
       // Check if the required segment has already been created in this batch
-      final String sequenceHash = getSequenceNameAndPrevIdSha(request, 
interval, skipSegmentLineageCheck);
+      final UniqueAllocateRequest uniqueRequest =
+          new UniqueAllocateRequest(interval, request, 
skipSegmentLineageCheck);
 
       final SegmentIdWithShardSpec createdSegment;
-      if (sequenceHashToSegment.containsKey(sequenceHash)) {
-        createdSegment = sequenceHashToSegment.get(sequenceHash);
+      if (uniqueRequestToSegment.containsKey(uniqueRequest)) {
+        createdSegment = uniqueRequestToSegment.get(uniqueRequest);
       } else {
         createdSegment = createNewSegment(
             request,
@@ -1564,8 +1656,8 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
         // Add to pendingSegments to consider for partitionId
         if (createdSegment != null) {
           pendingSegments.add(createdSegment);
-          sequenceHashToSegment.put(sequenceHash, createdSegment);
-          log.info("Created new segment [%s]", createdSegment);
+          uniqueRequestToSegment.put(uniqueRequest, createdSegment);
+          log.info("Created new segment[%s]", createdSegment);
         }
       }
 
@@ -1574,7 +1666,7 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       }
     }
 
-    log.info("Created [%d] new segments for [%d] allocate requests.", 
sequenceHashToSegment.size(), requests.size());
+    log.info("Created [%d] new segments for [%d] allocate requests.", 
uniqueRequestToSegment.size(), requests.size());
     return createdSegments;
   }
 
@@ -1694,140 +1786,122 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
       final String dataSource,
       final Interval interval,
       final PartialShardSpec partialShardSpec,
-      final String existingVersion
+      final String existingVersion,
+      final List<TimelineObjectHolder<String, DataSegment>> existingChunks
   ) throws IOException
   {
-    // Get the time chunk and associated data segments for the given interval, 
if any
-    final List<TimelineObjectHolder<String, DataSegment>> existingChunks = 
getTimelineForIntervalsWithHandle(
-        handle,
-        dataSource,
-        ImmutableList.of(interval)
-    ).lookup(interval);
-
-    if (existingChunks.size() > 1) {
-      // Not possible to expand more than one chunk with a single segment.
-      log.warn(
-          "Cannot allocate new segment for dataSource[%s], interval[%s]: 
already have [%,d] chunks.",
-          dataSource,
-          interval,
-          existingChunks.size()
-      );
-      return null;
+    // max partitionId of published data segments which share the same 
partition space.
+    SegmentIdWithShardSpec committedMaxId = null;
 
+    @Nullable
+    final String versionOfExistingChunk;
+    if (existingChunks.isEmpty()) {
+      versionOfExistingChunk = null;
     } else {
-      // max partitionId of published data segments which share the same 
partition space.
-      SegmentIdWithShardSpec committedMaxId = null;
+      TimelineObjectHolder<String, DataSegment> existingHolder = 
Iterables.getOnlyElement(existingChunks);
+      versionOfExistingChunk = existingHolder.getVersion();
 
-      @Nullable
-      final String versionOfExistingChunk;
-      if (existingChunks.isEmpty()) {
-        versionOfExistingChunk = null;
-      } else {
-        TimelineObjectHolder<String, DataSegment> existingHolder = 
Iterables.getOnlyElement(existingChunks);
-        versionOfExistingChunk = existingHolder.getVersion();
-
-        // Don't use the stream API for performance.
-        for (DataSegment segment : FluentIterable
-            .from(existingHolder.getObject())
-            .transform(PartitionChunk::getObject)
-            // Here we check only the segments of the shardSpec which shares 
the same partition space with the given
-            // partialShardSpec. Note that OverwriteShardSpec doesn't share 
the partition space with others.
-            // See PartitionIds.
-            .filter(segment -> 
segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
-          if (committedMaxId == null
-              || committedMaxId.getShardSpec().getPartitionNum() < 
segment.getShardSpec().getPartitionNum()) {
-            committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment);
-          }
+      // Don't use the stream API for performance.
+      for (DataSegment segment : FluentIterable
+          .from(existingHolder.getObject())
+          .transform(PartitionChunk::getObject)
+          // Here we check only the segments of the shardSpec which shares the 
same partition space with the given
+          // partialShardSpec. Note that OverwriteShardSpec doesn't share the 
partition space with others.
+          // See PartitionIds.
+          .filter(segment -> 
segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
+        if (committedMaxId == null
+            || committedMaxId.getShardSpec().getPartitionNum() < 
segment.getShardSpec().getPartitionNum()) {
+          committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment);
         }
       }
+    }
 
 
-      // Fetch the pending segments for this interval to determine max 
partitionId
-      // across all shard specs (published + pending).
-      // A pending segment having a higher partitionId must also be considered
-      // to avoid clashes when inserting the pending segment created here.
-      final Set<SegmentIdWithShardSpec> pendings = new HashSet<>(
-          getPendingSegmentsForIntervalWithHandle(handle, dataSource, 
interval).keySet()
-      );
-      if (committedMaxId != null) {
-        pendings.add(committedMaxId);
-      }
+    // Fetch the pending segments for this interval to determine max 
partitionId
+    // across all shard specs (published + pending).
+    // A pending segment having a higher partitionId must also be considered
+    // to avoid clashes when inserting the pending segment created here.
+    final Set<SegmentIdWithShardSpec> pendings = new HashSet<>(
+        getPendingSegmentsForIntervalWithHandle(handle, dataSource, 
interval).keySet()
+    );
+    if (committedMaxId != null) {
+      pendings.add(committedMaxId);
+    }
 
-      // If there is an existing chunk, find the max id with the same version 
as the existing chunk.
-      // There may still be a pending segment with a higher version (but no 
corresponding used segments)
-      // which may generate a clash with an existing segment once the new id 
is generated
-      final SegmentIdWithShardSpec overallMaxId;
-      overallMaxId = pendings.stream()
-                             .filter(id -> 
id.getShardSpec().sharePartitionSpace(partialShardSpec))
-                             .filter(id -> versionOfExistingChunk == null
-                                           || 
id.getVersion().equals(versionOfExistingChunk))
-                             
.max(Comparator.comparing(SegmentIdWithShardSpec::getVersion)
-                                            .thenComparing(id -> 
id.getShardSpec().getPartitionNum()))
-                             .orElse(null);
-
-
-      // Determine the version of the new segment
-      final String newSegmentVersion;
-      if (versionOfExistingChunk != null) {
-        newSegmentVersion = versionOfExistingChunk;
-      } else if (overallMaxId != null) {
-        newSegmentVersion = overallMaxId.getVersion();
-      } else {
-        // this is the first segment for this interval
-        newSegmentVersion = null;
-      }
+    // If there is an existing chunk, find the max id with the same version as 
the existing chunk.
+    // There may still be a pending segment with a higher version (but no 
corresponding used segments)
+    // which may generate a clash with an existing segment once the new id is 
generated
+    final SegmentIdWithShardSpec overallMaxId;
+    overallMaxId = pendings.stream()
+                           .filter(id -> 
id.getShardSpec().sharePartitionSpace(partialShardSpec))
+                           .filter(id -> versionOfExistingChunk == null
+                                         || 
id.getVersion().equals(versionOfExistingChunk))
+                           
.max(Comparator.comparing(SegmentIdWithShardSpec::getVersion)
+                                          .thenComparing(id -> 
id.getShardSpec().getPartitionNum()))
+                           .orElse(null);
 
-      if (overallMaxId == null) {
-        // When appending segments, null overallMaxId means that we are 
allocating the very initial
-        // segment for this time chunk.
-        // This code is executed when the Overlord coordinates segment 
allocation, which is either you append segments
-        // or you use segment lock. Since the core partitions set is not 
determined for appended segments, we set
-        // it 0. When you use segment lock, the core partitions set doesn't 
work with it. We simply set it 0 so that the
-        // OvershadowableManager handles the atomic segment update.
-        final int newPartitionId = 
partialShardSpec.useNonRootGenerationPartitionSpace()
-                                   ? 
PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
-                                   : PartitionIds.ROOT_GEN_START_PARTITION_ID;
-        String version = newSegmentVersion == null ? existingVersion : 
newSegmentVersion;
-        return new SegmentIdWithShardSpec(
-            dataSource,
-            interval,
-            version,
-            partialShardSpec.complete(jsonMapper, newPartitionId, 0)
-        );
-      } else if (!overallMaxId.getInterval().equals(interval)) {
-        log.warn(
-            "Cannot allocate new segment for dataSource[%s], interval[%s], 
existingVersion[%s]: conflicting segment[%s].",
-            dataSource,
-            interval,
-            existingVersion,
-            overallMaxId
-        );
-        return null;
-      } else if (committedMaxId != null
-                 && committedMaxId.getShardSpec().getNumCorePartitions()
-                    == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
-        log.warn(
-            "Cannot allocate new segment because of unknown core partition 
size of segment[%s], shardSpec[%s]",
-            committedMaxId,
-            committedMaxId.getShardSpec()
-        );
-        return null;
-      } else {
-        // The number of core partitions must always be chosen from the set of 
used segments in the SegmentTimeline.
-        // When the core partitions have been dropped, using pending segments 
may lead to an incorrect state
-        // where the chunk is believed to have core partitions and queries 
results are incorrect.
 
-        return new SegmentIdWithShardSpec(
-            dataSource,
-            interval,
-            Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
-            partialShardSpec.complete(
-                jsonMapper,
-                overallMaxId.getShardSpec().getPartitionNum() + 1,
-                committedMaxId == null ? 0 : 
committedMaxId.getShardSpec().getNumCorePartitions()
-            )
-        );
-      }
+    // Determine the version of the new segment
+    final String newSegmentVersion;
+    if (versionOfExistingChunk != null) {
+      newSegmentVersion = versionOfExistingChunk;
+    } else if (overallMaxId != null) {
+      newSegmentVersion = overallMaxId.getVersion();
+    } else {
+      // this is the first segment for this interval
+      newSegmentVersion = null;
+    }
+
+    if (overallMaxId == null) {
+      // When appending segments, null overallMaxId means that we are 
allocating the very initial
+      // segment for this time chunk.
+      // This code is executed when the Overlord coordinates segment 
allocation, which is either you append segments
+      // or you use segment lock. Since the core partitions set is not 
determined for appended segments, we set
+      // it 0. When you use segment lock, the core partitions set doesn't work 
with it. We simply set it 0 so that the
+      // OvershadowableManager handles the atomic segment update.
+      final int newPartitionId = 
partialShardSpec.useNonRootGenerationPartitionSpace()
+                                 ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
+                                 : PartitionIds.ROOT_GEN_START_PARTITION_ID;
+      String version = newSegmentVersion == null ? existingVersion : 
newSegmentVersion;
+      return new SegmentIdWithShardSpec(
+          dataSource,
+          interval,
+          version,
+          partialShardSpec.complete(jsonMapper, newPartitionId, 0)
+      );
+    } else if (!overallMaxId.getInterval().equals(interval)) {
+      log.warn(
+          "Cannot allocate new segment for dataSource[%s], interval[%s], 
existingVersion[%s]: conflicting segment[%s].",
+          dataSource,
+          interval,
+          existingVersion,
+          overallMaxId
+      );
+      return null;
+    } else if (committedMaxId != null
+               && committedMaxId.getShardSpec().getNumCorePartitions()
+                  == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
+      log.warn(
+          "Cannot allocate new segment because of unknown core partition size 
of segment[%s], shardSpec[%s]",
+          committedMaxId,
+          committedMaxId.getShardSpec()
+      );
+      return null;
+    } else {
+      // The number of core partitions must always be chosen from the set of 
used segments in the SegmentTimeline.
+      // When the core partitions have been dropped, using pending segments 
may lead to an incorrect state
+      // where the chunk is believed to have core partitions and queries 
results are incorrect.
+
+      return new SegmentIdWithShardSpec(
+          dataSource,
+          interval,
+          Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
+          partialShardSpec.complete(
+              jsonMapper,
+              overallMaxId.getShardSpec().getPartitionNum() + 1,
+              committedMaxId == null ? 0 : 
committedMaxId.getShardSpec().getNumCorePartitions()
+          )
+      );
     }
   }
 
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 9e977dec3e8..4ee72e74f92 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -2078,7 +2078,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
    * - verify that the id for segment5 is correct
    * - Later, after the above was dropped, another segment on same interval 
was created by the stream but this
    * time there was an integrity violation in the pending segments table 
because the
-   * {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, 
String, Interval, PartialShardSpec, String)}
    * method returned a segment id that already existed in the pending segments 
table
    */
   @Test
@@ -2178,7 +2177,6 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2",
 identifier4.toString());
     // Since all core partitions have been dropped
     Assert.assertEquals(0, identifier4.getShardSpec().getNumCorePartitions());
-
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to