klsince commented on code in PR #16034:
URL: https://github.com/apache/pinot/pull/16034#discussion_r2138368288
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java:
##########
@@ -1146,4 +1148,29 @@ public Set<String> getNewlyAddedSegments() {
}
return Collections.emptySet();
}
+
+ /**
+ * Returns the ZooKeeper creation time for upsert consistency.
+ * This refers to the time set by the controller when creating new consuming
+ * segment.
Review Comment:
nit: formatting. It looks like `segment` fits on the previous line.
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -730,10 +732,13 @@ private void handleDedup(ImmutableSegmentImpl immutableSegment) {
}
}
- private void handleUpsert(ImmutableSegment immutableSegment) {
+ private void handleUpsert(ImmutableSegment immutableSegment, @Nullable SegmentZKMetadata zkMetadata) {
String segmentName = immutableSegment.getSegmentName();
_logger.info("Adding immutable segment: {} with upsert enabled", segmentName);
+ // Set the ZK creation time for upsert consistency across replicas
Review Comment:
nit: make the comment a bit more specific, e.g. "Set the ZK creation time so
that the same creation time can be used to break comparison ties across
replicas, keeping the replicas consistent"
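
To illustrate the tie-breaking idea in the suggested comment, here is a minimal sketch. It is not Pinot's implementation: `RecordVersion`, `UpsertTieBreak`, and the rule that the later creation time wins a tie are all assumptions made for illustration. The point is only that when every replica reads the same controller-assigned creation time from ZK, every replica resolves a tie the same way.

```java
import java.util.Comparator;

// Hypothetical stand-in for one version of an upserted record; not a Pinot class.
final class RecordVersion {
  final long comparisonValue;       // value of the upsert comparison column
  final long segmentZkCreationTime; // creation time of the owning segment, as read from ZK

  RecordVersion(long comparisonValue, long segmentZkCreationTime) {
    this.comparisonValue = comparisonValue;
    this.segmentZkCreationTime = segmentZkCreationTime;
  }
}

final class UpsertTieBreak {
  // Order by comparison value first; on a tie, fall back to the ZK creation time.
  // Assumption: the segment with the later creation time wins the tie.
  static final Comparator<RecordVersion> ORDER =
      Comparator.comparingLong((RecordVersion r) -> r.comparisonValue)
          .thenComparingLong(r -> r.segmentZkCreationTime);

  // Returns the version that should be kept. The result does not depend on
  // argument order unless both fields are equal.
  static RecordVersion winner(RecordVersion a, RecordVersion b) {
    return ORDER.compare(a, b) >= 0 ? a : b;
  }
}
```

If each replica instead used a locally generated timestamp, two replicas could pick different winners for the same pair of records, which is the inconsistency the comment should call out.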
##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/SegmentMetadataImpl.java:
##########
@@ -380,6 +381,23 @@ public long getIndexCreationTime() {
return _creationTime;
}
+ /**
+ * Returns the ZooKeeper creation time for upsert consistency.
Review Comment:
I think we can set `_zkCreationTime = _creationTime` in the constructor used
for RT (REALTIME) segments:
```
/**
* For REALTIME consuming segments.
*/
public SegmentMetadataImpl(String rawTableName, String segmentName, Schema schema, long creationTime) {
...
```
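
A rough sketch of what that could look like, using a simplified stand-in class rather than the real `SegmentMetadataImpl`. The class name `SegmentMetadataSketch` and the accessor names `getZkCreationTime` / `setZkCreationTime` are assumptions for illustration only; the PR may name them differently.

```java
// Simplified, hypothetical stand-in for SegmentMetadataImpl; it models only
// the two fields discussed in this comment.
final class SegmentMetadataSketch {
  private final long _creationTime;
  private long _zkCreationTime;

  // Mirrors the "For REALTIME consuming segments" constructor: default the
  // ZK creation time to the creation time passed in.
  SegmentMetadataSketch(long creationTime) {
    _creationTime = creationTime;
    _zkCreationTime = creationTime;
  }

  long getIndexCreationTime() {
    return _creationTime;
  }

  // Assumed accessor name.
  long getZkCreationTime() {
    return _zkCreationTime;
  }

  // Assumed setter name, for callers that later learn the value from ZK metadata.
  void setZkCreationTime(long zkCreationTime) {
    _zkCreationTime = zkCreationTime;
  }
}
```

With this default in place, a caller that never sets the ZK creation time explicitly still reads a meaningful value instead of an unset one.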
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -826,13 +847,28 @@ public void downloadAndReplaceConsumingSegment(SegmentZKMetadata zkMetadata)
/**
* Replaces the CONSUMING segment with the one sealed locally.
*/
+ @Deprecated
public void replaceConsumingSegment(String segmentName)
throws Exception {
+ // Fetch ZK metadata to pass to addSegment so that ZK creation time is set automatically
+ SegmentZKMetadata zkMetadata = fetchZKMetadataNullable(segmentName);
+ replaceConsumingSegment(segmentName, zkMetadata);
Review Comment:
nit: pass `null` as `zkMetadata` here to keep the old behavior?
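
Roughly what I have in mind, sketched with stand-in types. `ReplaceSegmentSketch` and `ZkMetadataSketch` are hypothetical and the method bodies are placeholders; only the delegation pattern matters here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SegmentZKMetadata.
final class ZkMetadataSketch {
}

// Hypothetical stand-in for the table data manager.
final class ReplaceSegmentSketch {
  // Records what each call saw, so the behavior is observable in this sketch.
  final List<String> calls = new ArrayList<>();

  // Deprecated overload: no ZK fetch, just delegate with null so that
  // existing callers keep the pre-change behavior.
  @Deprecated
  void replaceConsumingSegment(String segmentName) {
    replaceConsumingSegment(segmentName, null);
  }

  // New overload: zkMetadata may be null, in which case no ZK creation time is applied.
  void replaceConsumingSegment(String segmentName, ZkMetadataSketch zkMetadata) {
    calls.add(segmentName + (zkMetadata == null ? ":without-zk-metadata" : ":with-zk-metadata"));
  }
}
```

This keeps the deprecated method free of the extra ZK read, and callers that want the ZK creation time applied opt in through the new overload.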
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]