klsince commented on code in PR #10255:
URL: https://github.com/apache/pinot/pull/10255#discussion_r1119314780
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java:
##########
@@ -80,39 +83,68 @@ public class PinotInstanceAssignmentRestletResource {
@Produces(MediaType.APPLICATION_JSON)
@Path("/tables/{tableName}/instancePartitions")
@ApiOperation(value = "Get the instance partitions")
- public Map<InstancePartitionsType, InstancePartitions> getInstancePartitions(
+ public Map<String, InstancePartitions> getInstancePartitions(
@ApiParam(value = "Name of the table") @PathParam("tableName") String
tableName,
- @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED") @QueryParam("type")
@Nullable
- InstancePartitionsType instancePartitionsType) {
- Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
new TreeMap<>();
+ @ApiParam(value = "OFFLINE|CONSUMING|COMPLETED|Name of the tier")
@QueryParam("type") @Nullable String type) {
+ Map<String, InstancePartitions> instancePartitionsMap = new TreeMap<>();
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType != TableType.REALTIME) {
- if (instancePartitionsType == InstancePartitionsType.OFFLINE ||
instancePartitionsType == null) {
- InstancePartitions offlineInstancePartitions = InstancePartitionsUtils
- .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+ if (InstancePartitionsType.OFFLINE.toString().equals(type) || type ==
null) {
+ InstancePartitions offlineInstancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
InstancePartitionsType.OFFLINE.getInstancePartitionsName(rawTableName));
if (offlineInstancePartitions != null) {
- instancePartitionsMap.put(InstancePartitionsType.OFFLINE,
offlineInstancePartitions);
+ instancePartitionsMap.put(InstancePartitionsType.OFFLINE.toString(),
offlineInstancePartitions);
}
}
}
if (tableType != TableType.OFFLINE) {
- if (instancePartitionsType == InstancePartitionsType.CONSUMING ||
instancePartitionsType == null) {
- InstancePartitions consumingInstancePartitions =
InstancePartitionsUtils
- .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+ if (InstancePartitionsType.CONSUMING.toString().equals(type) || type ==
null) {
+ InstancePartitions consumingInstancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
InstancePartitionsType.CONSUMING.getInstancePartitionsName(rawTableName));
if (consumingInstancePartitions != null) {
- instancePartitionsMap.put(InstancePartitionsType.CONSUMING,
consumingInstancePartitions);
+
instancePartitionsMap.put(InstancePartitionsType.CONSUMING.toString(),
consumingInstancePartitions);
}
}
- if (instancePartitionsType == InstancePartitionsType.COMPLETED ||
instancePartitionsType == null) {
- InstancePartitions completedInstancePartitions =
InstancePartitionsUtils
- .fetchInstancePartitions(_resourceManager.getPropertyStore(),
+ if (InstancePartitionsType.COMPLETED.toString().equals(type) || type ==
null) {
+ InstancePartitions completedInstancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
InstancePartitionsType.COMPLETED.getInstancePartitionsName(rawTableName));
if (completedInstancePartitions != null) {
- instancePartitionsMap.put(InstancePartitionsType.COMPLETED,
completedInstancePartitions);
+
instancePartitionsMap.put(InstancePartitionsType.COMPLETED.toString(),
completedInstancePartitions);
+ }
+ }
+ }
+
+ TableConfig realtimeTableConfig =
_resourceManager.getRealtimeTableConfig(tableName);
+ if (realtimeTableConfig != null &&
CollectionUtils.isNotEmpty(realtimeTableConfig.getTierConfigsList())) {
+ for (TierConfig tierConfig : realtimeTableConfig.getTierConfigsList()) {
+ if (type == null || type.equals(tierConfig.getName())) {
+ InstancePartitions instancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
+
InstancePartitionsUtils.getInstancePartitonNameForTier(realtimeTableConfig.getTableName(),
+ tierConfig.getName()));
+ if (instancePartitions != null) {
+ instancePartitionsMap.put(tierConfig.getName(),
instancePartitions);
+ }
+ }
+ }
+ }
+
+ TableConfig offlineTableConfig =
_resourceManager.getOfflineTableConfig(tableName);
+ if (offlineTableConfig != null &&
CollectionUtils.isNotEmpty(offlineTableConfig.getTierConfigsList())) {
+ for (TierConfig tierConfig : offlineTableConfig.getTierConfigsList()) {
+ if (type == null || type.equals(tierConfig.getName())) {
+ InstancePartitions instancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitions(_resourceManager.getPropertyStore(),
+
InstancePartitionsUtils.getInstancePartitonNameForTier(realtimeTableConfig.getTableName(),
Review Comment:
offlineTableConfig.getTableName()?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1757,6 +1758,23 @@ private void assignInstances(TableConfig tableConfig,
boolean override) {
}
}
}
+
+ // Process and persist tier config instancePartitions
+ if (CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())
+ && tableConfig.getInstanceAssignmentConfigMap() != null) {
+ for (TierConfig tierConfig : tableConfig.getTierConfigsList()) {
+ if
(tableConfig.getInstanceAssignmentConfigMap().containsKey(tierConfig.getName()))
{
+ if (override ||
InstancePartitionsUtils.fetchInstancePartitions(_propertyStore,
+
InstancePartitionsUtils.getInstancePartitonNameForTier(tableNameWithType,
tierConfig.getName()))
+ == null) {
+ InstancePartitions instancePartitions =
+ instanceAssignmentDriver.assignInstances(tierConfig.getName(),
instanceConfigs, null,
+
tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName()));
+ InstancePartitionsUtils.persistInstancePartitions(_propertyStore,
instancePartitions);
Review Comment:
leave some INFO logs as done in the for-loop above.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -545,10 +547,54 @@ private Map<String, InstancePartitions>
getTierToInstancePartitionsMap(String ta
* InstancePartitions to zk.
* Then we'll be able to support replica group assignment while creating
InstancePartitions for tiers
*/
- private InstancePartitions getInstancePartitionsForTier(Tier tier, String
tableNameWithType) {
Review Comment:
update the method comment a bit?
also I feel it probably better to keep this method functional, w/o
persisting states into ZK as a side effect, which may be put in separate util
methods and get called by TableRebalancer based on dryRun/bootstrap flag.
##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -261,6 +263,20 @@ public static UserConfig
getUserConfig(ZkHelixPropertyStore<ZNRecord> propertySt
}
}
+ @Nullable
+ public static List<InstancePartitions>
getAllInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore) {
+ List<ZNRecord> znRecordss =
+ propertyStore.getChildren(PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX,
null, AccessOption.PERSISTENT);
+
+ try {
+ return
Optional.ofNullable(znRecordss).orElseGet(ArrayList::new).stream().map(InstancePartitions::fromZNRecord)
+ .collect(Collectors.toList());
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while getting user list configuration",
e);
Review Comment:
looks like need to adjust the error msg for this method
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java:
##########
@@ -82,4 +83,34 @@ public InstancePartitions
assignInstances(InstancePartitionsType instancePartiti
instancePartitionSelector.selectInstances(poolToInstanceConfigsMap,
instancePartitions);
return instancePartitions;
}
+
+ // TODO (saurabh) : Move commons
Review Comment:
curious is there some complexity to extract the common methods in this PR? I
might have missed it. maybe call it out in the TODO comment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]