This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 64b901f426 update upsert TTL watermark in replaceSegment too (#14147)
64b901f426 is described below

commit 64b901f42668017e28d5be2bca43b90b2e747a19
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Wed Oct 2 15:29:37 2024 -0700

    update upsert TTL watermark in replaceSegment too (#14147)
    
    * update upsert TTL watermark in replaceSegment too
---
 .../upsert/BasePartitionUpsertMetadataManager.java | 53 +++++++++++++---------
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  2 +-
 2 files changed, 32 insertions(+), 23 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 9a868b34c5..0c72f26114 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -385,9 +385,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
         "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
         _tableNameWithType);
-    if (isTTLEnabled()) {
-      updateWatermark(segment);
-    }
     if (!startOperation()) {
       _logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName());
       return;
@@ -413,11 +410,20 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     return _metadataTTL > 0 || _deletedKeysTTL > 0;
   }
 
+  protected double getMaxComparisonValue(IndexSegment segment) {
+    return ((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0))
+        .getMaxValue()).doubleValue();
+  }
+
+  protected boolean isOutOfMetadataTTL(double maxComparisonValue) {
+    return _metadataTTL > 0 && _largestSeenComparisonValue.get() != TTL_WATERMARK_NOT_SET
+        && maxComparisonValue < _largestSeenComparisonValue.get() - _metadataTTL;
+  }
+
   protected boolean isOutOfMetadataTTL(IndexSegment segment) {
-    if (_metadataTTL > 0 && _largestSeenComparisonValue.get() != TTL_WATERMARK_NOT_SET) {
-      Number maxComparisonValue =
-          (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
-      return maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() - _metadataTTL;
+    if (_metadataTTL > 0) {
+      double maxComparisonValue = getMaxComparisonValue(segment);
+      return isOutOfMetadataTTL(maxComparisonValue);
     }
     return false;
   }
@@ -451,8 +457,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
   protected void doAddSegment(ImmutableSegmentImpl segment) {
     String segmentName = segment.getSegmentName();
     _logger.info("Adding segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys());
-    if (isOutOfMetadataTTL(segment) && skipAddSegmentOutOfTTL(segment)) {
-      return;
+    if (isTTLEnabled()) {
+      double maxComparisonValue = getMaxComparisonValue(segment);
+      _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue));
+      if (isOutOfMetadataTTL(maxComparisonValue) && skipAddSegmentOutOfTTL(segment)) {
+        return;
+      }
     }
     long startTimeMs = System.currentTimeMillis();
     if (!_enableSnapshot) {
@@ -495,9 +505,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
         "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
         _tableNameWithType);
-    if (isTTLEnabled()) {
-      updateWatermark(segment);
-    }
     if (!startOperation()) {
       _logger.info("Skip preloading segment: {} because metadata manager is already stopped", segmentName);
       return;
@@ -527,8 +534,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
       return;
     }
-    if (isOutOfMetadataTTL(segment) && skipPreloadSegmentOutOfTTL(segment, validDocIds)) {
-      return;
+    if (isTTLEnabled()) {
+      double maxComparisonValue = getMaxComparisonValue(segment);
+      _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue));
+      if (isOutOfMetadataTTL(maxComparisonValue) && skipPreloadSegmentOutOfTTL(segment, validDocIds)) {
+        return;
+      }
     }
     try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
         _comparisonColumns, _deleteRecordColumn)) {
@@ -701,7 +712,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       replaceSegment(segment, null, null, null, oldSegment);
       return;
     }
-
+    if (isTTLEnabled()) {
+      double maxComparisonValue = getMaxComparisonValue(segment);
+      _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue));
+      // Segment might be uploaded directly to the table to replace an old segment. So update the TTL watermark but
+      // we can't skip segment even if it's out of TTL as its validDocIds bitmap is not updated yet.
+    }
     try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
         _comparisonColumns, _deleteRecordColumn)) {
       Iterator<RecordInfo> recordInfoIterator =
@@ -1031,13 +1047,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId);
   }
 
-  protected void updateWatermark(ImmutableSegment segment) {
-    double maxComparisonValue =
-        ((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0))
-            .getMaxValue()).doubleValue();
-    _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue));
-  }
-
   @VisibleForTesting
   double getWatermark() {
     return _largestSeenComparisonValue.get();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 90c03bb5de..b29fd68559 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -238,7 +238,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
     ImmutableSegmentImpl segment =
         mockImmutableSegmentWithEndTime(1, new ThreadSafeMutableRoaringBitmap(), null, new ArrayList<>(),
             COMPARISON_COLUMNS, new Double(currentTimeMs + 1024), new MutableRoaringBitmap());
-    upsertMetadataManager.updateWatermark(segment);
+    upsertMetadataManager.setWatermark(currentTimeMs + 1024);
     assertEquals(upsertMetadataManager.getWatermark(), currentTimeMs + 1024);
 
     // Stop the metadata manager


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to