tibrewalpratik17 commented on code in PR #12395:
URL: https://github.com/apache/pinot/pull/12395#discussion_r1492024212


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -167,6 +172,45 @@ protected void addSegmentWithoutUpsert(ImmutableSegmentImpl segment, ThreadSafeM
     }
   }
 
+  /**
+   * When we have to process a new segment, if there are comparison value ties for the same primary-key within the
+   * segment, then for Partial Upsert tables we need to make sure that the record location map is updated only
+   * for the latest version of the record. This is specifically a concern for Partial Upsert tables because Realtime
+   * consumption can potentially end up reading the wrong version of a record, which will lead to permanent
+   * data-inconsistency.
+   *
+   * <p>
+   *  This function returns an iterator that will de-dup records with the same primary-key. Moreover, for comparison
+   *  ties, it will only keep the latest record. This iterator can then further be used to update the primary-key
+   *  record location map safely.
+   * </p>
+   *
+   * @param recordInfoIterator iterator over the new segment
+   * @return iterator that returns de-duplicated records. To resolve ties for comparison column values, we prefer to
+   *         return the latest record.
+   */
+  @VisibleForTesting
+  protected Iterator<RecordInfo> resolveComparisonTies(Iterator<RecordInfo> recordInfoIterator) {
+    Map<Object, RecordInfo> deDuplicatedRecordInfo = new HashMap<>();
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      Comparable newComparisonValue = recordInfo.getComparisonValue();
+      deDuplicatedRecordInfo.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
+          (key, existingRecordInfo) -> {

Review Comment:
   can we rename `existingRecordInfo` --> `maxComparisonValueRecordInfo` for more clarity?
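The tie-breaking rule described in the `resolveComparisonTies` Javadoc can be illustrated with a small standalone sketch. This is a hypothetical simplification, not the PR's code: `RecordInfo` here is a stand-in Java 16+ record, primary keys are plain `String`s instead of hashed keys, comparison values are `long`s instead of `Comparable`, and records are assumed to arrive in doc-id order. It uses the name `maxComparisonValueRecordInfo` suggested in the comment above.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the de-dup + tie-resolution idea; not Pinot's implementation. */
public class TieResolutionSketch {

  /** Stand-in for Pinot's RecordInfo (assumption): primary key, doc id, comparison value. */
  public record RecordInfo(String primaryKey, int docId, long comparisonValue) {}

  /**
   * Keeps one record per primary key: the one with the largest comparison value.
   * Records are assumed to arrive in docId order, so on a tie the later record
   * (the latest version) replaces the earlier one because of the >= check.
   */
  public static Iterator<RecordInfo> resolveComparisonTies(Iterator<RecordInfo> recordInfoIterator) {
    Map<String, RecordInfo> deDuplicatedRecordInfo = new LinkedHashMap<>();
    while (recordInfoIterator.hasNext()) {
      RecordInfo recordInfo = recordInfoIterator.next();
      // merge() passes (currentValue, newValue) to the remapping function when the key exists.
      deDuplicatedRecordInfo.merge(recordInfo.primaryKey(), recordInfo,
          (maxComparisonValueRecordInfo, newRecordInfo) ->
              newRecordInfo.comparisonValue() >= maxComparisonValueRecordInfo.comparisonValue()
                  ? newRecordInfo
                  : maxComparisonValueRecordInfo);
    }
    return deDuplicatedRecordInfo.values().iterator();
  }

  public static void main(String[] args) {
    List<RecordInfo> segment = List.of(
        new RecordInfo("pk1", 0, 10L),
        new RecordInfo("pk2", 1, 5L),
        new RecordInfo("pk1", 2, 10L),  // ties with doc 0: the later doc should be kept
        new RecordInfo("pk2", 3, 4L));  // lower value than doc 1: doc 1 should be kept
    Iterator<RecordInfo> it = resolveComparisonTies(segment.iterator());
    while (it.hasNext()) {
      System.out.println(it.next());
    }
  }
}
```

The `>=` comparison is what makes "latest wins" on ties; a strict `>` would keep the first occurrence instead, which is exactly the stale-version problem the Javadoc describes for Partial Upsert tables.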



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -69,6 +71,9 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
     String segmentName = segment.getSegmentName();
     segment.enableUpsert(this, validDocIds, queryableDocIds);
 
+    if (_partialUpsertHandler != null) {
+      recordInfoIterator = resolveComparisonTies(recordInfoIterator);
+    }

Review Comment:
   can we move this to the base class? We would want to do this in all scenarios.
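One way to read the suggestion is as a template method: the base class applies tie resolution to every incoming segment unconditionally, so no subclass (and no table type) can skip it. The sketch below is hypothetical; `BaseManager`, `ConcurrentMapManager`, `Rec` and `doAddOrReplaceSegment` are illustrative names, not Pinot's actual API, records are reduced to (primary key, doc id) pairs, and all comparison values are treated as tied, so the last document per key wins.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of hoisting tie resolution into a base class (template method). */
public class HoistedTieResolutionSketch {

  /** Hypothetical stand-in record: just a primary key and a doc id. */
  public record Rec(String primaryKey, int docId) {}

  /** Hypothetical base class: tie resolution runs for every subclass and table type. */
  public abstract static class BaseManager {
    public final void addOrReplaceSegment(Iterator<Rec> recordIterator) {
      // Applied unconditionally, not only when a partial-upsert handler is configured.
      doAddOrReplaceSegment(resolveComparisonTies(recordIterator));
    }

    // Simplified: every comparison value is treated as a tie, so the last doc per key wins.
    protected Iterator<Rec> resolveComparisonTies(Iterator<Rec> recordIterator) {
      Map<String, Rec> latest = new LinkedHashMap<>();
      while (recordIterator.hasNext()) {
        Rec rec = recordIterator.next();
        latest.put(rec.primaryKey(), rec);
      }
      return latest.values().iterator();
    }

    // Subclass-specific bookkeeping only ever sees de-duplicated input.
    protected abstract void doAddOrReplaceSegment(Iterator<Rec> dedupedRecords);
  }

  /** Hypothetical subclass that records what it was handed. */
  public static class ConcurrentMapManager extends BaseManager {
    public final List<Rec> applied = new ArrayList<>();

    @Override
    protected void doAddOrReplaceSegment(Iterator<Rec> dedupedRecords) {
      dedupedRecords.forEachRemaining(applied::add);
    }
  }

  public static void main(String[] args) {
    ConcurrentMapManager manager = new ConcurrentMapManager();
    manager.addOrReplaceSegment(List.of(
        new Rec("pk1", 0), new Rec("pk1", 1), new Rec("pk2", 2)).iterator());
    System.out.println(manager.applied);
  }
}
```

Making `addOrReplaceSegment` `final` in the base class is the design choice that enforces "all scenarios": subclasses can customize how records are applied, but not whether ties are resolved first.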



-- 
This is an automated message from the Apache Git Service.