Jackie-Jiang commented on code in PR #18468:
URL: https://github.com/apache/pinot/pull/18468#discussion_r3221545202
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/MultiTierStrictRealtimeSegmentAssignment.java:
##########
@@ -83,4 +88,48 @@ public Map<String, Map<String, String>>
rebalanceTable(Map<String, Map<String, S
_tableNameWithType,
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
newAssignment));
return newAssignment;
}
+
+ /**
+ * Multi-tier override: pick the latest LLC segment in the partition
(highest sequence number) and skip it if it has
+ * already been moved to a tier, since tiered segments live on a different
instance pool than CONSUMING segments.
+ */
+ @Nullable
+ @Override
+ protected Set<String> getExistingAssignment(int partitionId, Map<String,
Map<String, String>> currentAssignment) {
+ List<String> uploadedSegments = new ArrayList<>();
+ LLCSegmentName latestLLCSegmentName = null;
+ Set<String> latestAssignment = null;
+ for (Map.Entry<String, Map<String, String>> entry :
currentAssignment.entrySet()) {
+ if (isOfflineSegment(entry.getValue())) {
+ continue;
+ }
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey());
+ if (llcSegmentName == null) {
+ uploadedSegments.add(entry.getKey());
Review Comment:
Don't track uploaded segments. We only want to find the latest committed
segment.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/MultiTierStrictRealtimeSegmentAssignment.java:
##########
@@ -83,4 +88,48 @@ public Map<String, Map<String, String>>
rebalanceTable(Map<String, Map<String, S
_tableNameWithType,
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
newAssignment));
return newAssignment;
}
+
+ /**
+ * Multi-tier override: pick the latest LLC segment in the partition
(highest sequence number) and skip it if it has
+ * already been moved to a tier, since tiered segments live on a different
instance pool than CONSUMING segments.
+ */
+ @Nullable
+ @Override
+ protected Set<String> getExistingAssignment(int partitionId, Map<String,
Map<String, String>> currentAssignment) {
+ List<String> uploadedSegments = new ArrayList<>();
+ LLCSegmentName latestLLCSegmentName = null;
+ Set<String> latestAssignment = null;
+ for (Map.Entry<String, Map<String, String>> entry :
currentAssignment.entrySet()) {
+ if (isOfflineSegment(entry.getValue())) {
+ continue;
+ }
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey());
+ if (llcSegmentName == null) {
+ uploadedSegments.add(entry.getKey());
+ continue;
+ }
+ if (llcSegmentName.getPartitionGroupId() == partitionId &&
(latestLLCSegmentName == null
+ || llcSegmentName.getSequenceNumber() >
latestLLCSegmentName.getSequenceNumber())) {
+ latestLLCSegmentName = llcSegmentName;
+ latestAssignment = entry.getValue().keySet();
+ }
+ }
+ if (latestAssignment != null) {
+ SegmentZKMetadata segmentZKMetadata =
+
ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(),
_tableNameWithType,
+ latestLLCSegmentName.getSegmentName());
+ if (segmentZKMetadata != null && segmentZKMetadata.getTier() != null) {
Review Comment:
We want to return `null` when ZK metadata doesn't exist.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/MultiTierStrictRealtimeSegmentAssignment.java:
##########
@@ -83,4 +88,48 @@ public Map<String, Map<String, String>>
rebalanceTable(Map<String, Map<String, S
_tableNameWithType,
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment,
newAssignment));
return newAssignment;
}
+
+ /**
+ * Multi-tier override: pick the latest LLC segment in the partition
(highest sequence number) and skip it if it has
+ * already been moved to a tier, since tiered segments live on a different
instance pool than CONSUMING segments.
+ */
+ @Nullable
+ @Override
+ protected Set<String> getExistingAssignment(int partitionId, Map<String,
Map<String, String>> currentAssignment) {
+ List<String> uploadedSegments = new ArrayList<>();
+ LLCSegmentName latestLLCSegmentName = null;
+ Set<String> latestAssignment = null;
+ for (Map.Entry<String, Map<String, String>> entry :
currentAssignment.entrySet()) {
+ if (isOfflineSegment(entry.getValue())) {
+ continue;
+ }
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(entry.getKey());
+ if (llcSegmentName == null) {
+ uploadedSegments.add(entry.getKey());
+ continue;
+ }
+ if (llcSegmentName.getPartitionGroupId() == partitionId &&
(latestLLCSegmentName == null
+ || llcSegmentName.getSequenceNumber() >
latestLLCSegmentName.getSequenceNumber())) {
+ latestLLCSegmentName = llcSegmentName;
+ latestAssignment = entry.getValue().keySet();
+ }
+ }
+ if (latestAssignment != null) {
+ SegmentZKMetadata segmentZKMetadata =
+
ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(),
_tableNameWithType,
+ latestLLCSegmentName.getSegmentName());
+ if (segmentZKMetadata != null && segmentZKMetadata.getTier() != null) {
+ _logger.info("Latest segment: {} for partition: {} is on tier: {},
skipping existing assignment",
Review Comment:
Log a warning in such a case. Same for the case where `segmentZKMetadata` is
unavailable.
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java:
##########
@@ -404,12 +479,20 @@ private void addToAssignment(Map<String, Map<String,
String>> currentAssignment,
}
private static HelixManager createHelixManager() {
+ return createHelixManager(Collections.emptySet());
Review Comment:
(nit)
```suggestion
return createHelixManager(Set.of());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]