itschrispeck commented on code in PR #15481: URL: https://github.com/apache/pinot/pull/15481#discussion_r2038047401
########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/RelToPRelConverter.java: ########## @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.planner.physical.v2; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.Union; +import org.apache.calcite.rel.core.Values; +import org.apache.calcite.rel.core.Window; +import org.apache.pinot.calcite.rel.logical.PinotLogicalAggregate; +import org.apache.pinot.calcite.rel.traits.TraitAssignment; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.query.context.PhysicalPlannerContext; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalFilter; +import 
org.apache.pinot.query.planner.physical.v2.nodes.PhysicalJoin; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalProject; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalSort; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalTableScan; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalUnion; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalValues; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalWindow; +import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRule; +import org.apache.pinot.query.planner.physical.v2.opt.PhysicalOptRuleSet; +import org.apache.pinot.query.planner.physical.v2.opt.RuleExecutor; +import org.apache.pinot.query.planner.physical.v2.opt.RuleExecutors; +import org.apache.pinot.query.planner.plannode.AggregateNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Converts a tree of RelNode to a tree of PRelNode, running the configured Physical Optimizers in the process. + */ +public class RelToPRelConverter { + private static final Logger LOGGER = LoggerFactory.getLogger(RelToPRelConverter.class); + + private RelToPRelConverter() { + } + + public static PRelNode toPRelNode(RelNode relNode, PhysicalPlannerContext context, TableCache tableCache) { + // Step-1: Convert each RelNode to a PRelNode + PRelNode rootPRelNode = create(relNode, context.getNodeIdGenerator()); + // Step-2: Assign traits + rootPRelNode = TraitAssignment.assign(rootPRelNode, context); + // Step-3: Run physical optimizer rules. 
+ var ruleAndExecutorList = PhysicalOptRuleSet.create(context, tableCache); + for (var ruleAndExecutor : ruleAndExecutorList) { + PRelOptRule rule = ruleAndExecutor.getLeft(); + RuleExecutor executor = RuleExecutors.create(ruleAndExecutor.getRight(), rule, context); + rootPRelNode = executor.execute(rootPRelNode); + } + return rootPRelNode; + } + + public static PRelNode create(RelNode relNode, Supplier<Integer> nodeIdGenerator) { + List<PRelNode> inputs = new ArrayList<>(); + for (RelNode input : relNode.getInputs()) { + inputs.add(create(input, nodeIdGenerator)); + } + if (relNode instanceof TableScan) { + Preconditions.checkState(inputs.isEmpty(), "Expected no inputs to table scan. Found: %s", inputs); + return new PhysicalTableScan((TableScan) relNode, nodeIdGenerator.get(), null, null); + } else if (relNode instanceof Filter) { + Preconditions.checkState(inputs.size() == 1, "Expected exactly 1 input of filter. Found: %s", inputs); + Filter filter = (Filter) relNode; + return new PhysicalFilter(filter.getCluster(), filter.getTraitSet(), filter.getHints(), filter.getCondition(), + nodeIdGenerator.get(), inputs.get(0), null, false); + } else if (relNode instanceof Project) { + Preconditions.checkState(inputs.size() == 1, "Expected exactly 1 input of project. Found: %s", inputs); + Project project = (Project) relNode; + return new PhysicalProject(project.getCluster(), project.getTraitSet(), project.getHints(), project.getProjects(), + project.getRowType(), project.getVariablesSet(), nodeIdGenerator.get(), inputs.get(0), null, false); + } else if (relNode instanceof PinotLogicalAggregate) { + Preconditions.checkState(inputs.size() == 1, "Expected exactly 1 input of agg. 
Found: %s", inputs); + PinotLogicalAggregate aggRel = (PinotLogicalAggregate) relNode; + return new PhysicalAggregate(aggRel.getCluster(), aggRel.getTraitSet(), aggRel.getHints(), aggRel.getGroupSet(), + aggRel.getGroupSets(), aggRel.getAggCallList(), nodeIdGenerator.get(), inputs.get(0), null, false, + AggregateNode.AggType.DIRECT, false, List.of(), 0); Review Comment: Trying to understand, why is this always direct? ########## pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/opt/rules/LeafStageWorkerAssignmentRule.java: ########## @@ -0,0 +1,420 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.planner.physical.v2.opt.rules; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.apache.calcite.plan.RelOptTable; +import org.apache.calcite.rel.RelDistribution; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.hint.RelHint; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; +import org.apache.pinot.calcite.rel.hint.PinotHintOptions; +import org.apache.pinot.common.config.provider.TableCache; +import org.apache.pinot.common.utils.DatabaseUtils; +import org.apache.pinot.core.routing.RoutingManager; +import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.ServerRouteInfo; +import org.apache.pinot.core.routing.TablePartitionInfo; +import org.apache.pinot.core.routing.TimeBoundaryInfo; +import org.apache.pinot.core.transport.ServerInstance; +import org.apache.pinot.query.context.PhysicalPlannerContext; +import org.apache.pinot.query.planner.physical.v2.HashDistributionDesc; +import org.apache.pinot.query.planner.physical.v2.PRelNode; +import org.apache.pinot.query.planner.physical.v2.PinotDataDistribution; +import org.apache.pinot.query.planner.physical.v2.TableScanMetadata; +import org.apache.pinot.query.planner.physical.v2.mapping.DistMappingGenerator; +import org.apache.pinot.query.planner.physical.v2.mapping.PinotDistMapping; +import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalTableScan; +import 
org.apache.pinot.query.planner.physical.v2.opt.PRelOptRule; +import org.apache.pinot.query.planner.physical.v2.opt.PRelOptRuleCall; +import org.apache.pinot.query.planner.plannode.PlanNode; +import org.apache.pinot.query.routing.QueryServerInstance; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.sql.parsers.CalciteSqlCompiler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * <h1>Overview</h1> + * Assigns workers to all PRelNodes that are part of the leaf stage as determined by {@link PRelNode#isLeafStage()}. + * The workers are mainly determined by the Table Scan, unless filter based server pruning is enabled. + * <h1>Current Features</h1> + * <ul> + * <li> + * Automatically detects partitioning and adds that information to PinotDataDistribution. This will be used + * in subsequent worker assignment steps to simplify Exchange. + * </li> + * </ul> + * <h1>Planned / Upcoming Features</h1> + * <ul> + * <li>Support for look-up join.</li> + * <li>Support for partition parallelism and the colocated join hint. 
See F2 in #15455.</li> + * <li>Support for Hybrid Tables for automatic partitioning inference.</li> + * <li>Server pruning based on filter predicates.</li> + * </ul> + */ +public class LeafStageWorkerAssignmentRule extends PRelOptRule { + private static final Logger LOGGER = LoggerFactory.getLogger(LeafStageWorkerAssignmentRule.class); + private final TableCache _tableCache; + private final RoutingManager _routingManager; + private final PhysicalPlannerContext _physicalPlannerContext; + + public LeafStageWorkerAssignmentRule(PhysicalPlannerContext physicalPlannerContext, TableCache tableCache) { + _routingManager = physicalPlannerContext.getRoutingManager(); + _physicalPlannerContext = physicalPlannerContext; + _tableCache = tableCache; + } + + @Override + public boolean matches(PRelOptRuleCall call) { + return call._currentNode.isLeafStage(); + } + + @Override + public PRelNode onMatch(PRelOptRuleCall call) { + if (call._currentNode.unwrap() instanceof TableScan) { + return assignTableScan((PhysicalTableScan) call._currentNode, _physicalPlannerContext.getRequestId()); + } + PRelNode currentNode = call._currentNode; + Preconditions.checkState(currentNode.isLeafStage(), "Leaf stage worker assignment called for non-leaf stage node:" + + " %s", currentNode); + PinotDistMapping mapping = DistMappingGenerator.compute(currentNode.getPRelInput(0).unwrap(), + currentNode.unwrap(), null); + PinotDataDistribution derivedDistribution = currentNode.getPRelInput(0).getPinotDataDistributionOrThrow() + .apply(mapping); + return currentNode.with(currentNode.getPRelInputs(), derivedDistribution); + } + + private PhysicalTableScan assignTableScan(PhysicalTableScan tableScan, long requestId) { + // Step-1: Init tableName, table options, routing table and time boundary info. 
+ String tableName = Objects.requireNonNull(getActualTableName(tableScan), "Table not found"); + Map<String, String> tableOptions = getTableOptions(tableScan.getHints()); + Map<String, RoutingTable> routingTableMap = getRoutingTable(tableName, requestId); + Preconditions.checkState(!routingTableMap.isEmpty(), "Unable to find routing entries for table: %s", tableName); + // acquire time boundary info if it is a hybrid table. + TimeBoundaryInfo timeBoundaryInfo = null; + if (routingTableMap.size() > 1) { + timeBoundaryInfo = _routingManager.getTimeBoundaryInfo( + TableNameBuilder.forType(TableType.OFFLINE) + .tableNameWithType(TableNameBuilder.extractRawTableName(tableName))); + if (timeBoundaryInfo == null) { + // remove offline table routing if no time boundary info is acquired. + routingTableMap.remove(TableType.OFFLINE.name()); + } + } + // Step-2: Compute instance to segments map and unavailable segments. + Map<String, Set<String>> segmentUnavailableMap = new HashMap<>(); + InstanceIdToSegments instanceIdToSegments = new InstanceIdToSegments(); + for (Map.Entry<String, RoutingTable> routingEntry : routingTableMap.entrySet()) { + String tableType = routingEntry.getKey(); + RoutingTable routingTable = routingEntry.getValue(); + Map<String, List<String>> currentSegmentsMap = new HashMap<>(); + Map<ServerInstance, ServerRouteInfo> tmp = routingTable.getServerInstanceToSegmentsMap(); + for (Map.Entry<ServerInstance, ServerRouteInfo> serverEntry : tmp.entrySet()) { + String instanceId = serverEntry.getKey().getInstanceId(); + Preconditions.checkState(currentSegmentsMap.put(instanceId, serverEntry.getValue().getSegments()) == null, + "Entry for server %s and table type: %s already exist!", serverEntry.getKey(), tableType); + _physicalPlannerContext.getInstanceIdToQueryServerInstance().computeIfAbsent(instanceId, + (ignore) -> new QueryServerInstance(serverEntry.getKey())); + } + if (tableType.equalsIgnoreCase(TableType.OFFLINE.name())) { + 
instanceIdToSegments._offlineTableSegmentsMap = currentSegmentsMap; + } else { + instanceIdToSegments._realtimeTableSegmentsMap = currentSegmentsMap; + } + if (!routingTable.getUnavailableSegments().isEmpty()) { + // Set unavailable segments in context, keyed by PRelNode ID. + segmentUnavailableMap.put(TableNameBuilder.forType(TableType.valueOf(tableName)).tableNameWithType(tableName), + new HashSet<>(routingTable.getUnavailableSegments())); + } + } + List<String> fieldNames = tableScan.getRowType().getFieldNames(); + Map<String, TablePartitionInfo> tablePartitionInfoMap = calculateTablePartitionInfo(tableName, + routingTableMap.keySet()); + TableScanWorkerAssignmentResult workerAssignmentResult = assignTableScan(tableName, fieldNames, + instanceIdToSegments, tablePartitionInfoMap); + TableScanMetadata metadata = new TableScanMetadata(Set.of(tableName), workerAssignmentResult._workerIdToSegmentsMap, + tableOptions, segmentUnavailableMap, timeBoundaryInfo); + return tableScan.with(workerAssignmentResult._pinotDataDistribution, metadata); + } + + /** + * Assigns workers for the table-scan node, automatically detecting table partitioning whenever possible. The + * arguments to this method are minimal to facilitate unit-testing. 
+ */ + @VisibleForTesting + static TableScanWorkerAssignmentResult assignTableScan(String tableName, List<String> fieldNames, + InstanceIdToSegments instanceIdToSegments, Map<String, TablePartitionInfo> tpiMap) { + Set<String> tableTypes = instanceIdToSegments.getActiveTableTypes(); + Set<String> partitionedTableTypes = tableTypes.stream().filter(tpiMap::containsKey).collect(Collectors.toSet()); + Preconditions.checkState(!tableTypes.isEmpty(), "No routing entry for offline or realtime type"); + if (tableTypes.equals(partitionedTableTypes)) { + if (partitionedTableTypes.size() == 1) { + // Attempt partitioned distribution + String tableType = partitionedTableTypes.iterator().next(); + String tableNameWithType = TableNameBuilder.forType(TableType.valueOf(tableType)).tableNameWithType(tableName); + TableScanWorkerAssignmentResult assignmentResult = attemptPartitionedDistribution(tableNameWithType, + fieldNames, instanceIdToSegments.getSegmentsMap(TableType.valueOf(tableType)), + tpiMap.get(tableType)); + if (assignmentResult != null) { + return assignmentResult; + } + } else { + // TODO(mse-physical): Support automatic partitioned dist for hybrid tables. + LOGGER.warn("Automatic Partitioned Distribution not supported for Hybrid Tables yet"); + } + } + // For each server, we want to know the segments for each table-type. 
+ Map<String, Map<String, List<String>>> instanceIdToTableTypeToSegmentsMap = new HashMap<>(); + for (String tableType : tableTypes) { + Map<String, List<String>> segmentsMap = instanceIdToSegments.getSegmentsMap(TableType.valueOf(tableType)); + Preconditions.checkNotNull(segmentsMap, "Unexpected null segments map in leaf worker assignment"); + for (Map.Entry<String, List<String>> entry : segmentsMap.entrySet()) { + String instanceId = entry.getKey(); + List<String> segments = entry.getValue(); + instanceIdToTableTypeToSegmentsMap.computeIfAbsent(instanceId, k -> new HashMap<>()) + .put(tableType, segments); + } + } + // For each server, assign one worker each. + Map<Integer, Map<String, List<String>>> workerIdToSegmentsMap = new HashMap<>(); + List<String> workers = new ArrayList<>(); + for (Map.Entry<String, Map<String, List<String>>> entry : instanceIdToTableTypeToSegmentsMap.entrySet()) { + String instanceId = entry.getKey(); + for (var tableTypeAndSegments : entry.getValue().entrySet()) { + String tableType = tableTypeAndSegments.getKey(); + List<String> segments = tableTypeAndSegments.getValue(); + workerIdToSegmentsMap.computeIfAbsent(workers.size(), (x) -> new HashMap<>()).put(tableType, segments); + } + workers.add(String.format("%s@%s", workers.size(), instanceId)); + } + PinotDataDistribution pinotDataDistribution = new PinotDataDistribution(RelDistribution.Type.RANDOM_DISTRIBUTED, + workers, workers.hashCode(), null, null); + return new TableScanWorkerAssignmentResult(pinotDataDistribution, workerIdToSegmentsMap); + } + + @Nullable + @VisibleForTesting + static TableScanWorkerAssignmentResult attemptPartitionedDistribution(String tableNameWithType, Review Comment: Could you add a comment noting that this returns null if a partitioned distribution could not be generated? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
