This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 64b901f426 update upsert TTL watermark in replaceSegment too (#14147) 64b901f426 is described below commit 64b901f42668017e28d5be2bca43b90b2e747a19 Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Wed Oct 2 15:29:37 2024 -0700 update upsert TTL watermark in replaceSegment too (#14147) * update upsert TTL watermark in replaceSegment too --- .../upsert/BasePartitionUpsertMetadataManager.java | 53 +++++++++++++--------- ...rrentMapPartitionUpsertMetadataManagerTest.java | 2 +- 2 files changed, 32 insertions(+), 23 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index 9a868b34c5..0c72f26114 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -385,9 +385,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl, "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName, _tableNameWithType); - if (isTTLEnabled()) { - updateWatermark(segment); - } if (!startOperation()) { _logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName()); return; @@ -413,11 +410,20 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps return _metadataTTL > 0 || _deletedKeysTTL > 0; } + protected double getMaxComparisonValue(IndexSegment segment) { + return ((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)) + .getMaxValue()).doubleValue(); + } + + 
protected boolean isOutOfMetadataTTL(double maxComparisonValue) { + return _metadataTTL > 0 && _largestSeenComparisonValue.get() != TTL_WATERMARK_NOT_SET + && maxComparisonValue < _largestSeenComparisonValue.get() - _metadataTTL; + } + protected boolean isOutOfMetadataTTL(IndexSegment segment) { - if (_metadataTTL > 0 && _largestSeenComparisonValue.get() != TTL_WATERMARK_NOT_SET) { - Number maxComparisonValue = - (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue(); - return maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() - _metadataTTL; + if (_metadataTTL > 0) { + double maxComparisonValue = getMaxComparisonValue(segment); + return isOutOfMetadataTTL(maxComparisonValue); } return false; } @@ -451,8 +457,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps protected void doAddSegment(ImmutableSegmentImpl segment) { String segmentName = segment.getSegmentName(); _logger.info("Adding segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys()); - if (isOutOfMetadataTTL(segment) && skipAddSegmentOutOfTTL(segment)) { - return; + if (isTTLEnabled()) { + double maxComparisonValue = getMaxComparisonValue(segment); + _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue)); + if (isOutOfMetadataTTL(maxComparisonValue) && skipAddSegmentOutOfTTL(segment)) { + return; + } } long startTimeMs = System.currentTimeMillis(); if (!_enableSnapshot) { @@ -495,9 +505,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl, "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName, _tableNameWithType); - if (isTTLEnabled()) { - updateWatermark(segment); - } if (!startOperation()) { _logger.info("Skip preloading segment: {} because metadata manager is already stopped", segmentName); return; @@ 
-527,8 +534,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null); return; } - if (isOutOfMetadataTTL(segment) && skipPreloadSegmentOutOfTTL(segment, validDocIds)) { - return; + if (isTTLEnabled()) { + double maxComparisonValue = getMaxComparisonValue(segment); + _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue)); + if (isOutOfMetadataTTL(maxComparisonValue) && skipPreloadSegmentOutOfTTL(segment, validDocIds)) { + return; + } } try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) { @@ -701,7 +712,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps replaceSegment(segment, null, null, null, oldSegment); return; } - + if (isTTLEnabled()) { + double maxComparisonValue = getMaxComparisonValue(segment); + _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue)); + // Segment might be uploaded directly to the table to replace an old segment. So update the TTL watermark but + // we can't skip segment even if it's out of TTL as its validDocIds bitmap is not updated yet. 
+ } try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) { Iterator<RecordInfo> recordInfoIterator = @@ -1031,13 +1047,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId); } - protected void updateWatermark(ImmutableSegment segment) { - double maxComparisonValue = - ((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)) - .getMaxValue()).doubleValue(); - _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue)); - } - @VisibleForTesting double getWatermark() { return _largestSeenComparisonValue.get(); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index 90c03bb5de..b29fd68559 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -238,7 +238,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { ImmutableSegmentImpl segment = mockImmutableSegmentWithEndTime(1, new ThreadSafeMutableRoaringBitmap(), null, new ArrayList<>(), COMPARISON_COLUMNS, new Double(currentTimeMs + 1024), new MutableRoaringBitmap()); - upsertMetadataManager.updateWatermark(segment); + upsertMetadataManager.setWatermark(currentTimeMs + 1024); assertEquals(upsertMetadataManager.getWatermark(), currentTimeMs + 1024); // Stop the metadata manager --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For 
additional commands, e-mail: commits-help@pinot.apache.org