tarun11Mavani commented on code in PR #16034:
URL: https://github.com/apache/pinot/pull/16034#discussion_r2136014001
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -830,12 +834,61 @@ public void replaceConsumingSegment(String segmentName)
throws Exception {
_logger.info("Replacing CONSUMING segment: {} with the one sealed locally", segmentName);
File indexDir = new File(_indexDir, segmentName);
+
+ // Fix creation time consistency before loading the locally built segment (for upsert tables)
+ if (isUpsertEnabled()) {
+ fixCreationTimeConsistencyForLocalSegment(segmentName, indexDir);
+ }
+
// Get a new index loading config with latest table config and schema to load the segment
IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottler));
_logger.info("Replaced CONSUMING segment: {}", segmentName);
}
+ /**
+ * Fixes creation time consistency for locally built segments in upsert tables.
+ * This ensures that slower replicas have consistent creation times for proper upsert comparison.
+ */
+ private void fixCreationTimeConsistencyForLocalSegment(String segmentName, File indexDir) {
+ try {
+ SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
+ long segmentCreationTimeFromZK = zkMetadata.getCreationTime();
Review Comment:
Thanks, @Jackie-Jiang. I have updated the PR accordingly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]