xiangfu0 commented on code in PR #18549:
URL: https://github.com/apache/pinot/pull/18549#discussion_r3281051686
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -1013,6 +1016,102 @@ static void validateUpsertAndDedupConfig(TableConfig
tableConfig, Schema schema)
validateTTLForDedupConfig(tableConfig, schema);
}
+ private static void validateOfflineUpsertPartitionConfig(TableConfig
tableConfig, Schema schema,
+ Map<String, ColumnPartitionConfig> columnPartitionMap) {
+ Preconditions.checkState(columnPartitionMap.size() == 1,
+ "Offline upsert table must have exactly one partition column in
segmentPartitionConfig");
+ ColumnPartitionConfig columnPartitionConfig =
columnPartitionMap.values().iterator().next();
+ String partitionColumn =
validateOfflineUpsertReplicaGroupPartitionConfig(tableConfig,
columnPartitionConfig);
+ Preconditions.checkState(columnPartitionMap.containsKey(partitionColumn),
+ "Offline upsert table partition column: %s must exist in
segmentPartitionConfig", partitionColumn);
+ validateOfflineUpsertPartitionFunctionConfig(partitionColumn,
columnPartitionMap.get(partitionColumn));
+ if (CollectionUtils.isNotEmpty(schema.getPrimaryKeyColumns())) {
+
Preconditions.checkState(schema.getPrimaryKeyColumns().contains(partitionColumn),
+ "Offline upsert table partition column: %s must be a primary key
column", partitionColumn);
+ }
+ validateOfflineUpsertSegmentAssignmentConfig(tableConfig);
+ }
+
+ private static void validateOfflineUpsertPartitionFunctionConfig(String
partitionColumn,
+ ColumnPartitionConfig columnPartitionConfig) {
+ try {
+ PartitionFunctionFactory.getPartitionFunction(columnPartitionConfig);
+ } catch (Exception e) {
+ throw new IllegalStateException(
+ String.format("Offline upsert table partition column: %s has invalid
segmentPartitionConfig",
+ partitionColumn), e);
+ }
+ }
+
+ private static String
validateOfflineUpsertReplicaGroupPartitionConfig(TableConfig tableConfig,
+ ColumnPartitionConfig columnPartitionConfig) {
+ InstanceAssignmentConfig offlineInstanceAssignmentConfig =
+
MapUtils.emptyIfNull(tableConfig.getInstanceAssignmentConfigMap()).get(InstancePartitionsType.OFFLINE.name());
+ if (offlineInstanceAssignmentConfig != null) {
+ InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+ offlineInstanceAssignmentConfig.getReplicaGroupPartitionConfig();
+ Preconditions.checkState(replicaGroupPartitionConfig != null,
+ "Offline upsert table must configure replicaGroupPartitionConfig for
OFFLINE instance assignment");
+ String partitionColumn =
replicaGroupPartitionConfig.getPartitionColumn();
+ Preconditions.checkState(StringUtils.isNotEmpty(partitionColumn),
+ "Offline upsert table must configure a partition column in OFFLINE
instanceAssignmentConfigMap");
+
Preconditions.checkState(replicaGroupPartitionConfig.isReplicaGroupBased(),
+ "Offline upsert table must use replica-group based OFFLINE instance
assignment");
+
Preconditions.checkState(replicaGroupPartitionConfig.getNumReplicaGroups() > 0,
+ "Offline upsert table OFFLINE instance assignment must configure
positive numReplicaGroups");
+ Preconditions.checkState(
+ replicaGroupPartitionConfig.getNumPartitions() ==
columnPartitionConfig.getNumPartitions(),
+ "Offline upsert table OFFLINE instance assignment numPartitions: %s
must match segmentPartitionConfig "
+ + "numPartitions: %s",
replicaGroupPartitionConfig.getNumPartitions(),
+ columnPartitionConfig.getNumPartitions());
+
Preconditions.checkState(replicaGroupPartitionConfig.getNumInstancesPerPartition()
== 1,
+ "Offline upsert table OFFLINE instance assignment must configure
numInstancesPerPartition to 1");
+ return partitionColumn;
+ }
+
+ //noinspection deprecation
+ SegmentsValidationAndRetentionConfig validationConfig =
tableConfig.getValidationConfig();
+ ReplicaGroupStrategyConfig replicaGroupStrategyConfig =
+ validationConfig != null ?
validationConfig.getReplicaGroupStrategyConfig() : null;
+ String partitionColumn =
+ replicaGroupStrategyConfig != null ?
replicaGroupStrategyConfig.getPartitionColumn() : null;
+ Preconditions.checkState(StringUtils.isNotEmpty(partitionColumn),
+ "Offline upsert table must configure a partition column in
instanceAssignmentConfigMap or "
+ + "replicaGroupStrategyConfig");
+
Preconditions.checkState(replicaGroupStrategyConfig.getNumInstancesPerPartition()
== 1,
+ "Offline upsert table replicaGroupStrategyConfig must configure
numInstancesPerPartition to 1");
+ return partitionColumn;
+ }
+
+ private static void validateOfflineUpsertSegmentAssignmentConfig(TableConfig
tableConfig) {
+ SegmentAssignmentConfig segmentAssignmentConfig =
+
MapUtils.emptyIfNull(tableConfig.getSegmentAssignmentConfigMap()).get(InstancePartitionsType.OFFLINE.name());
+ String assignmentStrategy =
+ segmentAssignmentConfig != null ?
segmentAssignmentConfig.getAssignmentStrategy() : null;
+ if (assignmentStrategy != null) {
+ Preconditions.checkState(
+
AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY.equalsIgnoreCase(assignmentStrategy)
+ ||
AssignmentStrategy.ROUND_ROBIN_REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY.equalsIgnoreCase(
+ assignmentStrategy),
+ "Offline upsert table OFFLINE segmentAssignmentConfigMap must use a
replica-group segment assignment "
+ + "strategy");
+ }
+
+ if (MapUtils.emptyIfNull(tableConfig.getInstanceAssignmentConfigMap())
+ .containsKey(InstancePartitionsType.OFFLINE.name())) {
+ return;
+ }
+
+ Preconditions.checkState(
+
!MapUtils.emptyIfNull(tableConfig.getInstancePartitionsMap()).containsKey(InstancePartitionsType.OFFLINE),
Review Comment:
This new precondition hard-rejects offline upsert tables that still use
legacy `replicaGroupStrategyConfig` together with a preconfigured
`instancePartitionsMap.OFFLINE`. That layout is still supported by the runtime:
`InstancePartitionsUtils.fetchOrComputeInstancePartitions(...)` continues to
load those preconfigured OFFLINE partitions. After this lands, those clusters
will start failing unrelated table-config updates with HTTP 400 errors, even
though segment assignment still depends on the existing partitions. This needs
a compatibility path rather than a hard rejection.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]