This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c86ad4652 fix a bug that would take validdocids snapshots redundantly (#12246)
8c86ad4652 is described below

commit 8c86ad465232efa4ab1a1eb5a1f19598b2628166
Author: Xiaobing <[email protected]>
AuthorDate: Tue Jan 9 13:41:08 2024 -0800

    fix a bug that would take validdocids snapshots redundantly (#12246)
---
 .../upsert/BasePartitionUpsertMetadataManager.java |   6 +-
 .../BasePartitionUpsertMetadataManagerTest.java    | 174 +++++++++++++++++++++
 2 files changed, 178 insertions(+), 2 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index a9d4474da5..b63f58e013 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -640,6 +640,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
       if (!immutableSegment.hasValidDocIdsSnapshotFile()) {
         segmentsWithoutSnapshot.add(immutableSegment);
+        continue;
       }
       immutableSegment.persistValidDocIdsSnapshot();
       numImmutableSegments++;
@@ -655,8 +656,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, numImmutableSegments);
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
         ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT, 
numPrimaryKeysInSnapshot);
-    _logger.info("Finished taking snapshot for {} immutable segments (out of 
{} total segments) in {}ms",
-        numImmutableSegments, numTrackedSegments, System.currentTimeMillis() - 
startTimeMs);
+    _logger.info(
+        "Finished taking snapshot for {} immutable segments with {} primary 
keys (out of {} total segments) in {}ms",
+        numImmutableSegments, numPrimaryKeysInSnapshot, numTrackedSegments, 
System.currentTimeMillis() - startTimeMs);
   }
 
   /**
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
new file mode 100644
index 0000000000..5fce4cc3b0
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManagerTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.MutableSegment;
+import org.apache.pinot.segment.spi.V1Constants;
+import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import 
org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.segment.spi.store.SegmentDirectory;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Tests for {@link BasePartitionUpsertMetadataManager}, focusing on how {@code doTakeSnapshot()} persists the
+ * validDocIds snapshots of the tracked immutable segments.
+ */
+public class BasePartitionUpsertMetadataManagerTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "BasePartitionUpsertMetadataManagerTest");
+
+  @BeforeClass
+  public void setUp()
+      throws IOException {
+    // Start from a clean directory: leftovers from an aborted earlier run (e.g. a stale snapshot file for seg01)
+    // would change which segments count as already snapshotted and break the ordering/count assertions below.
+    FileUtils.deleteQuietly(TEMP_DIR);
+    FileUtils.forceMkdir(TEMP_DIR);
+    ServerMetrics.register(mock(ServerMetrics.class));
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    FileUtils.forceDelete(TEMP_DIR);
+  }
+
+  @Test
+  public void testTakeSnapshotInOrder()
+      throws IOException {
+    DummyPartitionUpsertMetadataManager upsertMetadataManager =
+        new DummyPartitionUpsertMetadataManager("myTable", 0, mock(UpsertContext.class));
+
+    // Records the segment name every time persistValidDocIdsSnapshot() is invoked, in call order.
+    List<String> segmentsTakenSnapshot = new ArrayList<>();
+
+    File segDir01 = new File(TEMP_DIR, "seg01");
+    ImmutableSegmentImpl seg01 = createImmutableSegment("seg01", segDir01, segmentsTakenSnapshot);
+    seg01.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3), null);
+    upsertMetadataManager.trackSegment(seg01);
+    // seg01 has a tmp snapshot file, but no snapshot file.
+    FileUtils.touch(new File(segDir01, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME + "_tmp"));
+
+    File segDir02 = new File(TEMP_DIR, "seg02");
+    ImmutableSegmentImpl seg02 = createImmutableSegment("seg02", segDir02, segmentsTakenSnapshot);
+    seg02.enableUpsert(upsertMetadataManager, createValidDocIds(0, 1, 2, 3, 4, 5), null);
+    upsertMetadataManager.trackSegment(seg02);
+    // seg02 has a snapshot file, so its snapshot is taken first.
+    FileUtils.touch(new File(segDir02, V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME));
+
+    File segDir03 = new File(TEMP_DIR, "seg03");
+    ImmutableSegmentImpl seg03 = createImmutableSegment("seg03", segDir03, segmentsTakenSnapshot);
+    seg03.enableUpsert(upsertMetadataManager, createValidDocIds(3, 4, 7), null);
+    upsertMetadataManager.trackSegment(seg03);
+
+    // Mutable segments are skipped when taking snapshots.
+    upsertMetadataManager.trackSegment(mock(MutableSegmentImpl.class));
+
+    upsertMetadataManager.doTakeSnapshot();
+    // Each immutable segment must be persisted exactly once. Before #12246, segments without a snapshot file were
+    // persisted redundantly, which made this list longer than the number of segments.
+    assertEquals(segmentsTakenSnapshot.size(), 3);
+    // The snapshot of seg02 is taken first, as it's the only segment with an existing snapshot file.
+    assertEquals(segmentsTakenSnapshot.get(0), "seg02");
+    // A Set is used to track segments internally, so the order of the other segments is not deterministic,
+    // but all 3 segments should have taken their snapshots.
+    assertTrue(segmentsTakenSnapshot.containsAll(Arrays.asList("seg01", "seg02", "seg03")));
+
+    assertEquals(TEMP_DIR.list().length, 3);
+    assertTrue(segDir01.exists());
+    assertEquals(seg01.loadValidDocIdsFromSnapshot().getCardinality(), 4);
+    assertTrue(segDir02.exists());
+    assertEquals(seg02.loadValidDocIdsFromSnapshot().getCardinality(), 6);
+    assertTrue(segDir03.exists());
+    assertEquals(seg03.loadValidDocIdsFromSnapshot().getCardinality(), 3);
+  }
+
+  /**
+   * Builds a thread-safe validDocIds bitmap containing exactly {@code docIds}.
+   */
+  private static ThreadSafeMutableRoaringBitmap createValidDocIds(int... docIds) {
+    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+    bitmap.add(docIds);
+    return new ThreadSafeMutableRoaringBitmap(bitmap);
+  }
+
+  /**
+   * Creates an {@link ImmutableSegmentImpl} backed by mocked metadata whose index dir is {@code segDir}.
+   * Every call to {@code persistValidDocIdsSnapshot()} appends {@code segName} to {@code segmentsTakenSnapshot}
+   * before delegating to the real implementation, so the test can observe how often and in which order
+   * snapshots are taken.
+   */
+  private static ImmutableSegmentImpl createImmutableSegment(String segName, File segDir,
+      List<String> segmentsTakenSnapshot)
+      throws IOException {
+    FileUtils.forceMkdir(segDir);
+    SegmentMetadataImpl meta = mock(SegmentMetadataImpl.class);
+    when(meta.getName()).thenReturn(segName);
+    when(meta.getIndexDir()).thenReturn(segDir);
+    return new ImmutableSegmentImpl(mock(SegmentDirectory.class), meta, new HashMap<>(), null) {
+      // @Override makes the compiler fail if the base method is renamed, instead of the recorder silently
+      // never being called.
+      @Override
+      public void persistValidDocIdsSnapshot() {
+        segmentsTakenSnapshot.add(segName);
+        super.persistValidDocIdsSnapshot();
+      }
+    };
+  }
+
+  /**
+   * Minimal concrete {@link BasePartitionUpsertMetadataManager} for tests. It lets the test add segments to
+   * {@code _trackedSegments} directly and stubs out the overridden hooks with empty bodies or trivial return values.
+   */
+  private static class DummyPartitionUpsertMetadataManager extends BasePartitionUpsertMetadataManager {
+
+    protected DummyPartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) {
+      super(tableNameWithType, partitionId, context);
+    }
+
+    /** Adds {@code seg} to the tracked segments without going through the add/replace flow. */
+    public void trackSegment(IndexSegment seg) {
+      _trackedSegments.add(seg);
+    }
+
+    @Override
+    protected long getNumPrimaryKeys() {
+      return 0;
+    }
+
+    @Override
+    protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
+        @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
+        @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
+    }
+
+    @Override
+    protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
+      return false;
+    }
+
+    @Override
+    protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
+    }
+
+    @Override
+    protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
+      return null;
+    }
+
+    @Override
+    protected void doRemoveExpiredPrimaryKeys() {
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to