rohityadav1993 commented on code in PR #13107: URL: https://github.com/apache/pinot/pull/13107#discussion_r1625655826
########## pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java: ########## @@ -378,6 +379,115 @@ public void testAddReplaceRemoveSegmentWithRecordDelete() verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, true); } + @Test + public void verifyAddReplaceUploadedSegment() + throws IOException { + ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, + _contextBuilder.setHashFunction(HashFunction.NONE).build()); + Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; + Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments; + + // Add the first segment + int numRecords = 6; + int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0}; + int[] timestamps = new int[]{100, 100, 100, 80, 120, 100}; + ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap(); + List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys); + SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class); + when(segmentMetadata.getIndexCreationTime()).thenReturn(1000L); + ImmutableSegmentImpl segment1 = + mockImmutableSegmentWithSegmentMetadata(1, validDocIds1, null, primaryKeys1, segmentMetadata, null); + List<RecordInfo> recordInfoList1; + // get recordInfo by iterating all records. + recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps, null); + upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator()); + trackedSegments.add(segment1); + // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100} + assertEquals(recordLocationMap.size(), 3); + checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5}); + + // Add the second segment of uploaded name format with higher creation time + numRecords = 5; + primaryKeys = new int[]{0, 1, 2, 3, 0}; + timestamps = new int[]{100, 100, 120, 80, 80}; + ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap(); + ImmutableSegmentImpl uploadedSegment2 = + mockUploadedImmutableSegment(2, validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys), 1010L); + List<RecordInfo> recordInfoList2; + // get recordInfo by iterating all records. + recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps, null); + upsertMetadataManager.addSegment(uploadedSegment2, validDocIds2, null, recordInfoList2.iterator()); + trackedSegments.add(uploadedSegment2); + + // segment1: 1 -> {4, 120} + // uploadedSegment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} + assertEquals(recordLocationMap.size(), 4); + checkRecordLocation(recordLocationMap, 0, uploadedSegment2, 0, 100, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 2, uploadedSegment2, 2, 120, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 3, uploadedSegment2, 3, 80, HashFunction.NONE); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); + + // replace uploadedSegment2 with higher creation time + ThreadSafeMutableRoaringBitmap newValidDocIds2 = new ThreadSafeMutableRoaringBitmap(); + ImmutableSegmentImpl newUploadedSegment2 = + mockUploadedImmutableSegment(2, newValidDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys), 1020L); + upsertMetadataManager.replaceSegment(newUploadedSegment2, newValidDocIds2, null, recordInfoList2.iterator(), + uploadedSegment2); + trackedSegments.add(newUploadedSegment2); + trackedSegments.remove(uploadedSegment2); + + // segment1: 1 -> {4, 120} + // newUploadedSegment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} + assertEquals(recordLocationMap.size(), 4); + checkRecordLocation(recordLocationMap, 0, newUploadedSegment2, 0, 100, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 2, newUploadedSegment2, 2, 120, HashFunction.NONE); + checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 3, 80, HashFunction.NONE); + assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4}); + assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3}); + + // add uploaded segment3 with same creation time as segment1 and higher sequence id than newUploadedSegment2 + numRecords = 2; + primaryKeys = new int[]{0, 1}; + timestamps = new int[]{100, 120}; + ThreadSafeMutableRoaringBitmap validDocIds3 = new ThreadSafeMutableRoaringBitmap(); + ImmutableSegmentImpl uploadedSegment3 = + mockUploadedImmutableSegment(3, validDocIds3, null, getPrimaryKeyList(numRecords, primaryKeys), 1000L); + List<RecordInfo> recordInfoList3; + // get recordInfo by iterating all records. + recordInfoList3 = getRecordInfoList(numRecords, primaryKeys, timestamps, null); + upsertMetadataManager.addSegment(uploadedSegment3, validDocIds3, null, recordInfoList3.iterator()); + trackedSegments.add(uploadedSegment3); + + // newUploadedSegment2: 2 -> {2, 120}, 3 -> {3, 80} + // segment3: 0 -> {0, 100}, 1 -> {1, 120} Review Comment: This makes sense: https://github.com/apache/pinot/pull/13107#discussion_r1625400329 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org