[ 
https://issues.apache.org/jira/browse/DRILL-4446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15177038#comment-15177038
 ] 

ASF GitHub Bot commented on DRILL-4446:
---------------------------------------

Github user laurentgo commented on a diff in the pull request:

    https://github.com/apache/drill/pull/403#discussion_r54829311
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
 ---
    @@ -0,0 +1,119 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.planner.fragment;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.drill.exec.physical.EndpointAffinity;
    +import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
    +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +
    +/**
    + * Implementation of {@link FragmentParallelizer} where fragment requires 
running on a given set of endpoints. Width
    + * per node is depended on the affinity to the endpoint and total width 
(calculated using costs)
    + */
    +public class HardAffinityFragmentParallelizer implements 
FragmentParallelizer {
    +  public static final HardAffinityFragmentParallelizer INSTANCE = new 
HardAffinityFragmentParallelizer();
    +
    +  private HardAffinityFragmentParallelizer() { /* singleton */}
    +
    +  @Override
    +  public void parallelizeFragment(final Wrapper fragmentWrapper, final 
ParallelizationParameters parameters,
    +      final Collection<DrillbitEndpoint> activeEndpoints) throws 
PhysicalOperatorSetupException {
    +
    +    final Stats stats = fragmentWrapper.getStats();
    +    final ParallelizationInfo pInfo = stats.getParallelizationInfo();
    +
    +    // Go through the affinity map and extract the endpoints that have 
mandatory assignment requirement
    +    final Map<DrillbitEndpoint, EndpointAffinity> endpointPool = 
Maps.newHashMap();
    +    for(Entry<DrillbitEndpoint, EndpointAffinity> entry : 
pInfo.getEndpointAffinityMap().entrySet()) {
    +      if (entry.getValue().isAssignmentRequired()) {
    +        endpointPool.put(entry.getKey(), entry.getValue());
    +      }
    +    }
    +
    +    // Step 1: Find the width taking into various parameters
    +    // 1.1. Find the parallelization based on cost. Use max cost of all 
operators in this fragment; this is consistent
    +    //      with the calculation that ExcessiveExchangeRemover uses.
    +    int width = (int) Math.ceil(stats.getMaxCost() / 
parameters.getParallelizationThreshold());
    +
    +    // 1.2. Make sure the width is at least the number of endpoints that 
require an assignment
    +    width = Math.max(endpointPool.size(), width);
    +
    +    // 1.3. Cap the parallelization width by fragment level width limit 
and system level per query width limit
    +    width = Math.max(1, Math.min(width, pInfo.getMaxWidth()));
    +    checkAndThrow(endpointPool.size() <= width,
    +        "Number of mandatory endpoints that require an assignment is more 
than the allowed fragment max width.");
    +
    +    width = Math.max(1, Math.min(width, parameters.getMaxGlobalWidth()));
    +    checkAndThrow(endpointPool.size() <= width,
    +        "Number of mandatory endpoints that require an assignment is more 
than the allowed global query width.");
    +
    +    // Step 2: Select the endpoints
    +    final Map<DrillbitEndpoint, Integer> endpoints = Maps.newHashMap();
    +
    +    // 2.1 First add each endpoint from the pool once so that the 
mandatory assignment requirement is fulfilled.
    +    int totalAssigned;
    +    for(Entry<DrillbitEndpoint, EndpointAffinity> entry : 
endpointPool.entrySet()) {
    +      endpoints.put(entry.getKey(), 1);
    +    }
    +    totalAssigned = endpoints.size();
    --- End diff --
    
    maybe you can declare totalAssigned here?


> Improve current fragment parallelization module
> -----------------------------------------------
>
>                 Key: DRILL-4446
>                 URL: https://issues.apache.org/jira/browse/DRILL-4446
>             Project: Apache Drill
>          Issue Type: New Feature
>    Affects Versions: 1.5.0
>            Reporter: Venki Korukanti
>            Assignee: Venki Korukanti
>             Fix For: 1.6.0
>
>
> Current fragment parallelizer {{SimpleParallelizer.java}} can’t handle 
> correctly the case where an operator has mandatory scheduling requirement for 
> a set of DrillbitEndpoints and affinity for each DrillbitEndpoint (i.e how 
> much portion of the total tasks to be scheduled on each DrillbitEndpoint). It 
> assumes that scheduling requirements are soft (except one case where Mux and 
> DeMux case where mandatory parallelization requirement of 1 unit). 
> An example is:
> Cluster has 3 nodes running Drillbits and storage service on each. Data for a 
> table is only present at storage services in two nodes. So a GroupScan needs 
> to be scheduled on these two nodes in order to read the data. Storage service 
> doesn't support (or costly) reading data from remote node.
> Inserting the mandatory scheduling requirements within existing 
> SimpleParallelizer is not sufficient as you may end up with a plan that has a 
> fragment with two GroupScans each having its own hard parallelization 
> requirements.
> Proposal is:
> Add a property to each operator which tells what parallelization 
> implementation to use. Most operators don't have any particular strategy 
> (such as Project or Filter), they depend on incoming operator. Current 
> existing operators which have requirements (all existing GroupScans) default 
> to current parallelizer {{SimpleParallelizer}}. {{Screen}} defaults to new 
> mandatory assignment parallelizer. It is possible that PhysicalPlan generated 
> can have a fragment with operators having different parallelization 
> strategies. In that case an exchange is inserted in between operators where a 
> change in parallelization strategy is required.
> Will send a detailed design doc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to