This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 425e424b37f081ba1262f4f39298c6c13910aba8
Author: Akos Kovacs <akov...@cloudera.com>
AuthorDate: Tue Dec 8 23:51:09 2020 +0100

    IMPALA-9687 Improve estimates for number of hosts in Kudu plans
    
    In some cases Kudu plans could contain more hosts than the actual number
    of executors. This commit fixes it by capping the number of hosts at the
    number of executors, and determining which executors have local scan ranges.
    
    Testing:
     - Ran core tests
     - Updated Kudu planner tests where the memory estimates changed.
    
    Change-Id: I72e341597e980fb6a7e3792905b942ddf5797d03
    Reviewed-on: http://gerrit.cloudera.org:8080/16880
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 .../org/apache/impala/planner/KuduScanNode.java    | 82 ++++++++++++++++++++--
 .../queries/PlannerTest/kudu-selectivity.test      |  2 +-
 .../queries/PlannerTest/tpch-kudu.test             | 20 +++---
 3 files changed, 86 insertions(+), 18 deletions(-)

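Before the diff, a minimal standalone sketch of the capping idea the new computeNumNodes() method applies. The class name, the estimateHosts() helper, and its parameters are simplified placeholders invented here for illustration; the committed code derives these values from ExecutorMembershipSnapshot and the analyzer's host index, so treat this as a sketch of the reasoning, not the committed implementation.

    // Hypothetical sketch, not the committed code.
    public final class KuduHostEstimateSketch {
      // Locally scheduled ranges are limited by the number of distinct local hosts,
      // remote ranges are round-robined across all executors, and the total is
      // capped at the number of executors (never less than 1).
      static int estimateHosts(int numLocalRanges, int numLocalHosts,
          int numRemoteRanges, int numExecutors) {
        int numLocalNodes = Math.min(numLocalRanges, numLocalHosts);
        int numRemoteNodes = Math.min(numRemoteRanges, numExecutors);
        // Local and remote assignments may overlap; assume no overlap, but never
        // exceed the executor count.
        return Math.max(1, Math.min(numLocalNodes + numRemoteNodes, numExecutors));
      }

      public static void main(String[] args) {
        System.out.println(estimateHosts(1, 1, 0, 3));   // single range -> 1 host
        System.out.println(estimateHosts(50, 3, 10, 3)); // many ranges -> capped at 3
      }
    }

With a single scan range on a 3-executor minicluster the estimate drops to 1 host, which is consistent with the hosts=3 to hosts=1 change in kudu-selectivity.test further down.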
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 613c40b..e4a2d0c 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -19,9 +19,11 @@ package org.apache.impala.planner;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.impala.analysis.Analyzer;
@@ -54,6 +56,7 @@ import org.apache.impala.thrift.TScanRangeLocation;
 import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.util.KuduUtil;
+import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduClient;
@@ -277,16 +280,81 @@ public class KuduScanNode extends ScanNode {
     return computeCombinedSelectivity(allConjuncts);
   }
 
+  /**
+   * Estimate the number of impalad nodes that this scan node will execute on (which is
+   * ultimately determined by the scheduling done by the backend's Scheduler).
+   * Assume that scan ranges that can be scheduled locally will be, and that scan
+   * ranges that cannot will be round-robined across the cluster.
+   */
+  protected void computeNumNodes(Analyzer analyzer) {
+    ExecutorMembershipSnapshot cluster = ExecutorMembershipSnapshot.getCluster();
+    final int maxInstancesPerNode = getMaxInstancesPerNode(analyzer);
+    final int maxPossibleInstances = cluster.numExecutors() * maxInstancesPerNode;
+    int totalNodes = 0;
+    int totalInstances = 0;
+    int numLocalRanges = 0;
+    int numRemoteRanges = 0;
+    // Counts the number of local ranges, capped at maxInstancesPerNode.
+    Map<TNetworkAddress, Integer> localRangeCounts = new HashMap<>();
+    // Sum of the counter values in localRangeCounts.
+    int totalLocalParallelism = 0;
+    if (scanRangeSpecs_.isSetConcrete_ranges()) {
+      for (TScanRangeLocationList range : scanRangeSpecs_.concrete_ranges) {
+        boolean anyLocal = false;
+        if (range.isSetLocations()) {
+          for (TScanRangeLocation loc : range.locations) {
+            TNetworkAddress address =
+                analyzer.getHostIndex().getEntry(loc.getHost_idx());
+            if (cluster.contains(address)) {
+              anyLocal = true;
+              // Use the full tserver address (including port) to account for the test
+              // minicluster where there are multiple tservers and impalads on a single
+              // host.  This assumes that when an impalad is colocated with a tserver,
+              // there are the same number of impalads as tservers on this host in this
+              // cluster.
+              int count = localRangeCounts.getOrDefault(address, 0);
+              if (count < maxInstancesPerNode) {
+                ++totalLocalParallelism;
+                localRangeCounts.put(address, count + 1);
+              }
+            }
+          }
+        }
+        // This range has at least one replica with a colocated impalad, so assume it
+        // will be scheduled on one of those nodes.
+        if (anyLocal) {
+          ++numLocalRanges;
+        } else {
+          ++numRemoteRanges;
+        }
+        // Approximate the number of nodes that will execute locally assigned ranges to
+        // be the smaller of the number of locally assigned ranges and the number of
+        // hosts that hold replica for those ranges.
+        int numLocalNodes = Math.min(numLocalRanges, localRangeCounts.size());
+        // The remote ranges are round-robined across all the impalads.
+        int numRemoteNodes = Math.min(numRemoteRanges, cluster.numExecutors());
+        // The local and remote assignments may overlap, but we don't know by how much
+        // so conservatively assume no overlap.
+        totalNodes = Math.min(numLocalNodes + numRemoteNodes, cluster.numExecutors());
+
+        int numLocalInstances = Math.min(numLocalRanges, totalLocalParallelism);
+        totalInstances = Math.min(numLocalInstances + numRemoteRanges,
+            totalNodes * maxInstancesPerNode);
+
+        // Exit early if we have maxed out our estimate of hosts/instances, to avoid
+        // extraneous work in case the number of scan ranges dominates the number of
+        // nodes.
+        if (totalInstances == maxPossibleInstances) break;
+      }
+    }
+    numNodes_ = Math.max(totalNodes, 1);
+    numInstances_ = Math.max(totalInstances, 1);
+  }
+
   @Override
   protected void computeStats(Analyzer analyzer) {
     super.computeStats(analyzer);
-    // Update the number of nodes to reflect the hosts that have relevant data.
-    numNodes_ = Math.max(1, hostIndexSet_.size());
-    // Estimate the total number of instances, based on two upper bounds:
-    // * The number of scan ranges to process.
-    // * The maximum parallelism allowed across all the hosts.
-    numInstances_ = Math.min(scanRangeSpecs_.getConcrete_rangesSize(),
-        numNodes_ * getMaxInstancesPerNode(analyzer));
+    computeNumNodes(analyzer);
 
     // Update the cardinality
     inputCardinality_ = cardinality_ = kuduTable_.getNumRows();
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
index 87356bc..fde05a0 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/kudu-selectivity.test
@@ -23,7 +23,7 @@ Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
      tuple-ids=0 row-size=124B cardinality=1
      in pipelines: 00(GETNEXT)
 
-F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=1
+F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
 Per-Host Resources: mem-estimate=1.88MB mem-reservation=0B thread-reservation=2
   DATASTREAM SINK [FRAGMENT=F01, EXCHANGE=01, UNPARTITIONED]
   |  mem-estimate=0B mem-reservation=0B thread-reservation=0
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
index dc2dad1..6f6aa30 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
@@ -87,9 +87,9 @@ limit 100
 EXPLAIN_LEVEL=2
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=26.31MB Threads=10
-Per-Host Resource Estimates: Memory=57MB
+Per-Host Resource Estimates: Memory=59MB
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=57.00MB mem-reservation=26.31MB thread-reservation=10 runtime-filters-memory=8.00MB
+|  Per-Host Resources: mem-estimate=58.88MB mem-reservation=26.31MB thread-reservation=10 runtime-filters-memory=8.00MB
 PLAN-ROOT SINK
 |  output exprs: round(s_acctbal, CAST(2 AS TINYINT)), s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment
 |  mem-estimate=0B mem-reservation=0B thread-reservation=0
@@ -117,7 +117,7 @@ PLAN-ROOT SINK
 |  |
 |  |--04:SCAN KUDU [tpch_kudu.region]
 |  |     kudu predicates: r_name = 'EUROPE'
-|  |     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|  |     mem-estimate=1.50MB mem-reservation=0B thread-reservation=1
 |  |     tuple-ids=4 row-size=2B cardinality=1
 |  |     in pipelines: 04(GETNEXT)
 |  |
@@ -131,7 +131,7 @@ PLAN-ROOT SINK
 |  |
 |  |--03:SCAN KUDU [tpch_kudu.nation]
 |  |     runtime filters: RF010[bloom] -> n_regionkey, RF011[min_max] -> n_regionkey
-|  |     mem-estimate=1.12MB mem-reservation=0B thread-reservation=1
+|  |     mem-estimate=2.25MB mem-reservation=0B thread-reservation=1
 |  |     tuple-ids=3 row-size=27B cardinality=25
 |  |     in pipelines: 03(GETNEXT)
 |  |
@@ -187,7 +187,7 @@ PLAN-ROOT SINK
 |
 |--08:SCAN KUDU [tpch_kudu.region]
 |     kudu predicates: r_name = 'EUROPE'
-|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|     mem-estimate=1.50MB mem-reservation=0B thread-reservation=1
 |     tuple-ids=8 row-size=2B cardinality=1
 |     in pipelines: 08(GETNEXT)
 |
@@ -201,7 +201,7 @@ PLAN-ROOT SINK
 |
 |--07:SCAN KUDU [tpch_kudu.nation]
 |     runtime filters: RF004[bloom] -> n_regionkey, RF005[min_max] -> n_regionkey
-|     mem-estimate=768.00KB mem-reservation=0B thread-reservation=1
+|     mem-estimate=1.50MB mem-reservation=0B thread-reservation=1
 |     tuple-ids=7 row-size=4B cardinality=25
 |     in pipelines: 07(GETNEXT)
 |
@@ -364,7 +364,7 @@ order by
   revenue desc
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=21.38MB Threads=7
-Per-Host Resource Estimates: Memory=38MB
+Per-Host Resource Estimates: Memory=40MB
 PLAN-ROOT SINK
 |
 12:SORT
@@ -490,7 +490,7 @@ order by
   l_year
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=49.62MB Threads=7
-Per-Host Resource Estimates: Memory=66MB
+Per-Host Resource Estimates: Memory=67MB
 PLAN-ROOT SINK
 |
 12:SORT
@@ -593,7 +593,7 @@ order by
   o_year
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=19.56MB Threads=9
-Per-Host Resource Estimates: Memory=40MB
+Per-Host Resource Estimates: Memory=42MB
 PLAN-ROOT SINK
 |
 16:SORT
@@ -1577,7 +1577,7 @@ order by
 limit 100
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=14.56MB Threads=7
-Per-Host Resource Estimates: Memory=75MB
+Per-Host Resource Estimates: Memory=76MB
 PLAN-ROOT SINK
 |
 12:TOP-N [LIMIT=100]
