tibrewalpratik17 commented on code in PR #12395: URL: https://github.com/apache/pinot/pull/12395#discussion_r1492024212
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -167,6 +172,45 @@ protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeM
       }
     }
+  /**
+   * When we have to process a new segment, if there are comparison value ties for the same primary-key within the
+   * segment, then for Partial Upsert tables we need to make sure that the record location map is updated only
+   * for the latest version of the record. This is specifically a concern for Partial Upsert tables because Realtime
+   * consumption can potentially end up reading the wrong version of a record, which will lead to permanent
+   * data-inconsistency.
+   *
+   * <p>
+   * This function returns an iterator that will de-dup
+   * records
+   * with the same primary-key. Moreover, for comparison ties, it will only keep the latest record. This iterator can
+   * then further be used to update the primary-key record location map safely.
+   * </p>
+   *
+   * @param recordInfoIterator iterator over the new segment
+   * @return iterator that returns de-duplicated records. To resolve ties for comparison column values, we prefer to
+   * return the latest record.
+   */
+  @VisibleForTesting
+  protected Iterator<RecordInfo> resolveComparisonTies(Iterator<RecordInfo> recordInfoIterator) {
+    Map<Object, RecordInfo> deDuplicatedRecordInfo = new HashMap<>();
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      Comparable newComparisonValue = recordInfo.getComparisonValue();
+      deDuplicatedRecordInfo.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
+          (key, existingRecordInfo) -> {

Review Comment:
Can we rename `existingRecordInfo` to `maxComparisonValueRecordInfo` for clarity?
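The hunk above stops at the `compute` lambda, so the tie-breaking rule from the Javadoc is only described, not shown. Below is a minimal standalone sketch of that rule, not the PR's implementation. It assumes records arrive in doc-id order (so "latest" means "seen later"), uses plain string keys instead of `HashUtils.hashPrimaryKey` output, and uses `long` comparison values; `TieResolutionSketch` and `Rec` are hypothetical names. The merge lambda's first parameter is named after the rename suggested in the comment.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TieResolutionSketch {

  /** Hypothetical stand-in for Pinot's RecordInfo; not the real class. */
  static final class Rec {
    final String primaryKey;     // assumption: plain string key instead of a hashed primary key
    final int docId;             // position in the segment; a higher doc id is a newer record
    final long comparisonValue;  // assumption: numeric comparison column, e.g. a timestamp

    Rec(String primaryKey, int docId, long comparisonValue) {
      this.primaryKey = primaryKey;
      this.docId = docId;
      this.comparisonValue = comparisonValue;
    }
  }

  /**
   * Keeps one record per primary key: the one with the largest comparison value.
   * On a tie, the record seen later wins because ">=" lets it replace the earlier one,
   * which assumes the input iterates in doc-id order.
   */
  static Iterator<Rec> resolveComparisonTies(Iterator<Rec> recordIterator) {
    // LinkedHashMap keeps keys in first-seen order so the output order is deterministic.
    Map<String, Rec> deDuplicated = new LinkedHashMap<>();
    while (recordIterator.hasNext()) {
      Rec candidate = recordIterator.next();
      deDuplicated.merge(candidate.primaryKey, candidate,
          (maxComparisonValueRecord, newRecord) ->
              newRecord.comparisonValue >= maxComparisonValueRecord.comparisonValue
                  ? newRecord
                  : maxComparisonValueRecord);
    }
    return deDuplicated.values().iterator();
  }

  public static void main(String[] args) {
    List<Rec> segment = List.of(
        new Rec("pk1", 0, 100),
        new Rec("pk2", 1, 50),
        new Rec("pk1", 2, 100),  // tie with doc 0: the later doc should win
        new Rec("pk2", 3, 40));  // lower comparison value: dropped
    Iterator<Rec> out = resolveComparisonTies(segment.iterator());
    while (out.hasNext()) {
      Rec r = out.next();
      System.out.println(r.primaryKey + " -> doc " + r.docId);
    }
    // prints "pk1 -> doc 2" then "pk2 -> doc 1"
  }
}
```

Using `>=` rather than `>` in the merge function is what turns "keep the max" into "keep the max, and the newest on ties".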
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -69,6 +71,9 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
     String segmentName = segment.getSegmentName();
     segment.enableUpsert(this, validDocIds, queryableDocIds);
+    if (_partialUpsertHandler != null) {
+      recordInfoIterator = resolveComparisonTies(recordInfoIterator);
+    }

Review Comment:
Can we move this to the base class? We would want to do this in all scenarios.

--
This is an automated message from the Apache Git Service.
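One way to read the second review comment (moving the `resolveComparisonTies` call into the base class) is a template method: the base class de-duplicates every segment before handing records to subclass-specific logic, so no subclass can skip the step. Reading "all scenarios" as "not only when `_partialUpsertHandler != null`" is an assumption. The sketch below is hypothetical throughout: `BaseUpsertMetadataManager`, `processSegment`, `indexDeduplicatedRecords`, `RecordingManager` and `Rec` are illustrative names, not Pinot classes or methods.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BaseClassHookSketch {

  /** Hypothetical record: primary key, doc id (higher = newer), comparison value. */
  static final class Rec {
    final String primaryKey;
    final int docId;
    final long comparisonValue;

    Rec(String primaryKey, int docId, long comparisonValue) {
      this.primaryKey = primaryKey;
      this.docId = docId;
      this.comparisonValue = comparisonValue;
    }
  }

  /** Hypothetical base class: it owns the de-dup step so no subclass can bypass it. */
  abstract static class BaseUpsertMetadataManager {
    // Template method: tie resolution always runs before the subclass-specific logic.
    final void processSegment(Iterator<Rec> recordIterator) {
      indexDeduplicatedRecords(resolveComparisonTies(recordIterator));
    }

    /** Subclass-specific handling of records that are already de-duplicated. */
    protected abstract void indexDeduplicatedRecords(Iterator<Rec> recordIterator);

    // Keep the max comparison value per key; on ties the later record wins.
    private static Iterator<Rec> resolveComparisonTies(Iterator<Rec> it) {
      Map<String, Rec> byKey = new LinkedHashMap<>();
      while (it.hasNext()) {
        Rec rec = it.next();
        byKey.merge(rec.primaryKey, rec,
            (maxComparisonValueRecord, candidate) ->
                candidate.comparisonValue >= maxComparisonValueRecord.comparisonValue
                    ? candidate
                    : maxComparisonValueRecord);
      }
      return byKey.values().iterator();
    }
  }

  /** Hypothetical concrete manager that only records which doc ids it indexed. */
  static final class RecordingManager extends BaseUpsertMetadataManager {
    final List<Integer> indexedDocIds = new ArrayList<>();

    @Override
    protected void indexDeduplicatedRecords(Iterator<Rec> recordIterator) {
      while (recordIterator.hasNext()) {
        indexedDocIds.add(recordIterator.next().docId);
      }
    }
  }

  public static void main(String[] args) {
    RecordingManager manager = new RecordingManager();
    manager.processSegment(List.of(
        new Rec("pk1", 0, 100),
        new Rec("pk1", 1, 100)).iterator());
    System.out.println(manager.indexedDocIds); // prints [1]
  }
}
```

Making `processSegment` final is the design choice that enforces the ordering: subclasses only ever see the de-duplicated iterator.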