[ https://issues.apache.org/jira/browse/DRILL-4446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15177038#comment-15177038 ]
ASF GitHub Bot commented on DRILL-4446:
---------------------------------------

Github user laurentgo commented on a diff in the pull request:

    https://github.com/apache/drill/pull/403#discussion_r54829311

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java ---
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Implementation of {@link FragmentParallelizer} where fragment requires running on a given set of endpoints. Width
+ * per node depends on the affinity to the endpoint and total width (calculated using costs)
+ */
+public class HardAffinityFragmentParallelizer implements FragmentParallelizer {
+  public static final HardAffinityFragmentParallelizer INSTANCE = new HardAffinityFragmentParallelizer();
+
+  private HardAffinityFragmentParallelizer() { /* singleton */}
+
+  @Override
+  public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
+      final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
+
+    final Stats stats = fragmentWrapper.getStats();
+    final ParallelizationInfo pInfo = stats.getParallelizationInfo();
+
+    // Go through the affinity map and extract the endpoints that have mandatory assignment requirement
+    final Map<DrillbitEndpoint, EndpointAffinity> endpointPool = Maps.newHashMap();
+    for(Entry<DrillbitEndpoint, EndpointAffinity> entry : pInfo.getEndpointAffinityMap().entrySet()) {
+      if (entry.getValue().isAssignmentRequired()) {
+        endpointPool.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    // Step 1: Find the width taking into account various parameters
+    // 1.1. Find the parallelization based on cost. Use max cost of all operators in this fragment; this is consistent
+    //      with the calculation that ExcessiveExchangeRemover uses.
+    int width = (int) Math.ceil(stats.getMaxCost() / parameters.getParallelizationThreshold());
+
+    // 1.2. Make sure the width is at least the number of endpoints that require an assignment
+    width = Math.max(endpointPool.size(), width);
+
+    // 1.3. Cap the parallelization width by fragment level width limit and system level per query width limit
+    width = Math.max(1, Math.min(width, pInfo.getMaxWidth()));
+    checkAndThrow(endpointPool.size() <= width,
+        "Number of mandatory endpoints that require an assignment is more than the allowed fragment max width.");
+
+    width = Math.max(1, Math.min(width, parameters.getMaxGlobalWidth()));
+    checkAndThrow(endpointPool.size() <= width,
+        "Number of mandatory endpoints that require an assignment is more than the allowed global query width.");
+
+    // Step 2: Select the endpoints
+    final Map<DrillbitEndpoint, Integer> endpoints = Maps.newHashMap();
+
+    // 2.1 First add each endpoint from the pool once so that the mandatory assignment requirement is fulfilled.
+    int totalAssigned;
+    for(Entry<DrillbitEndpoint, EndpointAffinity> entry : endpointPool.entrySet()) {
+      endpoints.put(entry.getKey(), 1);
+    }
+    totalAssigned = endpoints.size();
--- End diff --

Maybe you can declare totalAssigned here?

> Improve current fragment parallelization module
> -----------------------------------------------
>
>                 Key: DRILL-4446
>                 URL: https://issues.apache.org/jira/browse/DRILL-4446
>             Project: Apache Drill
>          Issue Type: New Feature
>    Affects Versions: 1.5.0
>            Reporter: Venki Korukanti
>            Assignee: Venki Korukanti
>             Fix For: 1.6.0
>
>
> The current fragment parallelizer {{SimpleParallelizer.java}} cannot correctly
> handle the case where an operator has a mandatory scheduling requirement for
> a set of DrillbitEndpoints and an affinity for each DrillbitEndpoint (i.e., what
> portion of the total tasks should be scheduled on each DrillbitEndpoint). It
> assumes that scheduling requirements are soft, except for the Mux and DeMux
> case, which has a mandatory parallelization requirement of 1 unit.
> An example is:
> The cluster has 3 nodes, each running a Drillbit and a storage service. Data for
> a table is only present at the storage services on two of the nodes.
> So a GroupScan needs to be scheduled on these two nodes in order to read the
> data. The storage service doesn't support reading data from a remote node (or
> doing so is costly).
> Inserting the mandatory scheduling requirements into the existing
> SimpleParallelizer is not sufficient, because you may end up with a plan that
> has a fragment with two GroupScans, each having its own hard parallelization
> requirements.
> Proposal:
> Add a property to each operator that tells which parallelization
> implementation to use. Most operators (such as Project or Filter) don't have
> any particular strategy; they depend on the incoming operator. Existing
> operators that have requirements (all existing GroupScans) default to the
> current parallelizer {{SimpleParallelizer}}. {{Screen}} defaults to the new
> mandatory assignment parallelizer. The generated PhysicalPlan may have a
> fragment with operators that use different parallelization strategies. In that
> case, an exchange is inserted between the operators where a change in
> parallelization strategy is required.
> Will send a detailed design doc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
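Steps 1.1 to 1.3 of the diff above can be tested without the rest of the parallelizer. The block below is a minimal, dependency-free sketch that takes plain numbers in place of Drill's Stats, ParallelizationInfo and ParallelizationParameters. The class HardAffinityWidthSketch and its check helper are hypothetical. IllegalStateException stands in for whatever checkAndThrow raises (presumably PhysicalOperatorSetupException, given the method's throws clause).

```java
/**
 * Hypothetical, dependency-free sketch of Steps 1.1-1.3 in
 * HardAffinityFragmentParallelizer.parallelizeFragment. Plain numbers replace
 * Drill's Stats/ParallelizationInfo/ParallelizationParameters, and
 * IllegalStateException replaces the exception thrown by checkAndThrow.
 */
public class HardAffinityWidthSketch {

  static int computeWidth(double maxCost, double parallelizationThreshold, int mandatoryEndpoints,
      int fragmentMaxWidth, int globalMaxWidth) {
    // 1.1 Cost-based width: max operator cost divided by the threshold, rounded up.
    int width = (int) Math.ceil(maxCost / parallelizationThreshold);

    // 1.2 Each endpoint with a mandatory assignment needs at least one unit of width.
    width = Math.max(mandatoryEndpoints, width);

    // 1.3 Cap by the fragment-level limit first; the cap must not drop below the mandatory count.
    width = Math.max(1, Math.min(width, fragmentMaxWidth));
    check(mandatoryEndpoints <= width,
        "Number of mandatory endpoints that require an assignment is more than the allowed fragment max width.");

    // ...then by the system-level per-query limit, with the same check.
    width = Math.max(1, Math.min(width, globalMaxWidth));
    check(mandatoryEndpoints <= width,
        "Number of mandatory endpoints that require an assignment is more than the allowed global query width.");
    return width;
  }

  private static void check(boolean condition, String message) {
    if (!condition) {
      throw new IllegalStateException(message);
    }
  }

  public static void main(String[] args) {
    // Cost-based width 10, two mandatory endpoints, fragment limit 8, global limit 100.
    System.out.println(computeWidth(1_000_000, 100_000, 2, 8, 100)); // 8
    // Cost-based width 1, but three endpoints require an assignment.
    System.out.println(computeWidth(50_000, 100_000, 3, 8, 100)); // 3
  }
}
```

In the first call, the cost-based width of 10 is capped to the fragment limit of 8. In the second call, Step 1.2 raises the width from 1 to 3. If either limit were smaller than the number of mandatory endpoints, the corresponding check would fail instead of silently dropping a required endpoint.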
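The review comment on Step 2.1 suggests declaring totalAssigned at the point where it first receives a value, rather than as a bare "int totalAssigned;" before the loop. The block below sketches that change under a few assumptions. String host names stand in for DrillbitEndpoint. The method remainingAfterMandatory is hypothetical: it only illustrates a plausible later use of totalAssigned (the width left over after each mandatory endpoint has one unit), not the pull request's actual Step 2.2.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of Step 2.1 with the review suggestion applied.
 * Strings stand in for DrillbitEndpoint.
 */
public class MandatoryAssignmentSketch {

  static int remainingAfterMandatory(int width, Set<String> endpointPool, Map<String, Integer> endpoints) {
    // 2.1 Add each endpoint from the pool once so that the mandatory assignment requirement is fulfilled.
    for (String endpoint : endpointPool) {
      endpoints.put(endpoint, 1);
    }
    // Declared where it is first assigned, instead of "int totalAssigned;" above the loop.
    int totalAssigned = endpoints.size();

    // Hypothetical follow-up: width units still available after the mandatory assignments.
    return width - totalAssigned;
  }

  public static void main(String[] args) {
    Map<String, Integer> endpoints = new HashMap<>();
    Set<String> pool = new LinkedHashSet<>(Arrays.asList("node1", "node2"));
    int remaining = remainingAfterMandatory(8, pool, endpoints);
    System.out.println(endpoints.size() + " endpoints assigned, " + remaining + " units remaining");
  }
}
```

Declaring the variable at initialization narrows its scope. It also removes the window in which it is declared but not yet definitely assigned.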
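In the issue's proposal, operators declare a parallelization strategy, operators without one inherit from their incoming operator, and an exchange goes wherever the strategy changes. The sketch below illustrates this for a linear chain of operators. Strategy, Op and insertExchanges are hypothetical names, not Drill's API. SOFT_AFFINITY loosely stands for the existing SimpleParallelizer behavior and HARD_AFFINITY for the new mandatory-assignment parallelizer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of the DRILL-4446 proposal: an exchange is inserted
 * wherever the effective parallelization strategy changes along the plan.
 */
public class StrategyBoundarySketch {

  enum Strategy { INHERIT, SOFT_AFFINITY, HARD_AFFINITY }

  static final class Op {
    final String name;
    final Strategy strategy;

    Op(String name, Strategy strategy) {
      this.name = name;
      this.strategy = strategy;
    }
  }

  /** Walks a chain from leaf to root and inserts "Exchange" at each strategy change. */
  static List<String> insertExchanges(List<Op> leafToRoot) {
    List<String> result = new ArrayList<>();
    Strategy current = null; // effective strategy of the operator below; null before the leaf
    for (Op op : leafToRoot) {
      // Operators without their own strategy (e.g. Project, Filter) follow the incoming operator.
      Strategy effective = (op.strategy == Strategy.INHERIT && current != null) ? current : op.strategy;
      if (current != null && effective != current) {
        result.add("Exchange"); // strategies differ, so the fragment is split here
      }
      result.add(op.name);
      current = effective;
    }
    return result;
  }

  public static void main(String[] args) {
    List<Op> plan = Arrays.asList(
        new Op("GroupScan", Strategy.SOFT_AFFINITY), // GroupScans default to SimpleParallelizer
        new Op("Filter", Strategy.INHERIT),
        new Op("Project", Strategy.INHERIT),
        new Op("Screen", Strategy.HARD_AFFINITY));  // Screen defaults to mandatory assignment
    System.out.println(insertExchanges(plan)); // [GroupScan, Filter, Project, Exchange, Screen]
  }
}
```

A physical plan is a tree rather than a chain. A fuller implementation would therefore compare strategies on every parent/child edge; the chain only keeps the sketch short.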