imply-cheddar commented on code in PR #18230:
URL: https://github.com/apache/druid/pull/18230#discussion_r2198886888
##########
indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java:
##########
@@ -1141,8 +1141,10 @@ public void test_concurrentAppend_toIntervalWithUnusedSegments()
// Allocate and commit another APPEND segment
final SegmentIdWithShardSpec pendingSegment2
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(),
Granularities.DAY);
- Assert.assertEquals(SEGMENT_V0, pendingSegment2.getVersion());
- Assert.assertEquals(1, pendingSegment2.getShardSpec().getPartitionNum());
+
+ // Verify that the new segment gets a different version
+ Assert.assertEquals(SEGMENT_V0 + "S", pendingSegment2.getVersion());
+ Assert.assertEquals(0, pendingSegment2.getShardSpec().getPartitionNum());
Review Comment:
It would probably also be good to validate the "keeps appending" behavior in
the tests as well, i.e. if a segment is created and marked unused, then a new
one is created and marked unused, and so on, each new version should just
accumulate more and more 'S's.
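
A minimal, standalone sketch of the behavior such a test would pin down. It does not use the Druid test harness: `KeepsAppendingSketch`, `findFreshVersion`, the `"v0"` placeholder version and the in-memory `Set` of unused versions are all hypothetical stand-ins, and the suffix value `"S"` is inferred from the assertion in the diff above.

```java
import java.util.HashSet;
import java.util.Set;

public class KeepsAppendingSketch
{
  // Assumption: stands in for PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX
  static final String SUFFIX = "S";
  // Mirrors the bound of 10 iterations used by the loop in the PR
  static final int MAX_ATTEMPTS = 10;

  /**
   * Hypothetical helper: returns the allocated version if no unused segment
   * has it, otherwise keeps appending SUFFIX until a fresh version is found.
   */
  static String findFreshVersion(Set<String> unusedVersions, String allocatedVersion)
  {
    if (!unusedVersions.contains(allocatedVersion)) {
      return allocatedVersion;
    }
    final StringBuilder candidate = new StringBuilder(allocatedVersion).append(SUFFIX);
    for (int i = 0; i < MAX_ATTEMPTS; ++i) {
      if (!unusedVersions.contains(candidate.toString())) {
        return candidate.toString();
      }
      candidate.append(SUFFIX);
    }
    throw new IllegalStateException("No fresh version found for " + allocatedVersion);
  }

  public static void main(String[] args)
  {
    final String v0 = "v0"; // placeholder for SEGMENT_V0
    final Set<String> unusedVersions = new HashSet<>();
    String expectedSuffix = "";

    // Allocate, mark unused, allocate again, ... and check the suffix grows each time
    for (int cycle = 0; cycle < 4; ++cycle) {
      final String version = findFreshVersion(unusedVersions, v0);
      if (!version.equals(v0 + expectedSuffix)) {
        throw new AssertionError("Unexpected version " + version + " in cycle " + cycle);
      }
      unusedVersions.add(version); // simulate marking the new segment as unused
      expectedSuffix += SUFFIX;
    }
  }
}
```

In the real test, the same cycle would presumably go through the append task's allocate/commit calls and a "mark unused" step on the metadata store rather than an in-memory set.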
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1590,18 +1591,86 @@ private SegmentIdWithShardSpec createNewPendingSegment(
committedMaxId == null ? 0 :
committedMaxId.getShardSpec().getNumCorePartitions()
)
);
- return getTrueAllocatedId(transaction, allocatedId);
+ return getUniqueIdForSecondaryAllocation(transaction, allocatedId);
}
}
/**
- * Verifies that the allocated id doesn't already exist in the
druid_segments table.
- * If yes, try to get the max unallocated id considering the unused segments
for the datasource, version and interval
- * Otherwise, use the same id.
- * @param allocatedId The segment allcoted on the basis of used and pending
segments
- * @return a segment id that isn't already used by other unused segments
+ * Returns a unique {@link SegmentIdWithShardSpec} which does not clash with
+ * any existing unused segment. If an unused segment already exists that matches
+ * the interval and version of the given {@code allocatedId}, a fresh version
+ * is created by suffixing one or more {@link PendingSegmentRecord#CONCURRENT_APPEND_VERSION_SUFFIX}.
+ * Such a conflict can happen only if all the segments in this interval created
+ * by a prior APPEND task were marked as unused.
+ * <p>
+ * This method should be called only when allocating the first segment in an interval.
+ */
+ private SegmentIdWithShardSpec getUniqueIdForPrimaryAllocation(
+ SegmentMetadataTransaction transaction,
+ SegmentIdWithShardSpec allocatedId
+ )
+ {
+ // Get all the unused segment versions for this datasource and interval
+ final Set<String> unusedSegmentVersions =
transaction.noCacheSql().retrieveUnusedSegmentVersionsWithInterval(
+ allocatedId.getDataSource(),
+ allocatedId.getInterval()
+ );
+
+ final String allocatedVersion = allocatedId.getVersion();
+ if (!unusedSegmentVersions.contains(allocatedVersion)) {
+ // Nothing to do, this version is new
+ return allocatedId;
+ } else if (!PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND.equals(allocatedVersion)) {
+ // Version clash should never happen for non-APPEND locks
+ throw DruidException.defensive(
+ "Cannot allocate segment[%s] as there are already some unused segments"
+ + " for version[%s] in this interval.",
+ allocatedId, allocatedVersion
+ );
+ }
+
+ // Iterate until a new non-clashing version is found
+ boolean foundFreshVersion = false;
+ StringBuilder candidateVersion = new StringBuilder(
+ allocatedId.getVersion() +
PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX
+ );
+ for (int i = 0; i < 10; ++i) {
+ if (unusedSegmentVersions.contains(candidateVersion.toString())) {
+ candidateVersion.append(PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX);
+ } else {
+ foundFreshVersion = true;
+ break;
+ }
+ }
+
+ if (foundFreshVersion) {
+ return new SegmentIdWithShardSpec(
Review Comment:
This probably happens so rarely that an INFO log stating that a new version
was picked, and what that version is, wouldn't flood the logs. It would also
give decent context on what's going on in cases where this code ends up
running unexpectedly.
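
A rough sketch of what such a log line could look like, kept standalone for illustration. It uses `java.util.logging` instead of Druid's own logger, and `FreshVersionLoggingSketch`, `chooseVersion`, the message wording and the `"S"` suffix value are assumptions, not code from the PR.

```java
import java.util.Set;
import java.util.logging.Logger;

public class FreshVersionLoggingSketch
{
  private static final Logger LOG = Logger.getLogger(FreshVersionLoggingSketch.class.getName());

  // Assumption: stands in for PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX
  static final String SUFFIX = "S";
  static final int MAX_SUFFIXES = 10;

  /**
   * Hypothetical helper: picks a version not present in unusedVersions and
   * logs at INFO only when the originally allocated version had to be changed.
   */
  static String chooseVersion(
      String dataSource,
      String interval,
      String allocatedVersion,
      Set<String> unusedVersions
  )
  {
    String candidate = allocatedVersion;
    int suffixesAdded = 0;
    while (unusedVersions.contains(candidate) && suffixesAdded < MAX_SUFFIXES) {
      candidate += SUFFIX;
      suffixesAdded++;
    }
    if (unusedVersions.contains(candidate)) {
      throw new IllegalStateException("Could not find a fresh version for " + allocatedVersion);
    }

    if (suffixesAdded > 0) {
      // Rare path: state which version clashed and which one is used instead
      LOG.info(String.format(
          "Version[%s] clashes with unused segments of datasource[%s] in interval[%s]."
          + " Using fresh version[%s] instead.",
          allocatedVersion, dataSource, interval, candidate
      ));
    }
    return candidate;
  }
}
```

Logging only on the suffixed path keeps the common case (no clash) silent, so the message appears only when a clash was actually resolved.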
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]