Jackie-Jiang commented on code in PR #8584:
URL: https://github.com/apache/pinot/pull/8584#discussion_r856590568
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java:
##########
@@ -92,12 +91,6 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
Response.Status.CONFLICT);
}
- // TODO Allow segment refreshing for realtime tables.
- if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
- throw new ControllerApplicationException(LOGGER,
- "Refresh existing segment " + segmentName + " for realtime table " + tableNameWithType
- + " is not yet supported ", Response.Status.NOT_IMPLEMENTED);
- }
Review Comment:
(minor) Remove one extra empty line
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java:
##########
@@ -35,20 +37,23 @@ private SegmentUtils() {
// Returns the partition id of a realtime segment based on the segment name and segment metadata info retrieved via Helix.
// Important: The method is costly because it may read data from zookeeper. Do not use it in any query execution
// path.
- public static int getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,
+ public static Optional<Integer> getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,
HelixManager helixManager, String partitionColumn) {
// A fast path if the segmentName is a LLC segment name and we can get the partition id from the name directly.
if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
- return new LLCSegmentName(segmentName).getPartitionGroupId();
+ return Optional.of(new LLCSegmentName(segmentName).getPartitionGroupId());
}
- // Otherwise, retrieve the partition id from the segment zk metadata. Currently only realtime segments from upsert
- // enabled tables have partition ids in their segment metadata.
+ // Otherwise, retrieve the partition id from the segment zk metadata.
SegmentZKMetadata segmentZKMetadata =
ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), realtimeTableName, segmentName);
Preconditions
.checkState(segmentZKMetadata != null, "Failed to find segment ZK metadata for segment: %s of table: %s",
segmentName, realtimeTableName);
SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
+ if (segmentPartitionMetadata == null
Review Comment:
To be more robust, let's remove the checks and return the partition id when
it is available, and `null` otherwise. The caller can then decide how to
proceed.
```suggestion
if (segmentPartitionMetadata != null) {
ColumnPartitionMetadata columnPartitionMetadata = segmentPartitionMetadata.getColumnPartitionMap().get(partitionColumn);
if (columnPartitionMetadata != null && columnPartitionMetadata.getPartitions().size() == 1) {
return columnPartitionMetadata.getPartitions().iterator().next();
}
}
return null;
```
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java:
##########
@@ -35,20 +37,23 @@ private SegmentUtils() {
// Returns the partition id of a realtime segment based on the segment name and segment metadata info retrieved via Helix.
// Important: The method is costly because it may read data from zookeeper. Do not use it in any query execution
// path.
- public static int getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,
+ public static Optional<Integer> getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,
Review Comment:
Suggest returning `Integer` (annotated with `@Nullable`) instead of
`Optional`. Currently we usually use `null` to represent a value that is not
available, and several `Optional` methods are not supported in JDK 8.
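A minimal sketch of the nullable-return style, assuming a simplified model: the hypothetical `NullablePartitionIdSketch` class takes a plain column-to-partitions map in place of Pinot's `SegmentPartitionMetadata` / `ColumnPartitionMetadata`, and an example caller substitutes its own fallback when no partition id is available. In Pinot itself the return type would carry `@Nullable`.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model: columnPartitionMap stands in for
// SegmentPartitionMetadata.getColumnPartitionMap(), and each Set<Integer> for
// ColumnPartitionMetadata.getPartitions().
final class NullablePartitionIdSketch {
  private NullablePartitionIdSketch() {
  }

  // Returns the partition id if exactly one partition is recorded for the column, null otherwise.
  // In Pinot the return type would be annotated with @Nullable.
  static Integer getPartitionId(Map<String, Set<Integer>> columnPartitionMap, String partitionColumn) {
    if (columnPartitionMap != null) {
      Set<Integer> partitions = columnPartitionMap.get(partitionColumn);
      if (partitions != null && partitions.size() == 1) {
        return partitions.iterator().next();
      }
    }
    return null;
  }

  // Example caller: decides for itself what to do when no partition id is available.
  static int getPartitionIdOrDefault(Map<String, Set<Integer>> columnPartitionMap, String partitionColumn,
      int defaultPartitionId) {
    Integer partitionId = getPartitionId(columnPartitionMap, partitionColumn);
    return partitionId != null ? partitionId : defaultPartitionId;
  }
}
```

Only JDK 8 features are used, and callers branch on `null` with a plain `if` instead of chaining `Optional` methods.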
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -260,13 +277,26 @@ public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, S
"No COMPLETED instance partitions found, reassigning COMPLETED segments the same way as CONSUMING segments "
+ "with CONSUMING instance partitions for table: {}", _realtimeTableName);
+ Set<String> uploadedSegmentsWithNoPartitionMetadata = new TreeSet<>();
newAssignment = new TreeMap<>();
for (String segmentName : completedSegmentAssignment.keySet()) {
+ if (!SegmentUtils
+ .getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn)
+ .isPresent()) {
+ uploadedSegmentsWithNoPartitionMetadata.add(segmentName);
+ continue;
+ }
List<String> instancesAssigned = assignConsumingSegment(segmentName, consumingInstancePartitions);
Map<String, String> instanceStateMap =
SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE);
newAssignment.put(segmentName, instanceStateMap);
}
+ for (String segmentName : uploadedSegmentsWithNoPartitionMetadata) {
Review Comment:
We should not assign the segments one by one here. We can create a new map
`unpartitionedCompletedSegmentAssignment` containing only the unpartitioned
segments, then use `reassignSegments()` to minimize data movement.
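A minimal sketch of building such a map, assuming the per-segment partition ids have already been looked up (a missing or `null` value meaning no partition metadata). `UnpartitionedAssignmentSketch` and its parameters are hypothetical; the resulting map would then be passed to `reassignSegments()` in one call instead of assigning segments one at a time.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: splits out the completed segments that have no partition id so they can be
// reassigned together (e.g. through reassignSegments()) rather than one at a time.
final class UnpartitionedAssignmentSketch {
  private UnpartitionedAssignmentSketch() {
  }

  // completedSegmentAssignment: segment -> (instance -> state)
  // segmentPartitionIds: segment -> partition id; a missing or null value means "no partition metadata"
  static Map<String, Map<String, String>> getUnpartitionedCompletedSegmentAssignment(
      Map<String, Map<String, String>> completedSegmentAssignment, Map<String, Integer> segmentPartitionIds) {
    Map<String, Map<String, String>> unpartitionedCompletedSegmentAssignment = new TreeMap<>();
    for (Map.Entry<String, Map<String, String>> entry : completedSegmentAssignment.entrySet()) {
      if (segmentPartitionIds.get(entry.getKey()) == null) {
        unpartitionedCompletedSegmentAssignment.put(entry.getKey(), entry.getValue());
      }
    }
    return unpartitionedCompletedSegmentAssignment;
  }
}
```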
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -260,13 +277,26 @@ public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, S
"No COMPLETED instance partitions found, reassigning COMPLETED segments the same way as CONSUMING segments "
+ "with CONSUMING instance partitions for table: {}", _realtimeTableName);
+ Set<String> uploadedSegmentsWithNoPartitionMetadata = new TreeSet<>();
Review Comment:
(minor) Rename to `unpartitionedSegments`
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2011,7 +2003,8 @@ public void assignTableSegment(String tableNameWithType, String segmentName) {
try {
TableConfig tableConfig = getTableConfig(tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
- SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig);
+ SegmentAssignment segmentAssignment =
+ SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, isUpsertTable(tableNameWithType));
Review Comment:
Don't pass upsert info to the segment assignment engine; set
`instancePartitionsType` to `CONSUMING` instead (same as the current code).
Also, don't call `isUpsertTable()` because it fetches the table config again;
use `tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE` instead.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -365,12 +405,32 @@ private List<String> assignCompletedSegment(String segmentName, Map<String, Map<
// Replica-group based assignment
// Uniformly spray the segment partitions over the instance partitions
- int segmentPartitionId =
- SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
- int numPartitions = instancePartitions.getNumPartitions();
- int partitionGroupId = segmentPartitionId % numPartitions;
+ int desiredPartitionId = getDesiredPartitionId(segmentName, instancePartitions, currentAssignment);
return SegmentAssignmentUtils
- .assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionGroupId);
+ .assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, desiredPartitionId);
}
}
+
+ private int getDesiredPartitionId(String segmentName, InstancePartitions instancePartitions,
+ Map<String, Map<String, String>> currentAssignment) {
+ Optional<Integer> segmentPartitionIdOpt =
+ SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
+ int numPartitions = instancePartitions.getNumPartitions();
+ return segmentPartitionIdOpt.map(segPartitionId -> segPartitionId % numPartitions).orElseGet(() -> {
+ // find the partition with the least number of segments
+ int minPartitionId = 0;
+ int minNumSegmentsAssigned = Integer.MAX_VALUE;
+ for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+ List<String> instances = instancePartitions.getInstances(partitionId, 0);
+ int[] numSegmentsAssignedPerInstance =
+ SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, instances);
Review Comment:
This method is quite expensive, especially for large tables. Calling it once
per segment per partition (numSegments * numPartitions calls) can cause
performance issues. Instead, we can use the hash code as suggested in the
comment on `reassignSegments()`.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -345,6 +380,11 @@ private Map<String, Map<String, String>> reassignSegments(String instancePartiti
newAssignment = SegmentAssignmentUtils
.rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, partitionGroupIdToSegmentsMap);
+ for (String segName : uploadedSegmentsWithNoPartitionMetadata) {
Review Comment:
We should combine the unpartitioned segments into the
`partitionGroupIdToSegmentsMap` (i.e. assign a partition id to them). I'd
suggest simply using `Math.abs(segmentName.hashCode() % numPartitions)` as the
partition group id because it is deterministic. It needs to stay
deterministic so that a segment does not switch partitions across multiple
rebalances, which would force it to move between servers.
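A sketch of the deterministic partition group id and of folding unpartitioned segments into the partition-group map, using plain `java.util` types. `HashPartitionSketch` is a hypothetical name, and the map is assumed to be a `Map<Integer, List<String>>`. `String.hashCode()` is specified by the Java platform, so the same segment name maps to the same partition group on every controller and every rebalance, as long as `numPartitions` does not change. Taking `Math.abs` of the remainder (rather than of `hashCode()` itself) keeps the result in `[0, numPartitions)` even when `hashCode()` returns `Integer.MIN_VALUE`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class HashPartitionSketch {
  private HashPartitionSketch() {
  }

  // Deterministic partition group id for a segment without partition metadata. Math.abs is applied to
  // the remainder, which lies in (-numPartitions, numPartitions), so the result is never negative.
  static int getPartitionGroupId(String segmentName, int numPartitions) {
    return Math.abs(segmentName.hashCode() % numPartitions);
  }

  // Adds the unpartitioned segments to the partition group id -> segments map, so that they are
  // rebalanced together with the partitioned segments.
  static void addUnpartitionedSegments(Map<Integer, List<String>> partitionGroupIdToSegmentsMap,
      List<String> unpartitionedSegments, int numPartitions) {
    for (String segmentName : unpartitionedSegments) {
      int partitionGroupId = getPartitionGroupId(segmentName, numPartitions);
      List<String> segments = partitionGroupIdToSegmentsMap.get(partitionGroupId);
      if (segments == null) {
        segments = new ArrayList<>();
        partitionGroupIdToSegmentsMap.put(partitionGroupId, segments);
      }
      segments.add(segmentName);
    }
  }
}
```

For example, `"a".hashCode()` is 97, so with 4 partitions segment `a` always lands in partition group 1.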
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -101,21 +110,26 @@ public void init(HelixManager helixManager, TableConfig tableConfig) {
_replication, _partitionColumn, _realtimeTableName);
}
+ /**
+ * Since the segment can be either a completed or a consuming segment, the type of the segment is inferred from the
+ * provided instancePartitionsMap, which has to contain only one InstancePartitionsType.
+ */
@Override
public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
- InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
- Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s",
- _realtimeTableName);
+ Preconditions
+ .checkState(instancePartitionsMap.keySet().size() == 1, "One instance partition type should be provided");
+ InstancePartitionsType instancePartitionsType = instancePartitionsMap.keySet().stream().findFirst().get();
+ InstancePartitions instancePartitions = instancePartitionsMap.get(instancePartitionsType);
Review Comment:
```suggestion
Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided");
Map.Entry<InstancePartitionsType, InstancePartitions> entry = instancePartitionsMap.entrySet().iterator().next();
InstancePartitionsType instancePartitionsType = entry.getKey();
InstancePartitions instancePartitions = entry.getValue();
```
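The same single-entry extraction, written as a small generic helper for illustration. `SingleEntrySketch` and its `PartitionsType` enum (standing in for `InstancePartitionsType`) are hypothetical, and a plain `IllegalStateException` replaces Guava's `Preconditions.checkState`, which throws the same exception type. Iterating `entrySet()` once avoids both the `keySet().stream()` pipeline and the second map lookup.

```java
import java.util.Map;

final class SingleEntrySketch {
  private SingleEntrySketch() {
  }

  // Hypothetical stand-in for Pinot's InstancePartitionsType.
  enum PartitionsType {
    CONSUMING, COMPLETED
  }

  // Returns the only entry of the map. Throws IllegalStateException (the exception Guava's
  // Preconditions.checkState throws) when the map does not contain exactly one entry.
  static <K, V> Map.Entry<K, V> getSingleEntry(Map<K, V> map) {
    if (map.size() != 1) {
      throw new IllegalStateException("One instance partition type should be provided");
    }
    return map.entrySet().iterator().next();
  }
}
```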
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -365,12 +405,32 @@ private List<String> assignCompletedSegment(String segmentName, Map<String, Map<
// Replica-group based assignment
// Uniformly spray the segment partitions over the instance partitions
- int segmentPartitionId =
- SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
- int numPartitions = instancePartitions.getNumPartitions();
- int partitionGroupId = segmentPartitionId % numPartitions;
+ int desiredPartitionId = getDesiredPartitionId(segmentName, instancePartitions, currentAssignment);
return SegmentAssignmentUtils
- .assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionGroupId);
+ .assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, desiredPartitionId);
}
}
+
+ private int getDesiredPartitionId(String segmentName, InstancePartitions instancePartitions,
+ Map<String, Map<String, String>> currentAssignment) {
+ Optional<Integer> segmentPartitionIdOpt =
+ SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
+ int numPartitions = instancePartitions.getNumPartitions();
+ return segmentPartitionIdOpt.map(segPartitionId -> segPartitionId % numPartitions).orElseGet(() -> {
Review Comment:
Suggest not using functional APIs. Even though this method is not very
performance-critical, it is still called very frequently when rebalancing the
table.
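A sketch of `getDesiredPartitionId()` without `Optional.map()`/`orElseGet()`, assuming the partition id lookup returns a nullable `Integer` and that unpartitioned segments fall back to the `Math.abs(segmentName.hashCode() % numPartitions)` scheme proposed in the `reassignSegments()` comment. `DesiredPartitionIdSketch` is a hypothetical name, and the signature is simplified to plain values rather than Pinot's `InstancePartitions`.

```java
final class DesiredPartitionIdSketch {
  private DesiredPartitionIdSketch() {
  }

  // segmentPartitionId is the (nullable) partition id looked up for the segment; null means the
  // segment has no partition metadata, e.g. an uploaded segment.
  static int getDesiredPartitionId(String segmentName, Integer segmentPartitionId, int numPartitions) {
    if (segmentPartitionId != null) {
      // Uniformly spray the segment partitions over the instance partitions
      return segmentPartitionId % numPartitions;
    }
    // No partition metadata: fall back to a deterministic hash of the segment name
    return Math.abs(segmentName.hashCode() % numPartitions);
  }
}
```

A plain null check also skips the lambda allocations and the `Optional` wrapper on a path that runs once per segment during every rebalance.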