[ https://issues.apache.org/jira/browse/DRILL-4706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15638059#comment-15638059 ]
ASF GitHub Bot commented on DRILL-4706:
---------------------------------------

Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/639#discussion_r86646115

--- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/planner/fragment/TestLocalAffinityFragmentParallelizer.java ---
@@ -0,0 +1,476 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import mockit.Mocked;
+import mockit.NonStrictExpectations;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Collections;
+
+import static java.lang.Integer.MAX_VALUE;
+import static org.apache.drill.exec.ExecConstants.SLICE_TARGET_DEFAULT;
+import static org.apache.drill.exec.planner.fragment.LocalAffinityFragmentParallelizer.INSTANCE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+
+public class TestLocalAffinityFragmentParallelizer {
+
+  // Create a set of test endpoints
+  private static final DrillbitEndpoint DEP1 = newDrillbitEndpoint("node1", 30010);
+  private static final DrillbitEndpoint DEP2 = newDrillbitEndpoint("node2", 30010);
+  private static final DrillbitEndpoint DEP3 = newDrillbitEndpoint("node3", 30010);
+  private static final DrillbitEndpoint DEP4 = newDrillbitEndpoint("node4", 30010);
+  private static final DrillbitEndpoint DEP5 = newDrillbitEndpoint("node5", 30010);
+
+  @Mocked private Fragment fragment;
+  @Mocked private PhysicalOperator root;
+
+  private static DrillbitEndpoint newDrillbitEndpoint(String address, int port) {
+    return DrillbitEndpoint.newBuilder().setAddress(address).setControlPort(port).build();
+  }
+
+  private static ParallelizationParameters newParameters(final long threshold, final int maxWidthPerNode,
+                                                         final int maxGlobalWidth) {
+    return new ParallelizationParameters() {
+      @Override
+      public long getSliceTarget() {
+        return threshold;
+      }
+
+      @Override
+      public int getMaxWidthPerNode() {
+        return maxWidthPerNode;
+      }
+
+      @Override
+      public int getMaxGlobalWidth() {
+        return maxGlobalWidth;
+      }
+
+      /**
+       * {@link LocalAffinityFragmentParallelizer} doesn't use the affinity factor.
+       * @return 0.0, since the affinity factor is ignored.
+       */
+      @Override
+      public double getAffinityFactor() {
+        return 0.0;
+      }
+    };
+  }
+
+  private Wrapper newWrapper(double cost, int minWidth, int maxWidth, List<EndpointAffinity> endpointAffinities) {
+    new NonStrictExpectations() {
+      {
+        fragment.getRoot(); result = root;
+      }
+    };
+
+    final Wrapper fragmentWrapper = new Wrapper(fragment, 1);
+    final Stats stats = fragmentWrapper.getStats();
+    stats.setDistributionAffinity(DistributionAffinity.LOCAL);
+    stats.addCost(cost);
+    stats.addMinWidth(minWidth);
+    stats.addMaxWidth(maxWidth);
+    stats.addEndpointAffinities(endpointAffinities);
+    return fragmentWrapper;
+  }
+
+  private void checkEndpointAssignments(List<DrillbitEndpoint> assignedEndpoints,
+                                        Map<DrillbitEndpoint, Integer> expectedAssignments) throws Exception {
+    Map<DrillbitEndpoint, Integer> endpointAssignments = new HashMap<>();
+    // Count the number of fragments assigned to each endpoint.
+    for (DrillbitEndpoint endpoint : assignedEndpoints) {
+      if (endpointAssignments.containsKey(endpoint)) {
+        endpointAssignments.put(endpoint, endpointAssignments.get(endpoint) + 1);
+      } else {
+        endpointAssignments.put(endpoint, 1);
+      }
+    }
+
+    // Verify the number of fragments assigned to each endpoint against the expected value.
+    for (Map.Entry<DrillbitEndpoint, Integer> endpointAssignment : endpointAssignments.entrySet()) {
+      assertEquals(expectedAssignments.get(endpointAssignment.getKey()).intValue(),
+          endpointAssignment.getValue().intValue());
+    }
+  }
+
+  @Test
+  public void testEqualLocalWorkUnitsUnderNodeLimit() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 80, /* cost, minWidth, maxWidth */
+        ImmutableList.of( /* endpointAffinities */
+            /* For local affinity, we only care about numLocalWorkUnits, the last column below. */
+            /* endpoint, affinity_value, mandatory, maxWidth, numLocalWorkUnits */
+            new EndpointAffinity(DEP1, 0.15, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP2, 0.15, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP3, 0.10, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP4, 0.20, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP5, 0.20, false, MAX_VALUE, 16)
+        ));
+    INSTANCE.parallelizeFragment(wrapper, newParameters(1 /* sliceTarget */,
+        23 /* maxWidthPerNode */,
+        200 /* globalMaxWidth */),
+        ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
+    // Every endpoint should get 16 fragments: the parallelization maxWidth (80) is
+    // below the globalMaxWidth (200), and the localWorkUnits of every node (16) is
+    // below the maxWidthPerNode (23).
+    Map<DrillbitEndpoint, Integer> expectedAssignments = ImmutableMap.of(DEP1, 16,
+        DEP2, 16,
+        DEP3, 16,
+        DEP4, 16,
+        DEP5, 16);
+    // Expect the fragment parallelization to be 80 (16 * 5).
+    assertEquals(80, wrapper.getWidth());
+
+    final List<DrillbitEndpoint> assignedEndpoints = wrapper.getAssignedEndpoints();
+    assertEquals(80, assignedEndpoints.size());
+    assertTrue(assignedEndpoints.contains(DEP1));
+    assertTrue(assignedEndpoints.contains(DEP2));
+    assertTrue(assignedEndpoints.contains(DEP3));
+    assertTrue(assignedEndpoints.contains(DEP4));
+    assertTrue(assignedEndpoints.contains(DEP5));
+
+    checkEndpointAssignments(assignedEndpoints, expectedAssignments);
+  }
+
+  @Test
+  public void testEqualLocalWorkUnitsAboveNodeLimit() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 80, /* cost, minWidth, maxWidth */
+        ImmutableList.of( /* endpointAffinities */
+            /* For local affinity, we only care about numLocalWorkUnits, the last column below. */
+            /* endpoint, affinity_value, mandatory, maxWidth, numLocalWorkUnits */
+            new EndpointAffinity(DEP1, 0.15, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP2, 0.15, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP3, 0.10, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP4, 0.20, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP5, 0.20, false, MAX_VALUE, 16)
+        ));
+    INSTANCE.parallelizeFragment(wrapper, newParameters(1 /* sliceTarget */,
+        8 /* maxWidthPerNode */,
+        200 /* globalMaxWidth */),
+        ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
+    // Every endpoint should get 8 fragments: maxWidthPerNode is 8 and the
+    // localWorkUnits of every node (16) is above it. The parallelization
+    // maxWidth (80) is below the globalMaxWidth (200).
+    Map<DrillbitEndpoint, Integer> expectedAssignments = ImmutableMap.of(DEP1, 8,
+        DEP2, 8,
+        DEP3, 8,
+        DEP4, 8,
+        DEP5, 8);
+    // Expect the fragment parallelization to be 40 (8 * 5).
+    assertEquals(40, wrapper.getWidth());
+
+    final List<DrillbitEndpoint> assignedEndpoints = wrapper.getAssignedEndpoints();
+    assertEquals(40, assignedEndpoints.size());
+    assertTrue(assignedEndpoints.contains(DEP1));
+    assertTrue(assignedEndpoints.contains(DEP2));
+    assertTrue(assignedEndpoints.contains(DEP3));
+    assertTrue(assignedEndpoints.contains(DEP4));
+    assertTrue(assignedEndpoints.contains(DEP5));
+
+    checkEndpointAssignments(assignedEndpoints, expectedAssignments);
+  }
+
+  @Test
+  public void testUnequalLocalWorkUnitsUnderNodeLimit() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 80, /* cost, minWidth, maxWidth */
+        ImmutableList.of( /* endpointAffinities */
+            /* For local affinity, we only care about numLocalWorkUnits, the last column below. */
+            /* endpoint, affinity_value, mandatory, maxWidth, numLocalWorkUnits */
+            new EndpointAffinity(DEP1, 0.15, false, MAX_VALUE, 14),
+            new EndpointAffinity(DEP2, 0.15, false, MAX_VALUE, 15),
+            new EndpointAffinity(DEP3, 0.10, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP4, 0.20, false, MAX_VALUE, 17),
+            new EndpointAffinity(DEP5, 0.20, false, MAX_VALUE, 18)
+        ));
+    INSTANCE.parallelizeFragment(wrapper, newParameters(1 /* sliceTarget */,
+        23 /* maxWidthPerNode */,
+        200 /* globalMaxWidth */),
+        ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
+    // Each DrillbitEndpoint should get as many fragments as it has localWorkUnits:
+    // the parallelization maxWidth (80) is below the globalMaxWidth (200), and the
+    // localWorkUnits of every node is below the maxWidthPerNode (23).
+    Map<DrillbitEndpoint, Integer> expectedAssignments = ImmutableMap.of(DEP1, 14,
+        DEP2, 15,
+        DEP3, 16,
+        DEP4, 17,
+        DEP5, 18);
+    // Expect the fragment parallelization to be 80 (14 + 15 + 16 + 17 + 18).
+    assertEquals(80, wrapper.getWidth());
+
+    // All DrillbitEndpoints should get fragments assigned.
+    final List<DrillbitEndpoint> assignedEndpoints = wrapper.getAssignedEndpoints();
+    assertEquals(80, assignedEndpoints.size());
+    assertTrue(assignedEndpoints.contains(DEP1));
+    assertTrue(assignedEndpoints.contains(DEP2));
+    assertTrue(assignedEndpoints.contains(DEP3));
+    assertTrue(assignedEndpoints.contains(DEP4));
+    assertTrue(assignedEndpoints.contains(DEP5));
+
+    checkEndpointAssignments(assignedEndpoints, expectedAssignments);
+  }
+
+  @Test
+  public void testUnequalLocalWorkUnitsAboveNodeLimit() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 80, /* cost, minWidth, maxWidth */
+        ImmutableList.of( /* endpointAffinities */
+            /* For local affinity, we only care about numLocalWorkUnits, the last column below. */
+            /* endpoint, affinity_value, mandatory, maxWidth, numLocalWorkUnits */
+            new EndpointAffinity(DEP1, 0.15, false, MAX_VALUE, 14),
+            new EndpointAffinity(DEP2, 0.15, false, MAX_VALUE, 15),
+            new EndpointAffinity(DEP3, 0.10, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP4, 0.20, false, MAX_VALUE, 17),
+            new EndpointAffinity(DEP5, 0.20, false, MAX_VALUE, 18)
+        ));
+    INSTANCE.parallelizeFragment(wrapper, newParameters(1 /* sliceTarget */,
+        16 /* maxWidthPerNode */,
+        200 /* globalMaxWidth */),
+        ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
+    // Every node should get at most 16 fragments since maxWidthPerNode is 16.
+    // Nodes with fewer than 16 localWorkUnits get as many fragments as they have
+    // localWorkUnits; the nodes with 17 and 18 localWorkUnits are capped at 16.
+    Map<DrillbitEndpoint, Integer> expectedAssignments = ImmutableMap.of(DEP1, 14,
+        DEP2, 15,
+        DEP3, 16,
+        DEP4, 16,
+        DEP5, 16);
+    // Expect the fragment parallelization to be 77 (14 + 15 + 16 + 16 + 16):
+    // the parallelization maxWidth (80) is below the globalMaxWidth (200).
+    assertEquals(77, wrapper.getWidth());
+
+    // All nodes should get fragments assigned.
+    final List<DrillbitEndpoint> assignedEndpoints = wrapper.getAssignedEndpoints();
+    assertEquals(77, assignedEndpoints.size());
+    assertTrue(assignedEndpoints.contains(DEP1));
+    assertTrue(assignedEndpoints.contains(DEP2));
+    assertTrue(assignedEndpoints.contains(DEP3));
+    assertTrue(assignedEndpoints.contains(DEP4));
+    assertTrue(assignedEndpoints.contains(DEP5));
+
+    checkEndpointAssignments(assignedEndpoints, expectedAssignments);
+  }
+
+  @Test
+  public void testTotalWorkUnitsMoreThanGlobalMaxWidth() throws Exception {
+    final Wrapper wrapper = newWrapper(200, 1, 80, /* cost, minWidth, maxWidth */
+        ImmutableList.of( /* endpointAffinities */
+            /* For local affinity, we only care about numLocalWorkUnits, the last column below. */
+            /* endpoint, affinity_value, mandatory, maxWidth, numLocalWorkUnits */
+            new EndpointAffinity(DEP1, 0.15, false, MAX_VALUE, 14),
+            new EndpointAffinity(DEP2, 0.15, false, MAX_VALUE, 15),
+            new EndpointAffinity(DEP3, 0.10, false, MAX_VALUE, 16),
+            new EndpointAffinity(DEP4, 0.20, false, MAX_VALUE, 17),
+            new EndpointAffinity(DEP5, 0.20, false, MAX_VALUE, 18)
+        ));
+    INSTANCE.parallelizeFragment(wrapper, newParameters(1 /* sliceTarget */,
+        16 /* maxWidthPerNode */,
+        40 /* globalMaxWidth */),
+        ImmutableList.of(DEP1, DEP2, DEP3, DEP4, DEP5));
+    // The parallelization maxWidth (80) is more than the globalMaxWidth (40).
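+    // Proportional shares at the 40-fragment cap are 14*40/80 = 7, 15*40/80 = 7.5,
+    // 16*40/80 = 8, 17*40/80 = 8.5 and 18*40/80 = 9; rounding these to hit the
+    // target of 40 yields 7, 8, 8, 8 and 9, with DEP5 keeping 9 ahead of DEP4.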
+    // Expect the fragment parallelization to be 40 (7 + 8 + 8 + 8 + 9).
--- End diff --

It would be great to mention that DEP5 gets 9 fragments instead of DEP4, since DEP5 has more localWorkUnits. We do favor nodes with more localWorkUnits.

> Fragment planning causes Drillbits to read remote chunks when local copies
> are available
> ----------------------------------------------------------------------------------------
>
>                 Key: DRILL-4706
>                 URL: https://issues.apache.org/jira/browse/DRILL-4706
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Query Planning & Optimization
>    Affects Versions: 1.6.0
>         Environment: CentOS, RHEL
>            Reporter: Kunal Khatua
>            Assignee: Sorabh Hamirwasia
>              Labels: performance, planning
>
> When a table (datasize=70GB) of 160 parquet files (each having a single
> rowgroup and fitting within one chunk) is available on a 10-node setup with
> replication=3, a pure data scan query causes about 2% of the data to be read
> remotely.
> Even with the metadata cache created, the planner selects a sub-optimal plan
> for executing the SCAN fragments, so some of the data is served from a remote
> server.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
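To make the policy in that last remark concrete (fragments are spread in proportion to each endpoint's localWorkUnits, and rounding conflicts are settled in favor of the endpoint with more local work), here is a minimal, self-contained sketch. It is not the actual LocalAffinityFragmentParallelizer code: the class, the distributeWidth helper, and the exact rounding and tie-break rules are hypothetical, inferred only from the expected counts (7, 8, 8, 8, 9) quoted in the test above.

import java.util.LinkedHashMap;
import java.util.Map;

public class LocalAffinitySketch {

  // Hypothetical helper, not Drill's code: spread min(totalWork, globalMaxWidth)
  // fragments over endpoints in proportion to their localWorkUnits.
  static Map<String, Integer> distributeWidth(Map<String, Integer> localWorkUnits,
                                              int maxWidthPerNode, int globalMaxWidth) {
    int totalWork = localWorkUnits.values().stream().mapToInt(Integer::intValue).sum();
    int target = Math.min(totalWork, globalMaxWidth);

    // Start from each endpoint's share rounded to the nearest integer, capped per node.
    Map<String, Integer> assigned = new LinkedHashMap<>();
    int used = 0;
    for (Map.Entry<String, Integer> e : localWorkUnits.entrySet()) {
      int share = Math.min((int) Math.round((double) e.getValue() * target / totalWork),
          maxWidthPerNode);
      assigned.put(e.getKey(), share);
      used += share;
    }

    // Rounding overshoot: repeatedly shrink the largest assignment, breaking ties
    // toward the endpoint with FEWER localWorkUnits, so busier endpoints keep theirs.
    while (used > target) {
      String victim = null;
      for (String n : assigned.keySet()) {
        if (victim == null
            || assigned.get(n) > assigned.get(victim)
            || (assigned.get(n).equals(assigned.get(victim))
                && localWorkUnits.get(n) < localWorkUnits.get(victim))) {
          victim = n;
        }
      }
      assigned.put(victim, assigned.get(victim) - 1);
      used--;
    }

    // Rounding undershoot: grow the busiest endpoint that still has per-node headroom.
    while (used < target) {
      String winner = null;
      for (String n : assigned.keySet()) {
        if (assigned.get(n) < maxWidthPerNode
            && (winner == null || localWorkUnits.get(n) > localWorkUnits.get(winner))) {
          winner = n;
        }
      }
      if (winner == null) {
        break; // every endpoint is already at the per-node cap
      }
      assigned.put(winner, assigned.get(winner) + 1);
      used++;
    }
    return assigned;
  }

  public static void main(String[] args) {
    // The numbers from testTotalWorkUnitsMoreThanGlobalMaxWidth: localWorkUnits of
    // 14/15/16/17/18 (80 total), maxWidthPerNode 16, globalMaxWidth 40.
    Map<String, Integer> work = new LinkedHashMap<>();
    work.put("DEP1", 14);
    work.put("DEP2", 15);
    work.put("DEP3", 16);
    work.put("DEP4", 17);
    work.put("DEP5", 18);
    // Rounded shares 7, 8, 8, 9, 9 sum to 41; trimming one from DEP4 (the tied
    // largest assignment with fewer localWorkUnits than DEP5) gives 7+8+8+8+9 = 40.
    System.out.println(distributeWidth(work, 16, 40)); // {DEP1=7, DEP2=8, DEP3=8, DEP4=8, DEP5=9}
  }
}

The two fix-up loops encode the point of the review comment: whenever rounding forces a choice between two endpoints, width flows toward the one with more local work units.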