[ https://issues.apache.org/jira/browse/DRILL-4706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15638057#comment-15638057 ]

ASF GitHub Bot commented on DRILL-4706:
---------------------------------------

Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/639#discussion_r86605131
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/LocalAffinityFragmentParallelizer.java ---
    @@ -0,0 +1,165 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.planner.fragment;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Ordering;
    +import org.apache.drill.exec.physical.EndpointAffinity;
    +import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
    +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
    +
    +import java.util.Map;
    +import java.util.List;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.Collections;
    +
    +/**
    + * Implementation of {@link FragmentParallelizer} where fragment has zero or more endpoints.
    + * This is for Parquet Scan Fragments only. Fragment placement is done preferring
    + * data locality.
    + */
    +public class LocalAffinityFragmentParallelizer implements FragmentParallelizer {
    +    public static final LocalAffinityFragmentParallelizer INSTANCE = new LocalAffinityFragmentParallelizer();
    +
    +    // Sort a list of map entries by values.
    +    Ordering<Map.Entry<DrillbitEndpoint, Integer>> sortByValues = new Ordering<Map.Entry<DrillbitEndpoint, Integer>>() {
    +        @Override
    +        public int compare(Map.Entry<DrillbitEndpoint, Integer> left, Map.Entry<DrillbitEndpoint, Integer> right) {
    +            return right.getValue().compareTo(left.getValue());
    +        }
    +    };
    +
    +    @Override
    +    public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
    +                                    final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
    +        final Stats stats = fragmentWrapper.getStats();
    +        final ParallelizationInfo parallelizationInfo = stats.getParallelizationInfo();
    +        final Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap =
    +            fragmentWrapper.getStats().getParallelizationInfo().getEndpointAffinityMap();
    +        int totalWorkUnits = 0;
    +        Map<DrillbitEndpoint, Integer> endpointPool = new HashMap<>();
    +
    +        // Get the total number of work units and list of endPoints to schedule fragments on
    +        for (Map.Entry<DrillbitEndpoint, EndpointAffinity> epAff : endpointAffinityMap.entrySet()) {
    +            if (epAff.getValue().getNumLocalWorkUnits() > 0) {
    +                totalWorkUnits += epAff.getValue().getNumLocalWorkUnits();
    +                endpointPool.put(epAff.getKey(), epAff.getValue().getNumLocalWorkUnits());
    +            }
    +        }
    +
    +        // Find the parallelization width of fragment
    +        // 1. Find the parallelization based on cost. Use max cost of all operators in this fragment; this is consistent
    +        //    with the calculation that ExcessiveExchangeRemover uses.
    +        int width = (int) Math.ceil(stats.getMaxCost() / parameters.getSliceTarget());
    +
    +        // 2. Cap the parallelization width by fragment level width limit and system level per query width limit
    +        width = Math.min(width, Math.min(parallelizationInfo.getMaxWidth(), parameters.getMaxGlobalWidth()));
    +
    +        // 3. Cap the parallelization width by system level per node width limit
    +        width = Math.min(width, parameters.getMaxWidthPerNode() * endpointPool.size());
    +
    +        // 4. Make sure width is at least the min width enforced by operators
    +        width = Math.max(parallelizationInfo.getMinWidth(), width);
    +
    +        // 5. Make sure width is at most the max width enforced by operators
    +        width = Math.min(parallelizationInfo.getMaxWidth(), width);
    +
    +        // 6: Finally make sure the width is at least one
    +        width = Math.max(1, width);
    +
    +        List<DrillbitEndpoint> assignedEndPoints = Lists.newArrayList();
    +        int totalAssigned = 0;
    +
    +        // Sort the endpointPool based on numLocalWorkUnits. This sorting is done because we are doing
    +        // round robin allocation and we stop when we reach the width. We want to allocate
    +        // on endpoints which have higher numLocalWorkUnits first.
    +        List<Map.Entry<DrillbitEndpoint, Integer>> sortedEndpointPool = Lists.newArrayList(endpointPool.entrySet());
    +        Collections.sort(sortedEndpointPool, sortByValues);
    +
    +        // Keep track of number of fragments allocated to each endpoint.
    +        Map<DrillbitEndpoint, Integer> endpointAssignments = new HashMap<>();
    +
    +        // Keep track of how many more to assign to each endpoint.
    +        Map<DrillbitEndpoint, Integer> remainingEndpointAssignments = new HashMap<>();
    +
    +        // Calculate the target allocation for each endPoint based on work it has to do
    +        // Assign one fragment (minimum) to all the endPoints in the pool.
    +        for (DrillbitEndpoint ep : endpointPool.keySet()) {
    +            final int targetAllocation = (int) Math.ceil(endpointAffinityMap.get(ep).getNumLocalWorkUnits() * ((double)width / parallelizationInfo.getMaxWidth()));
    +            assignedEndPoints.add(ep);
    +            totalAssigned++;
    +            remainingEndpointAssignments.put(ep, targetAllocation-1);
    +            endpointAssignments.put(ep, 1);
    +        }
    +
    +        // Keep allocating from endpoints in a round robin fashion upto
    +        // max(targetAllocation, maxwidthPerNode) for each endpoint and
    --- End diff --
    
    This comment is wrong. We keep assigning until each endpoint reaches the maxWidthPerNode limit.
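
To illustrate the behavior described above, here is a minimal sketch of a round-robin assignment that stops giving an endpoint more fragments once it holds maxWidthPerNode of them. It is not the code from this pull request (the quoted diff ends before the loop): the class `RoundRobinSketch`, the method `assign`, and the use of plain `String` endpoint names are all hypothetical, and the input list is assumed to be already sorted by descending local work units.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {

    /**
     * Hypothetical helper, not Drill code. Hands out up to `width` fragment slots
     * across the endpoints in round-robin order, skipping any endpoint that
     * already holds `maxWidthPerNode` slots.
     */
    static Map<String, Integer> assign(List<String> sortedEndpoints, int width, int maxWidthPerNode) {
        // LinkedHashMap keeps the caller's ordering, which makes the result easy to read.
        Map<String, Integer> assignments = new LinkedHashMap<>();
        for (String ep : sortedEndpoints) {
            assignments.put(ep, 0);
        }

        int totalAssigned = 0;
        boolean progressed = true;
        // Stop when the width is reached or when every endpoint is at its per-node cap.
        while (totalAssigned < width && progressed) {
            progressed = false;
            for (String ep : sortedEndpoints) {
                if (totalAssigned == width) {
                    break;
                }
                int current = assignments.get(ep);
                if (current < maxWidthPerNode) {
                    assignments.put(ep, current + 1);
                    totalAssigned++;
                    progressed = true;
                }
            }
        }
        return assignments;
    }

    public static void main(String[] args) {
        // Assumed ordering: node-a has the most local work units, node-c the fewest.
        List<String> endpoints = Arrays.asList("node-a", "node-b", "node-c");
        System.out.println(assign(endpoints, 5, 2));
    }
}
```

With a width of 5 and a per-node cap of 2, the endpoints listed first receive the extra fragments. If the requested width exceeds maxWidthPerNode times the number of endpoints, the loop ends with every endpoint at its cap and the remaining width unassigned.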


> Fragment planning causes Drillbits to read remote chunks when local copies are available
> ----------------------------------------------------------------------------------------
>
>                 Key: DRILL-4706
>                 URL: https://issues.apache.org/jira/browse/DRILL-4706
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Query Planning & Optimization
>    Affects Versions: 1.6.0
>         Environment: CentOS, RHEL
>            Reporter: Kunal Khatua
>            Assignee: Sorabh Hamirwasia
>              Labels: performance, planning
>
> When a table (datasize=70GB) of 160 parquet files (each having a single 
> rowgroup and fitting within one chunk) is available on a 10-node setup with 
> replication=3, a pure data scan query causes about 2% of the data to be read 
> remotely. 
> Even after the metadata cache is created, the planner selects a sub-optimal 
> plan for executing the SCAN fragments, so that some of the data is served 
> from a remote server. 



