Jackie-Jiang commented on code in PR #8584:
URL: https://github.com/apache/pinot/pull/8584#discussion_r856590568


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java:
##########
@@ -92,12 +91,6 @@ public void completeSegmentOperations(String 
tableNameWithType, SegmentMetadata
           Response.Status.CONFLICT);
     }
 
-    // TODO Allow segment refreshing for realtime tables.
-    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
-      throw new ControllerApplicationException(LOGGER,
-          "Refresh existing segment " + segmentName + " for realtime table " + 
tableNameWithType
-              + " is not yet supported ", Response.Status.NOT_IMPLEMENTED);
-    }

Review Comment:
   (minor) Remove one extra empty line



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java:
##########
@@ -35,20 +37,23 @@ private SegmentUtils() {
   // Returns the partition id of a realtime segment based segment name and 
segment metadata info retrieved via Helix.
   // Important: The method is costly because it may read data from zookeeper. 
Do not use it in any query execution
   // path.
-  public static int getRealtimeSegmentPartitionId(String segmentName, String 
realtimeTableName,
+  public static Optional<Integer> getRealtimeSegmentPartitionId(String 
segmentName, String realtimeTableName,
       HelixManager helixManager, String partitionColumn) {
     // A fast path if the segmentName is a LLC segment name and we can get the 
partition id from the name directly.
     if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
-      return new LLCSegmentName(segmentName).getPartitionGroupId();
+      return Optional.of(new 
LLCSegmentName(segmentName).getPartitionGroupId());
     }
-    // Otherwise, retrieve the partition id from the segment zk metadata. 
Currently only realtime segments from upsert
-    // enabled tables have partition ids in their segment metadata.
+    // Otherwise, retrieve the partition id from the segment zk metadata.
     SegmentZKMetadata segmentZKMetadata =
         
ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), 
realtimeTableName, segmentName);
     Preconditions
         .checkState(segmentZKMetadata != null, "Failed to find segment ZK 
metadata for segment: %s of table: %s",
             segmentName, realtimeTableName);
     SegmentPartitionMetadata segmentPartitionMetadata = 
segmentZKMetadata.getPartitionMetadata();
+    if (segmentPartitionMetadata == null

Review Comment:
   To be more robust, let's remove the checks: return the partition id when 
it is available, and `null` otherwise. The caller can then decide how to 
proceed.
   ```suggestion
       if (segmentPartitionMetadata != null) {
         ColumnPartitionMetadata columnPartitionMetadata =
             segmentPartitionMetadata.getColumnPartitionMap().get(partitionColumn);
         if (columnPartitionMetadata != null && columnPartitionMetadata.getPartitions().size() == 1) {
           return columnPartitionMetadata.getPartitions().iterator().next();
         }
       }
       return null;
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java:
##########
@@ -35,20 +37,23 @@ private SegmentUtils() {
   // Returns the partition id of a realtime segment based segment name and 
segment metadata info retrieved via Helix.
   // Important: The method is costly because it may read data from zookeeper. 
Do not use it in any query execution
   // path.
-  public static int getRealtimeSegmentPartitionId(String segmentName, String 
realtimeTableName,
+  public static Optional<Integer> getRealtimeSegmentPartitionId(String 
segmentName, String realtimeTableName,

Review Comment:
   Suggest returning `Integer` (annotated with `@Nullable`) instead of 
`Optional`. Currently we usually use `null` to represent a value that is not 
available, and several `Optional` methods are not supported in JDK 8.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -260,13 +277,26 @@ public Map<String, Map<String, String>> 
rebalanceTable(Map<String, Map<String, S
           "No COMPLETED instance partitions found, reassigning COMPLETED 
segments the same way as CONSUMING segments "
               + "with CONSUMING instance partitions for table: {}", 
_realtimeTableName);
 
+      Set<String> uploadedSegmentsWithNoPartitionMetadata = new TreeSet<>();
       newAssignment = new TreeMap<>();
       for (String segmentName : completedSegmentAssignment.keySet()) {
+        if (!SegmentUtils
+            .getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, 
_helixManager, _partitionColumn)
+            .isPresent()) {
+          uploadedSegmentsWithNoPartitionMetadata.add(segmentName);
+          continue;
+        }
         List<String> instancesAssigned = assignConsumingSegment(segmentName, 
consumingInstancePartitions);
         Map<String, String> instanceStateMap =
             SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, 
SegmentStateModel.ONLINE);
         newAssignment.put(segmentName, instanceStateMap);
       }
+      for (String segmentName : uploadedSegmentsWithNoPartitionMetadata) {

Review Comment:
   We should not assign these one by one here. Instead, we can create a new 
map `unpartitionedCompletedSegmentAssignment` containing only the unpartitioned 
segments, then use `reassignSegments()` to minimize data movement.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -260,13 +277,26 @@ public Map<String, Map<String, String>> 
rebalanceTable(Map<String, Map<String, S
           "No COMPLETED instance partitions found, reassigning COMPLETED 
segments the same way as CONSUMING segments "
               + "with CONSUMING instance partitions for table: {}", 
_realtimeTableName);
 
+      Set<String> uploadedSegmentsWithNoPartitionMetadata = new TreeSet<>();

Review Comment:
   (minor) Rename to `unpartitionedSegments`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2011,7 +2003,8 @@ public void assignTableSegment(String tableNameWithType, 
String segmentName) {
     try {
       TableConfig tableConfig = getTableConfig(tableNameWithType);
       Preconditions.checkState(tableConfig != null, "Failed to find table 
config for table: " + tableNameWithType);
-      SegmentAssignment segmentAssignment = 
SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig);
+      SegmentAssignment segmentAssignment =
+          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, 
tableConfig, isUpsertTable(tableNameWithType));

Review Comment:
   Don't pass upsert info to the segment assignment engine; set 
`instancePartitionsType` to `CONSUMING` instead (same as the current code).
   Also, don't call `isUpsertTable()` because it fetches the table config a 
second time; use `tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE` 
instead.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -365,12 +405,32 @@ private List<String> assignCompletedSegment(String 
segmentName, Map<String, Map<
       // Replica-group based assignment
 
       // Uniformly spray the segment partitions over the instance partitions
-      int segmentPartitionId =
-          SegmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_realtimeTableName, _helixManager, _partitionColumn);
-      int numPartitions = instancePartitions.getNumPartitions();
-      int partitionGroupId = segmentPartitionId % numPartitions;
+      int desiredPartitionId = getDesiredPartitionId(segmentName, 
instancePartitions, currentAssignment);
       return SegmentAssignmentUtils
-          .assignSegmentWithReplicaGroup(currentAssignment, 
instancePartitions, partitionGroupId);
+          .assignSegmentWithReplicaGroup(currentAssignment, 
instancePartitions, desiredPartitionId);
     }
   }
+
+  private int getDesiredPartitionId(String segmentName, InstancePartitions 
instancePartitions,
+      Map<String, Map<String, String>> currentAssignment) {
+    Optional<Integer> segmentPartitionIdOpt =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_realtimeTableName, _helixManager, _partitionColumn);
+    int numPartitions = instancePartitions.getNumPartitions();
+    return segmentPartitionIdOpt.map(segPartitionId -> segPartitionId % 
numPartitions).orElseGet(() -> {
+      // find the partition with the lease number of segments
+      int minPartitionId = 0;
+      int minNumSegmentsAssigned = Integer.MAX_VALUE;
+      for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+        List<String> instances = instancePartitions.getInstances(partitionId, 
0);
+        int[] numSegmentsAssignedPerInstance =
+            
SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, 
instances);

Review Comment:
   This method is quite expensive, especially for large tables. Calling it 
numPartitions times for each segment can cause performance issues. Instead, we 
can derive the partition id from the segment name's hash code, as suggested in 
the other comment on this file.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -345,6 +380,11 @@ private Map<String, Map<String, String>> 
reassignSegments(String instancePartiti
 
         newAssignment = SegmentAssignmentUtils
             .rebalanceReplicaGroupBasedTable(currentAssignment, 
instancePartitions, partitionGroupIdToSegmentsMap);
+        for (String segName : uploadedSegmentsWithNoPartitionMetadata) {

Review Comment:
   We should combine the unpartitioned segments into the 
`partitionGroupIdToSegmentsMap` (assign a partition id to them). I'd suggest 
simply using `Math.abs(segmentName.hashCode() % numPartitions)` as the 
partition group id because it is deterministic. It must stay deterministic so 
that a segment keeps the same partition across multiple rebalances; otherwise 
it would switch partitions and be forced to move between servers.
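
   A rough sketch of that idea, assuming the map has the shape partition 
group id -> segment names. The class `UnpartitionedSegmentGrouping` and both 
method names are hypothetical, chosen for illustration; only the 
`Math.abs(segmentName.hashCode() % numPartitions)` expression comes from the 
comment above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of Pinot.
class UnpartitionedSegmentGrouping {
  // Deterministic partition id derived only from the segment name.
  static int hashPartitionId(String segmentName, int numPartitions) {
    return Math.abs(segmentName.hashCode() % numPartitions);
  }

  // Adds each unpartitioned segment to the group chosen by its name hash.
  static void addUnpartitioned(Map<Integer, List<String>> partitionGroupIdToSegmentsMap,
      List<String> unpartitionedSegments, int numPartitions) {
    for (String segmentName : unpartitionedSegments) {
      int partitionGroupId = hashPartitionId(segmentName, numPartitions);
      List<String> segments = partitionGroupIdToSegmentsMap.get(partitionGroupId);
      if (segments == null) {
        segments = new ArrayList<>();
        partitionGroupIdToSegmentsMap.put(partitionGroupId, segments);
      }
      segments.add(segmentName);
    }
  }

  public static void main(String[] args) {
    Map<Integer, List<String>> groups = new TreeMap<>();
    List<String> unpartitioned = new ArrayList<>();
    unpartitioned.add("uploaded_segment_0");
    unpartitioned.add("uploaded_segment_1");
    addUnpartitioned(groups, unpartitioned, 4);
    System.out.println(groups);
  }
}
```

   Taking the remainder before `Math.abs` keeps the result in 
`[0, numPartitions)` even when `hashCode()` returns `Integer.MIN_VALUE`, and 
the same name maps to the same group on every rebalance as long as 
`numPartitions` is unchanged.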



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -101,21 +110,26 @@ public void init(HelixManager helixManager, TableConfig 
tableConfig) {
         _replication, _partitionColumn, _realtimeTableName);
   }
 
+  /**
+   * Since the segment can be either a completed or a consuming segment, the 
type of the segment is inferred by the
+   * provided instancePartitionsMap, which has to contain only one 
InstancePartitionsType.
+   */
   @Override
   public List<String> assignSegment(String segmentName, Map<String, 
Map<String, String>> currentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
-    InstancePartitions instancePartitions = 
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
-    Preconditions.checkState(instancePartitions != null, "Failed to find 
CONSUMING instance partitions for table: %s",
-        _realtimeTableName);
+    Preconditions
+        .checkState(instancePartitionsMap.keySet().size() == 1, "One instance 
partition type should be provided");
+    InstancePartitionsType instancePartitionsType = 
instancePartitionsMap.keySet().stream().findFirst().get();
+    InstancePartitions instancePartitions = 
instancePartitionsMap.get(instancePartitionsType);

Review Comment:
   ```suggestion
       Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided");
       Map.Entry<InstancePartitionsType, InstancePartitions> entry = instancePartitionsMap.entrySet().iterator().next();
       InstancePartitionsType instancePartitionsType = entry.getKey();
       InstancePartitions instancePartitions = entry.getValue();
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -365,12 +405,32 @@ private List<String> assignCompletedSegment(String 
segmentName, Map<String, Map<
       // Replica-group based assignment
 
       // Uniformly spray the segment partitions over the instance partitions
-      int segmentPartitionId =
-          SegmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_realtimeTableName, _helixManager, _partitionColumn);
-      int numPartitions = instancePartitions.getNumPartitions();
-      int partitionGroupId = segmentPartitionId % numPartitions;
+      int desiredPartitionId = getDesiredPartitionId(segmentName, 
instancePartitions, currentAssignment);
       return SegmentAssignmentUtils
-          .assignSegmentWithReplicaGroup(currentAssignment, 
instancePartitions, partitionGroupId);
+          .assignSegmentWithReplicaGroup(currentAssignment, 
instancePartitions, desiredPartitionId);
     }
   }
+
+  private int getDesiredPartitionId(String segmentName, InstancePartitions 
instancePartitions,
+      Map<String, Map<String, String>> currentAssignment) {
+    Optional<Integer> segmentPartitionIdOpt =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, 
_realtimeTableName, _helixManager, _partitionColumn);
+    int numPartitions = instancePartitions.getNumPartitions();
+    return segmentPartitionIdOpt.map(segPartitionId -> segPartitionId % 
numPartitions).orElseGet(() -> {

Review Comment:
   Suggest not using functional APIs here. Even though this method is not very 
performance critical, it is still called very frequently when rebalancing the 
table.
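
   One possible imperative shape, as a sketch only. The class 
`DesiredPartition` and the simplified signature (a nullable `Integer` passed 
in directly rather than looked up) are assumptions made to keep the example 
self-contained.

```java
// Hypothetical simplification for illustration; not the actual Pinot method.
class DesiredPartition {
  // segmentPartitionId is null when the segment has no partition metadata.
  static int getDesiredPartitionId(Integer segmentPartitionId, String segmentName, int numPartitions) {
    if (segmentPartitionId != null) {
      // Same modulo mapping as the original code path.
      return segmentPartitionId % numPartitions;
    }
    // Fallback for unpartitioned segments: deterministic hash of the name.
    return Math.abs(segmentName.hashCode() % numPartitions);
  }
}
```

   A plain `if` avoids allocating a lambda and an `Optional` on every call.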



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

