[ 
https://issues.apache.org/jira/browse/DRILL-4446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15231263#comment-15231263
 ] 

ASF GitHub Bot commented on DRILL-4446:
---------------------------------------

Github user sudheeshkatkam commented on a diff in the pull request:

    https://github.com/apache/drill/pull/403#discussion_r58956969
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java ---
    @@ -0,0 +1,134 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.planner.fragment;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.drill.exec.physical.EndpointAffinity;
    +import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
    +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    +import org.slf4j.Logger;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +
    +/**
    + * Implementation of {@link FragmentParallelizer} where the fragment is required to run on a given set of
    + * endpoints. Width per node depends on the affinity to the endpoint and the total width (calculated using costs).
    + */
    +public class HardAffinityFragmentParallelizer implements FragmentParallelizer {
    +  private static final Logger logger = org.slf4j.LoggerFactory.getLogger(HardAffinityFragmentParallelizer.class);
    +
    +  public static final HardAffinityFragmentParallelizer INSTANCE = new HardAffinityFragmentParallelizer();
    +
    +  private static final String EOL = System.getProperty("line.separator");
    +
    +  private HardAffinityFragmentParallelizer() { /* singleton */ }
    +
    +  @Override
    +  public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
    +      final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
    +
    +    final Stats stats = fragmentWrapper.getStats();
    +    final ParallelizationInfo pInfo = stats.getParallelizationInfo();
    +
    +    // Go through the affinity map and extract the endpoints that have a mandatory assignment requirement.
    +    final Map<DrillbitEndpoint, EndpointAffinity> endpointPool = Maps.newHashMap();
    +    for (Entry<DrillbitEndpoint, EndpointAffinity> entry : pInfo.getEndpointAffinityMap().entrySet()) {
    +      if (entry.getValue().isAssignmentRequired()) {
    +        endpointPool.put(entry.getKey(), entry.getValue());
    +      }
    +    }
    +
    +    // Step 1: Find the width, taking into account various parameters.
    +    // 1.1. Find the parallelization based on cost. Use the max cost of all operators in this fragment; this is
    +    //      consistent with the calculation that ExcessiveExchangeRemover uses.
    +    int width = (int) Math.ceil(stats.getMaxCost() / parameters.getSliceTarget());
    +
    +    // 1.2. Make sure the width is at least the number of endpoints that require an assignment.
    +    width = Math.max(endpointPool.size(), width);
    +
    +    // 1.3. Cap the parallelization width by the fragment-level width limit.
    +    width = Math.max(1, Math.min(width, pInfo.getMaxWidth()));
    +    checkAndThrow(endpointPool.size() <= width, logger,
    +        "Number of mandatory endpoints ({}) that require an assignment is more than the allowed fragment max " +
    +            "width ({}).", endpointPool.size(), pInfo.getMaxWidth());
    +
    +    // 1.4. Cap the parallelization width by the global max query width.
    +    width = Math.max(1, Math.min(width, parameters.getMaxGlobalWidth()));
    +    checkAndThrow(endpointPool.size() <= width, logger,
    +        "Number of mandatory endpoints ({}) that require an assignment is more than the allowed global query " +
    +            "width ({}).", endpointPool.size(), parameters.getMaxGlobalWidth());
    +
    +    // 1.5. Cap the parallelization width by the max allowed parallelization per node.
    +    width = Math.max(1, Math.min(width, endpointPool.size() * parameters.getMaxWidthPerNode()));
    +
    +    // Step 2: Select the endpoints.
    +    final Map<DrillbitEndpoint, Integer> endpoints = Maps.newHashMap();
    +
    +    // 2.1. First add each endpoint from the pool once so that the mandatory assignment requirement is fulfilled.
    +    for (Entry<DrillbitEndpoint, EndpointAffinity> entry : endpointPool.entrySet()) {
    +      endpoints.put(entry.getKey(), 1);
    +    }
    +    int totalAssigned = endpoints.size();
    +
    +    // 2.2. Assign the remaining slots to endpoints proportionally to the affinity of each endpoint.
    +    int remainingSlots = width - endpoints.size();
    +    while (remainingSlots > 0) {
    +      for (EndpointAffinity epAf : endpointPool.values()) {
    +        final int moreAllocation = (int) Math.ceil(epAf.getAffinity() * remainingSlots);
    +        int currentAssignments = endpoints.get(epAf.getEndpoint());
    +        for (int i = 0;
    +            i < moreAllocation && totalAssigned < width && currentAssignments < parameters.getMaxWidthPerNode();
    +            i++) {
    +          totalAssigned++;
    +          currentAssignments++;
    +        }
    +        endpoints.put(epAf.getEndpoint(), currentAssignments);
    +      }
    +      final int previousRemainingSlots = remainingSlots;
    +      remainingSlots = width - totalAssigned;
    +      if (previousRemainingSlots == remainingSlots) {
    +        logger.error("Can't parallelize fragment: " +
    +            "Every mandatory node has exhausted the maximum width per node limit." + EOL +
    +            "Endpoint pool: {}" + EOL + "Assignment so far: {}" + EOL + "Width: {}", endpointPool, endpoints, width);
    +        throw new PhysicalOperatorSetupException("Cannot parallelize fragment.");
    +      }
    +    }
    +
    +    final List<DrillbitEndpoint> assignedEndpoints = Lists.newArrayList();
    +    for (Entry<DrillbitEndpoint, Integer> entry : endpoints.entrySet()) {
    +      for (int i = 0; i < entry.getValue(); i++) {
    +        assignedEndpoints.add(entry.getKey());
    +      }
    +    }
    +
    +    fragmentWrapper.setWidth(width);
    +    fragmentWrapper.assignEndpoints(assignedEndpoints);
    +  }
    +
    +  private static void checkAndThrow(final boolean expr, final Logger logger, final String errMsg, Object... args)
    --- End diff --
    
    checkOrThrow?
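
    The width computation and slot-assignment logic in the diff above can be exercised outside of Drill. Below is a minimal, self-contained sketch; the class name, method names, and parameter values are simplified stand-ins for illustration, not Drill APIs:

    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Simplified stand-in for HardAffinityFragmentParallelizer: compute a width,
    // give every mandatory endpoint one slot, then hand out the remaining slots
    // proportionally to affinity, capped by a per-node width limit.
    public class HardAffinitySketch {

      // Step 1 (simplified): cost-based width, floored by the number of mandatory
      // endpoints and capped by the fragment, global, and per-node limits.
      public static int computeWidth(double maxCost, long sliceTarget, int mandatoryEndpoints,
          int maxFragmentWidth, int maxGlobalWidth, int maxWidthPerNode) {
        int width = (int) Math.ceil(maxCost / sliceTarget);
        width = Math.max(mandatoryEndpoints, width);
        width = Math.max(1, Math.min(width, maxFragmentWidth));
        width = Math.max(1, Math.min(width, maxGlobalWidth));
        width = Math.max(1, Math.min(width, mandatoryEndpoints * maxWidthPerNode));
        return width;
      }

      // Step 2 (simplified): endpoint -> slot count; affinities should sum to ~1.
      public static Map<String, Integer> assign(Map<String, Double> affinities,
          int width, int maxWidthPerNode) {
        final Map<String, Integer> endpoints = new LinkedHashMap<>();
        for (String ep : affinities.keySet()) {   // 2.1: one mandatory slot each
          endpoints.put(ep, 1);
        }
        int totalAssigned = endpoints.size();
        int remainingSlots = width - totalAssigned;
        while (remainingSlots > 0) {              // 2.2: proportional distribution
          for (Map.Entry<String, Double> e : affinities.entrySet()) {
            final int more = (int) Math.ceil(e.getValue() * remainingSlots);
            int current = endpoints.get(e.getKey());
            for (int i = 0; i < more && totalAssigned < width && current < maxWidthPerNode; i++) {
              totalAssigned++;
              current++;
            }
            endpoints.put(e.getKey(), current);
          }
          final int previous = remainingSlots;
          remainingSlots = width - totalAssigned;
          if (previous == remainingSlots) {       // no progress: per-node limit hit
            throw new IllegalStateException("Cannot parallelize: max width per node exhausted");
          }
        }
        return endpoints;
      }

      public static void main(String[] args) {
        Map<String, Double> affinities = new LinkedHashMap<>();
        affinities.put("node1", 0.75);
        affinities.put("node2", 0.25);
        System.out.println(assign(affinities, 6, 5)); // {node1=4, node2=2}
      }
    }
    ```

    With two mandatory endpoints at affinities 0.75/0.25, a width of 6, and at most 5 slots per node, the sketch gives each endpoint its mandatory slot and then distributes the remaining four as 3 and 1, mirroring how the loop in the diff converges.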


> Improve current fragment parallelization module
> -----------------------------------------------
>
>                 Key: DRILL-4446
>                 URL: https://issues.apache.org/jira/browse/DRILL-4446
>             Project: Apache Drill
>          Issue Type: New Feature
>    Affects Versions: 1.5.0
>            Reporter: Venki Korukanti
>            Assignee: Venki Korukanti
>             Fix For: 1.7.0
>
>
> The current fragment parallelizer, {{SimpleParallelizer.java}}, can't correctly handle 
> the case where an operator has a mandatory scheduling requirement for a set of 
> DrillbitEndpoints along with an affinity for each DrillbitEndpoint (i.e., what 
> portion of the total tasks should be scheduled on each DrillbitEndpoint). It 
> assumes that scheduling requirements are soft (except in the Mux and DeMux case, 
> where there is a mandatory parallelization requirement of 1 unit).
> An example: a cluster has 3 nodes, each running a Drillbit and a storage service. 
> Data for a table is present only at the storage services on two of the nodes, so a 
> GroupScan needs to be scheduled on those two nodes in order to read the data; the 
> storage service doesn't support reading data from a remote node (or doing so is 
> costly).
> Inserting the mandatory scheduling requirements into the existing 
> SimpleParallelizer is not sufficient, as you may end up with a plan that has a 
> fragment containing two GroupScans, each with its own hard parallelization 
> requirements.
> The proposal: add a property to each operator that tells which parallelization 
> implementation to use. Most operators (such as Project or Filter) don't have any 
> particular strategy; they depend on the incoming operator. Existing operators that 
> have requirements (all existing GroupScans) default to the current parallelizer, 
> {{SimpleParallelizer}}. {{Screen}} defaults to the new mandatory-assignment 
> parallelizer. The generated PhysicalPlan can have a fragment with operators that 
> have different parallelization strategies; in that case, an exchange is inserted 
> between the operators where a change in parallelization strategy is required.
> Will send a detailed design doc.
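
The proposal in the quoted issue can be sketched roughly as follows. Everything here is illustrative: the enum values and the exchange-insertion rule are hypothetical stand-ins for the design described above, not actual Drill classes.

```java
// Hypothetical sketch of a per-operator parallelization-strategy property.
// NONE means the operator has no strategy of its own and inherits from the
// operator below it; when two different concrete strategies meet inside one
// fragment, the planner would insert an exchange at that boundary.
public class StrategySketch {

  enum ParallelizationStrategy { NONE, SOFT_AFFINITY, HARD_AFFINITY }

  // An exchange is required where two different concrete strategies meet.
  static boolean needsExchange(ParallelizationStrategy below, ParallelizationStrategy above) {
    return below != ParallelizationStrategy.NONE
        && above != ParallelizationStrategy.NONE
        && below != above;
  }

  public static void main(String[] args) {
    // A GroupScan using the existing (soft-affinity) parallelizer feeding a
    // Screen that requires the new mandatory-assignment parallelizer.
    System.out.println(needsExchange(
        ParallelizationStrategy.SOFT_AFFINITY, ParallelizationStrategy.HARD_AFFINITY)); // true
  }
}
```

A Project or Filter would carry NONE and never force an exchange on its own; only a boundary between two operators with conflicting concrete strategies does.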



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
