rohityadav1993 commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1625655826


##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -378,6 +379,115 @@ public void testAddReplaceRemoveSegmentWithRecordDelete()
     verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, true);
   }
 
+  @Test
+  public void verifyAddReplaceUploadedSegment()
+      throws IOException {
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+            _contextBuilder.setHashFunction(HashFunction.NONE).build());
+    Map<Object, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
+    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
+
+    // Add the first segment
+    int numRecords = 6;
+    int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
+    int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    when(segmentMetadata.getIndexCreationTime()).thenReturn(1000L);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithSegmentMetadata(1, validDocIds1, null, 
primaryKeys1, segmentMetadata, null);
+    List<RecordInfo> recordInfoList1;
+    // get recordInfo by iterating all records.
+    recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps, 
null);
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, 
recordInfoList1.iterator());
+    trackedSegments.add(segment1);
+    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4, 5});
+
+    // Add the second segment of uploaded name format with higher creation time
+    numRecords = 5;
+    primaryKeys = new int[]{0, 1, 2, 3, 0};
+    timestamps = new int[]{100, 100, 120, 80, 80};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl uploadedSegment2 =
+        mockUploadedImmutableSegment(2, validDocIds2, null, 
getPrimaryKeyList(numRecords, primaryKeys), 1010L);
+    List<RecordInfo> recordInfoList2;
+    // get recordInfo by iterating all records.
+    recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps, 
null);
+    upsertMetadataManager.addSegment(uploadedSegment2, validDocIds2, null, 
recordInfoList2.iterator());
+    trackedSegments.add(uploadedSegment2);
+
+    // segment1: 1 -> {4, 120}
+    // uploadedSegment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, uploadedSegment2, 0, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, uploadedSegment2, 2, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, uploadedSegment2, 3, 80, 
HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
+
+    // replace uploadedSegment2 with higher creation time
+    ThreadSafeMutableRoaringBitmap newValidDocIds2 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl newUploadedSegment2 =
+        mockUploadedImmutableSegment(2, newValidDocIds2, null, 
getPrimaryKeyList(numRecords, primaryKeys), 1020L);
+    upsertMetadataManager.replaceSegment(newUploadedSegment2, newValidDocIds2, 
null, recordInfoList2.iterator(),
+        uploadedSegment2);
+    trackedSegments.add(newUploadedSegment2);
+    trackedSegments.remove(uploadedSegment2);
+
+    // segment1: 1 -> {4, 120}
+    // newUploadedSegment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, newUploadedSegment2, 0, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, newUploadedSegment2, 2, 120, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 3, 80, 
HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
+
+    // add uploaded segment3 with same creation time as segment1 and higher 
sequence id than newUploadedSegment2
+    numRecords = 2;
+    primaryKeys = new int[]{0, 1};
+    timestamps = new int[]{100, 120};
+    ThreadSafeMutableRoaringBitmap validDocIds3 = new 
ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl uploadedSegment3 =
+        mockUploadedImmutableSegment(3, validDocIds3, null, 
getPrimaryKeyList(numRecords, primaryKeys), 1000L);
+    List<RecordInfo> recordInfoList3;
+    // get recordInfo by iterating all records.
+    recordInfoList3 = getRecordInfoList(numRecords, primaryKeys, timestamps, 
null);
+    upsertMetadataManager.addSegment(uploadedSegment3, validDocIds3, null, 
recordInfoList3.iterator());
+    trackedSegments.add(uploadedSegment3);
+
+    // newUploadedSegment2: 2 -> {2, 120}, 3 -> {3, 80}
+    // segment3: 0 -> {0, 100}, 1 -> {1, 120}

Review Comment:
   This makes sense; see the earlier discussion on this PR: https://github.com/apache/pinot/pull/13107#discussion_r1625400329



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to