imply-cheddar commented on code in PR #18230:
URL: https://github.com/apache/druid/pull/18230#discussion_r2198886888


##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java:
##########
@@ -1141,8 +1141,10 @@ public void test_concurrentAppend_toIntervalWithUnusedSegments()
     // Allocate and commit another APPEND segment
     final SegmentIdWithShardSpec pendingSegment2
         = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
-    Assert.assertEquals(SEGMENT_V0, pendingSegment2.getVersion());
-    Assert.assertEquals(1, pendingSegment2.getShardSpec().getPartitionNum());
+
+    // Verify that the new segment gets a different version
+    Assert.assertEquals(SEGMENT_V0 + "S", pendingSegment2.getVersion());
+    Assert.assertEquals(0, pendingSegment2.getShardSpec().getPartitionNum());

Review Comment:
   It would probably also be good to validate the "keeps appending" behavior in the tests as well (i.e., if a segment is created, marked unused, a new one created, marked unused, created again, marked unused, etc., the version should just accumulate more and more 'S's).
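To make the expected "keeps appending" invariant concrete, here is a minimal in-memory model of the cycle described above. It is a sketch under assumptions, not Druid code: `KeepsAppendingSketch`, `allocate()`, `markAllUnused()` and `runCycles()` are hypothetical, the suffix is assumed to be `"S"` (as the updated assertion `SEGMENT_V0 + "S"` suggests), and unlike the PR the model places no cap on the number of suffixes. A real test would drive the same loop through `appendTask.allocateSegmentForTimestamp(...)` and the test's own mark-unused step, asserting the version after each round.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical in-memory model of the "keeps appending" scenario. This is not Druid's API:
 * it tracks only version strings and ignores intervals, shard specs and the metadata store.
 */
public class KeepsAppendingSketch {
  // Assumed stand-in for PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX
  static final String SUFFIX = "S";

  private final String baseVersion;
  private final Set<String> unusedVersions = new HashSet<>();
  private String currentVersion;

  KeepsAppendingSketch(String baseVersion) {
    this.baseVersion = baseVersion;
  }

  /** Returns the base version with as many suffixes as needed to avoid every unused version. */
  String allocate() {
    String candidate = baseVersion;
    while (unusedVersions.contains(candidate)) {
      candidate = candidate + SUFFIX;
    }
    currentVersion = candidate;
    return candidate;
  }

  /** Simulates marking all segments of the most recently allocated version as unused. */
  void markAllUnused() {
    unusedVersions.add(currentVersion);
  }

  /** Runs the given number of allocate / mark-unused cycles and returns the versions in order. */
  static List<String> runCycles(String baseVersion, int cycles) {
    KeepsAppendingSketch model = new KeepsAppendingSketch(baseVersion);
    List<String> versions = new ArrayList<>();
    for (int i = 0; i < cycles; i++) {
      versions.add(model.allocate());
      model.markAllUnused();
    }
    return versions;
  }

  public static void main(String[] args) {
    List<String> versions = runCycles("V0", 4);
    for (int i = 0; i < versions.size(); i++) {
      // Cycle i should carry exactly i suffixes: V0, V0S, V0SS, ...
      String expected = "V0" + SUFFIX.repeat(i);
      if (!expected.equals(versions.get(i))) {
        throw new AssertionError("cycle " + i + ": expected " + expected + ", got " + versions.get(i));
      }
    }
    System.out.println(versions); // [V0, V0S, V0SS, V0SSS]
  }
}
```

Each round should yield exactly one more suffix than the previous one, so four rounds produce `[V0, V0S, V0SS, V0SSS]`.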



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1590,18 +1591,86 @@ private SegmentIdWithShardSpec createNewPendingSegment(
               committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()
           )
       );
-      return getTrueAllocatedId(transaction, allocatedId);
+      return getUniqueIdForSecondaryAllocation(transaction, allocatedId);
     }
   }
 
   /**
-   * Verifies that the allocated id doesn't already exist in the druid_segments table.
-   * If yes, try to get the max unallocated id considering the unused segments for the datasource, version and interval
-   * Otherwise, use the same id.
-   * @param allocatedId The segment allcoted on the basis of used and pending segments
-   * @return a segment id that isn't already used by other unused segments
+   * Returns a unique {@link SegmentIdWithShardSpec} which does not clash with
+   * any existing unused segment. If an unused segment already exists that matches
+   * the interval and version of the given {@code allocatedId}, a fresh version
+   * is created by suffixing one or more {@link PendingSegmentRecord#CONCURRENT_APPEND_VERSION_SUFFIX}.
+   * Such a conflict can happen only if all the segments in this interval created
+   * by a prior APPEND task were marked as unused.
+   * <p>
+   * This method should be called only when allocating the first segment in an interval.
+   */
+  private SegmentIdWithShardSpec getUniqueIdForPrimaryAllocation(
+      SegmentMetadataTransaction transaction,
+      SegmentIdWithShardSpec allocatedId
+  )
+  {
+    // Get all the unused segment versions for this datasource and interval
+    final Set<String> unusedSegmentVersions = transaction.noCacheSql().retrieveUnusedSegmentVersionsWithInterval(
+        allocatedId.getDataSource(),
+        allocatedId.getInterval()
+    );
+
+    final String allocatedVersion = allocatedId.getVersion();
+    if (!unusedSegmentVersions.contains(allocatedVersion)) {
+      // Nothing to do, this version is new
+      return allocatedId;
+    } else if (!PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND.equals(allocatedVersion)) {
+      // Version clash should never happen for non-APPEND locks
+      throw DruidException.defensive(
+          "Cannot allocate segment[%s] as there are already some unused segments"
+          + " for version[%s] in this interval.",
+          allocatedId, allocatedVersion
+      );
+    }
+
+    // Iterate until a new non-clashing version is found
+    boolean foundFreshVersion = false;
+    StringBuilder candidateVersion = new StringBuilder(
+        allocatedId.getVersion() + PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX
+    );
+    for (int i = 0; i < 10; ++i) {
+      if (unusedSegmentVersions.contains(candidateVersion.toString())) {
+        candidateVersion.append(PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX);
+      } else {
+        foundFreshVersion = true;
+        break;
+      }
+    }
+
+    if (foundFreshVersion) {
+      return new SegmentIdWithShardSpec(

Review Comment:
   This is probably a rare enough path that an INFO log stating that a new version was found, and what that version is, won't flood the logs, while still providing useful context in cases where this code ends up running unexpectedly.
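A standalone sketch of the bounded search in `getUniqueIdForPrimaryAllocation`, with the suggested INFO log emitted only on the rare path where a suffixed version is chosen. Assumptions: `java.util.logging.Logger` stands in for whatever logger the coordinator actually uses; `FreshVersionSketch`, `findFreshVersion` and the log message are hypothetical; the suffix is assumed to be `"S"`; the bound of 10 attempts mirrors the loop in the diff; and the defensive check for non-APPEND versions is left out.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Logger;

/**
 * Hypothetical standalone sketch of the bounded version search, plus an INFO log on the rare
 * path. java.util.logging stands in for the real logger; this is not Druid's implementation.
 */
public class FreshVersionSketch {
  private static final Logger LOG = Logger.getLogger(FreshVersionSketch.class.getName());

  // Assumed stand-ins for PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX and the loop bound
  static final String SUFFIX = "S";
  static final int MAX_ATTEMPTS = 10;

  /**
   * Returns allocatedVersion unchanged if no unused segment has it; otherwise the shortest
   * suffixed variant (at most MAX_ATTEMPTS suffixes) that is free, or empty if none is.
   */
  static Optional<String> findFreshVersion(
      String dataSource,
      String interval,
      String allocatedVersion,
      Set<String> unusedVersions
  ) {
    if (!unusedVersions.contains(allocatedVersion)) {
      return Optional.of(allocatedVersion);
    }
    StringBuilder candidate = new StringBuilder(allocatedVersion + SUFFIX);
    for (int i = 0; i < MAX_ATTEMPTS; ++i) {
      String version = candidate.toString();
      if (!unusedVersions.contains(version)) {
        // Rare path: a single INFO line gives context without flooding the logs
        LOG.info(String.format(
            "Found fresh version[%s] for datasource[%s], interval[%s]"
            + " as unused segments already exist for version[%s].",
            version, dataSource, interval, allocatedVersion
        ));
        return Optional.of(version);
      }
      candidate.append(SUFFIX);
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Set<String> unused = new HashSet<>(Set.of("V0", "V0S"));
    System.out.println(findFreshVersion("wiki", "2023-01-01/2023-01-02", "V0", unused)); // Optional[V0SS]
  }
}
```

Returning an empty `Optional` after 10 attempts leaves the failure handling to the caller; the truncated diff does not show what the real method does in that case.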


