This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 5530b62  IMPALA-10314: Optimize planning time for simple limits
5530b62 is described below

commit 5530b62539e762ddf5825e2b43db2f29d9addae7
Author: Aman Sinha <amsi...@cloudera.com>
AuthorDate: Wed Nov 11 18:38:11 2020 -0800

    IMPALA-10314: Optimize planning time for simple limits
    
    This patch optimizes the planning time for simple limit
    queries by only considering a minimal set of partitions
    whose file descriptors add up to N (the specified limit).
    Each file is conservatively estimated to contain 1 row.
    
    This reduces the number of partitions processed by
    HdfsScanNode.computeScanRangeLocations() which, according
    to query profiling, has been the main contributor to the
    planning time especially for large number of partitions.
    Further, within each partition, we only consider the number
    of non-empty files that brings the total to N.
    
    This is an opt-in optimization. A new planner option
    OPTIMIZE_SIMPLE_LIMIT enables this optimization. Further,
    if there's a WHERE clause, it must have an 'always_true'
    hint in order for the optimization to be considered. For
    example:
      set optimize_simple_limit = true;
      SELECT * FROM T
        WHERE /* +always_true */ <predicate>
      LIMIT 10;
    
    If there are too many empty files in the partitions, it is
    possible that the query may produce fewer rows although
    those are still valid rows.
    
    Testing:
     - Added planner tests for the optimization
     - Ran query_test.py tests by enabling the optimize_simple_limit
     - Added an e2e test. Since result rows are non-deterministic,
       only simple count(*) query on top of subquery with limit
       was added.
    
    Change-Id: I9d6a79263bc092e0f3e9a1d72da5618f3cc35574
    Reviewed-on: http://gerrit.cloudera.org:8080/16723
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/service/query-options.cc                    |   3 +
 be/src/service/query-options.h                     |   4 +-
 common/thrift/ImpalaInternalService.thrift         |   3 +
 common/thrift/ImpalaService.thrift                 |   8 +
 fe/src/main/cup/sql-parser.cup                     |  12 +-
 .../java/org/apache/impala/analysis/Analyzer.java  |  17 ++
 .../main/java/org/apache/impala/analysis/Expr.java |  40 ++++
 .../org/apache/impala/analysis/PartitionSet.java   |   3 +-
 .../java/org/apache/impala/analysis/Predicate.java |   9 +
 .../org/apache/impala/analysis/SelectStmt.java     |  39 ++++
 .../apache/impala/planner/HdfsPartitionPruner.java |  40 +++-
 .../org/apache/impala/planner/HdfsScanNode.java    |  49 +++-
 .../apache/impala/planner/SingleNodePlanner.java   |  23 +-
 .../org/apache/impala/analysis/ParserTest.java     |   4 +-
 .../org/apache/impala/planner/PlannerTest.java     |  11 +
 .../queries/PlannerTest/optimize-simple-limit.test | 258 +++++++++++++++++++++
 .../QueryTest/range-constant-propagation.test      |  14 ++
 17 files changed, 517 insertions(+), 20 deletions(-)

diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 900fe76..f2cd720 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -329,6 +329,9 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
       case TImpalaQueryOptions::OPTIMIZE_PARTITION_KEY_SCANS:
         query_options->__set_optimize_partition_key_scans(IsTrue(value));
         break;
+      case TImpalaQueryOptions::OPTIMIZE_SIMPLE_LIMIT:
+        query_options->__set_optimize_simple_limit(IsTrue(value));
+        break;
       case TImpalaQueryOptions::REPLICA_PREFERENCE: {
         map<int, const char *> valid_enums_values = {
             {0, "CACHE_LOCAL"},
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index ade7950..d61e47d 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::REPORT_SKEW_LIMIT + 1);\
+      TImpalaQueryOptions::OPTIMIZE_SIMPLE_LIMIT + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, 
ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -223,6 +223,8 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
       TQueryOptionLevel::ADVANCED)\
   QUERY_OPT_FN(report_skew_limit, REPORT_SKEW_LIMIT,\
       TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(optimize_simple_limit, OPTIMIZE_SIMPLE_LIMIT,\
+      TQueryOptionLevel::REGULAR)\
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query 
state.
diff --git a/common/thrift/ImpalaInternalService.thrift 
b/common/thrift/ImpalaInternalService.thrift
index 51ee39f..70f99dd 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -463,6 +463,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   116: optional double report_skew_limit = 1.0;
+
+  // See comment in ImpalaService.thrift
+  117: optional bool optimize_simple_limit = false;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index b71da24..007ae27 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -594,6 +594,14 @@ enum TImpalaQueryOptions {
   // Enable (>=0) or disable(<0) reporting of skews for a query in runtime 
profile.
   // When enabled, used as the CoV threshold value in the skew detection 
formula.
   REPORT_SKEW_LIMIT = 115
+
+  // If true, for simple limit queries (limit with no order-by, group-by, 
aggregates,
+  // joins, analytic functions but does allow where predicates) optimize the 
planning
+  // time by only considering a small number of partitions. The number of 
files within
+  // a partition is used as an approximation to number of rows (1 row per 
file).
+  // This option is opt-in by default as this optimization may in some cases 
produce
+  // fewer (but correct) rows than the limit value in the query.
+  OPTIMIZE_SIMPLE_LIMIT = 116
 }
 
 // The summary of a DML statement.
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 87bc47c..91e24d3 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -3114,9 +3114,17 @@ expr_list ::=
   :}
   ;
 
+// Currently, we allow predicate hints for the top level WHERE
+// clause. An attempt was made to set this for individual exprs
+// (as part of the 'expr' grammar) but that generated quite a
+// few Shift/Reduce conflicts.
+// TODO: Revisit the possibility of setting the hint per expr
 where_clause ::=
-  KW_WHERE expr:e
-  {: RESULT = e; :}
+  KW_WHERE opt_plan_hints:pred_hints expr:e
+  {:
+    e.setPredicateHints(pred_hints);
+    RESULT = e;
+  :}
   | /* empty */
   {: RESULT = null; :}
   ;
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java 
b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index 1342dc4..a5b1e36 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -181,6 +181,15 @@ public class Analyzer {
   // Flag indicating if this analyzer instance belongs to a subquery.
   private boolean isSubquery_ = false;
 
+  // Tracks the simple LIMIT status of this query block. First item of the
+  // pair indicates whether a simple limit exists or not, second item is
+  // the actual limit value if it does exist. We use a pair instead of just
+  // a single nullable field because a query block may not have a LIMIT but
+  // if it is an inline view it may be eligible for limit pushdown from an
+  // outer query, so for such case the simpleLimitStatus_ will be non-null
+  // but the pair will be <false, null>.
+  private Pair<Boolean, Long>  simpleLimitStatus_ = null;
+
   // Flag indicating whether this analyzer belongs to a WITH clause view.
   private boolean hasWithClause_ = false;
 
@@ -215,6 +224,14 @@ public class Analyzer {
     globalState_.containsSubquery = true;
   }
 
+  public void setSimpleLimitStatus(Pair<Boolean, Long> simpleLimitStatus) {
+    simpleLimitStatus_ = simpleLimitStatus;
+  }
+
+  public Pair<Boolean, Long> getSimpleLimitStatus() {
+    return simpleLimitStatus_;
+  }
+
   public void setHasTopLevelAcidCollectionTableRef() {
     globalState_.hasTopLevelAcidCollectionTableRef = true;
   }
diff --git a/fe/src/main/java/org/apache/impala/analysis/Expr.java 
b/fe/src/main/java/org/apache/impala/analysis/Expr.java
index fe8474f..ec99a0b 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Expr.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Expr.java
@@ -354,6 +354,15 @@ abstract public class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
         }
       };
 
+  public static final com.google.common.base.Predicate<Expr> 
IS_ALWAYS_TRUE_PREDICATE =
+      new com.google.common.base.Predicate<Expr>() {
+        @Override
+        public boolean apply(Expr arg) {
+          return arg instanceof Predicate
+              && ((Predicate) arg).hasAlwaysTrueHint();
+        }
+      };
+
   // id that's unique across the entire query statement and is assigned by
   // Analyzer.registerConjuncts(); only assigned for the top-level terms of a
   // conjunction, and therefore null for most Exprs
@@ -401,6 +410,9 @@ abstract public class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
   // True if this has already been counted towards the number of statement 
expressions
   private boolean isCountedForNumStmtExprs_ = false;
 
+  // For exprs of type Predicate, this keeps track of predicate hints
+  protected List<PlanHint> predicateHints_;
+
   protected Expr() {
     type_ = Type.INVALID;
     selectivity_ = -1.0;
@@ -425,6 +437,10 @@ abstract public class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
     fn_ = other.fn_;
     isCountedForNumStmtExprs_ = other.isCountedForNumStmtExprs_;
     children_ = Expr.cloneList(other.children_);
+    if (other.predicateHints_ != null) {
+      predicateHints_ = new ArrayList<>();
+      predicateHints_.addAll(other.predicateHints_);
+    }
   }
 
   public boolean isAnalyzed() { return isAnalyzed_; }
@@ -489,6 +505,22 @@ abstract public class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
     analysisDone();
   }
 
+  protected void analyzeHints(Analyzer analyzer) throws AnalysisException {
+    if (predicateHints_ != null && !predicateHints_.isEmpty()) {
+      if (!(this instanceof Predicate)) {
+        throw new AnalysisException("Expr hints are only supported for 
predicates");
+      }
+      for (PlanHint hint : predicateHints_) {
+        if (hint.is("ALWAYS_TRUE")) {
+          ((Predicate) this).setHasAlwaysTrueHint(true);
+          analyzer.setHasPlanHints();
+        } else {
+          analyzer.addWarning("Predicate hint not recognized: " + hint);
+        }
+      }
+    }
+  }
+
   /**
    * Does subclass-specific analysis. Subclasses should override analyzeImpl().
    */
@@ -1765,4 +1797,12 @@ abstract public class Expr extends TreeNode<Expr> 
implements ParseNode, Cloneabl
     }
     return value;
   }
+
+  public void setPredicateHints(List<PlanHint> hints) {
+    Preconditions.checkNotNull(hints);
+    predicateHints_ = hints;
+  }
+
+  public List<PlanHint> getPredicateHints() { return predicateHints_; }
+
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java 
b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
index c14eced..375b899 100644
--- a/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
+++ b/fe/src/main/java/org/apache/impala/analysis/PartitionSet.java
@@ -86,7 +86,8 @@ public class PartitionSet extends PartitionSpecBase {
 
     try {
       HdfsPartitionPruner pruner = new HdfsPartitionPruner(desc);
-      partitions_ = pruner.prunePartitions(analyzer, transformedConjuncts, 
true).first;
+      partitions_ = pruner.prunePartitions(analyzer, transformedConjuncts, 
true,
+          null).first;
     } catch (ImpalaException e) {
       if (e instanceof AnalysisException) throw (AnalysisException) e;
       throw new AnalysisException("Partition expr evaluation failed in the 
backend.", e);
diff --git a/fe/src/main/java/org/apache/impala/analysis/Predicate.java 
b/fe/src/main/java/org/apache/impala/analysis/Predicate.java
index 34e6722..c704e30 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Predicate.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Predicate.java
@@ -24,10 +24,13 @@ import org.apache.impala.common.Reference;
 
 public abstract class Predicate extends Expr {
   protected boolean isEqJoinConjunct_;
+  // true if this predicate has an always_true hint
+  protected boolean hasAlwaysTrueHint_;
 
   public Predicate() {
     super();
     isEqJoinConjunct_ = false;
+    hasAlwaysTrueHint_ = false;
   }
 
   /**
@@ -36,15 +39,18 @@ public abstract class Predicate extends Expr {
   protected Predicate(Predicate other) {
     super(other);
     isEqJoinConjunct_ = other.isEqJoinConjunct_;
+    hasAlwaysTrueHint_ = other.hasAlwaysTrueHint_;
   }
 
   public void setIsEqJoinConjunct(boolean v) { isEqJoinConjunct_ = v; }
+  public void setHasAlwaysTrueHint(boolean v) { hasAlwaysTrueHint_ = v; }
 
   @Override
   protected void analyzeImpl(Analyzer analyzer) throws AnalysisException {
     type_ = Type.BOOLEAN;
     // values: true/false/null
     numDistinctValues_ = 3;
+    analyzeHints(analyzer);
   }
 
   /**
@@ -91,4 +97,7 @@ public abstract class Predicate extends Expr {
    * Returns the SlotRef bound by this Predicate.
    */
   public SlotRef getBoundSlot() { return null; }
+
+  public boolean hasAlwaysTrueHint() { return hasAlwaysTrueHint_; }
+
 }
diff --git a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java 
b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
index 4ad4652..28f28aa 100644
--- a/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/SelectStmt.java
@@ -32,6 +32,7 @@ import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.ColumnAliasGenerator;
+import org.apache.impala.common.Pair;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.common.TableAliasGenerator;
 import org.apache.impala.common.TreeNode;
@@ -176,6 +177,38 @@ public class SelectStmt extends QueryStmt {
   public ExprSubstitutionMap getBaseTblSmap() { return baseTblSmap_; }
 
   /**
+   * A simple limit statement has a limit but no order-by,
+   * group-by, aggregates, joins or analytic functions. It can
+   * have a WHERE clause only if it is tagged as always true.
+   * This method does not explicitly check for subqueries. If the
+   * subquery occurs in the WHERE, the initial check for simple
+   * limit may succeed. Subsequently, in the planning process
+   * the statement rewriter may transform it to a join or an
+   * equivalent plan and analyze will be called again and this time
+   * it may or may not satisfy the simple limit criteria.
+   * Note that FROM clause subquery is generally ok since it is
+   * a table ref.
+   */
+  public Pair<Boolean, Long> checkSimpleLimitStmt() {
+    if (getTableRefs().size() == 1
+        && !hasGroupByClause() && !hasOrderByClause()
+        && !hasMultiAggInfo() && !hasAnalyticInfo()) {
+      if (hasWhereClause() && 
!Expr.IS_ALWAYS_TRUE_PREDICATE.apply(getWhereClause())) {
+        return null;
+      }
+      if (hasLimit()) {
+        return new Pair<>(new Boolean(true), getLimit());
+      } else {
+        // even if this SELECT statement does not have a LIMIT, it is a
+        // simple select which may be an inline view and eligible for a
+        // limit pushdown from an outer block, so we return a non-null value
+        return new Pair<>(new Boolean(false), null);
+      }
+    }
+    return null;
+  }
+
+  /**
    * Append additional grouping expressions to the select list. Used by 
StmtRewriter.
    */
   protected void addGroupingExprs(List<Expr> addtlGroupingExprs) {
@@ -286,6 +319,7 @@ public class SelectStmt extends QueryStmt {
       }
 
       buildColumnLineageGraph();
+      analyzer_.setSimpleLimitStatus(checkSimpleLimitStmt());
     }
 
     private void analyzeSelectClause() throws AnalysisException {
@@ -1173,6 +1207,11 @@ public class SelectStmt extends QueryStmt {
     // Where clause
     if (whereClause_ != null) {
       strBuilder.append(" WHERE ");
+      List<PlanHint> predHints = whereClause_.getPredicateHints();
+      if (predHints != null && predHints.size() > 0) {
+        strBuilder.append(ToSqlUtils.getPlanHintsSql(options,
+            predHints)).append(" ");
+      }
       strBuilder.append(whereClause_.toSql(options));
     }
     // Group By clause
diff --git 
a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java
index 98ab761..03d8e5e 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionPruner.java
@@ -39,6 +39,7 @@ import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.analysis.NullLiteral;
 import org.apache.impala.analysis.SlotId;
 import org.apache.impala.analysis.SlotRef;
+import org.apache.impala.analysis.TableRef;
 import org.apache.impala.analysis.TupleDescriptor;
 import org.apache.impala.catalog.FeFsPartition;
 import org.apache.impala.catalog.FeFsTable;
@@ -110,7 +111,8 @@ public class HdfsPartitionPruner {
    * If 'allowEmpty' is False, empty partitions are not returned.
    */
   public Pair<List<? extends FeFsPartition>, List<Expr>> prunePartitions(
-      Analyzer analyzer, List<Expr> conjuncts, boolean allowEmpty)
+      Analyzer analyzer, List<Expr> conjuncts, boolean allowEmpty,
+      TableRef hdfsTblRef)
       throws ImpalaException {
     // Start with creating a collection of partition filters for the 
applicable conjuncts.
     List<HdfsPartitionFilter> partitionFilters = new ArrayList<>();
@@ -179,10 +181,46 @@ public class HdfsPartitionPruner {
             }
           }));
     }
+    if (hdfsTblRef != null) {
+      // check and apply pruning for simple limit
+      results = pruneForSimpleLimit(hdfsTblRef, analyzer, results);
+    }
     return new Pair<>(results, partitionConjuncts);
   }
 
   /**
+   * Prune partitions based on eligibility of simple limit optimization:
+   *  - table ref should not already have a TABLESAMPLE clause
+   *  - OPTIMIZE_SIMPLE_LIMIT is enabled and the query block satisfies
+   *    simple limit criteria
+   */
+  private List<? extends FeFsPartition> pruneForSimpleLimit(TableRef tblRef,
+    Analyzer analyzer, List<? extends FeFsPartition> partitions) {
+    if (tblRef.getSampleParams() == null
+        && analyzer.getQueryCtx().client_request.getQuery_options()
+        .isOptimize_simple_limit()
+        && analyzer.getSimpleLimitStatus() != null
+        && analyzer.getSimpleLimitStatus().first) {
+      List<FeFsPartition> prunedPartitions = new ArrayList<>();
+      long numRows = 0;
+      // Instead of using the partitions num rows statistic which may be stale,
+      // we use a conservative estimate of number of files within a partition 
and
+      // 1 row per file
+      for (FeFsPartition p : partitions) {
+        numRows += p.getNumFileDescriptors();
+        prunedPartitions.add(p);
+        if (numRows >= analyzer.getSimpleLimitStatus().second) {
+          // here we only limit the partitions; later in HdfsScanNode we will
+          // limit the file descriptors within a partition
+          break;
+        }
+      }
+      return prunedPartitions;
+    }
+    return partitions;
+  }
+
+  /**
    * Recursive function that checks if a given partition expr can be evaluated
    * directly from the partition key values. If 'expr' contains any constant 
expressions,
    * they are evaluated in the BE and are replaced by their corresponding 
results, as
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index d2daae2..c0c6312 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -842,12 +842,43 @@ public class HdfsScanNode extends ScanNode {
     largestScanRangeBytes_ = 0;
     maxScanRangeNumRows_ = -1;
     fileFormats_ = new HashSet<>();
+    long simpleLimitNumRows = 0; // only used for the simple limit case
+    boolean isSimpleLimit =
+        (analyzer.getQueryCtx().client_request.getQuery_options()
+            .isOptimize_simple_limit()
+        && analyzer.getSimpleLimitStatus() != null
+        && analyzer.getSimpleLimitStatus().first);
     for (FeFsPartition partition: partitions_) {
+      // Missing disk id accounting is only done for file systems that support 
the notion
+      // of disk/storage ids.
+      FileSystem partitionFs;
+      try {
+        partitionFs = partition.getLocationPath().getFileSystem(CONF);
+      } catch (IOException e) {
+        throw new ImpalaRuntimeException("Error determining partition fs 
type", e);
+      }
+      boolean fsHasBlocks = FileSystemUtil.supportsStorageIds(partitionFs);
       List<FileDescriptor> fileDescs;
       if (this instanceof IcebergScanNode) {
         fileDescs = ((IcebergScanNode) 
this).getFileDescriptorByIcebergPredicates();
       } else {
-        fileDescs = partition.getFileDescriptors();
+        if (isSimpleLimit) {
+          fileDescs = new ArrayList<>();
+          for (FileDescriptor fd : partition.getFileDescriptors()) {
+            // skip empty files
+            if ((fsHasBlocks && fd.getNumFileBlocks() == 0)
+                || (!fsHasBlocks && fd.getFileLength() <= 0)) {
+              continue;
+            }
+            simpleLimitNumRows++;  // conservatively estimate 1 row per file
+            fileDescs.add(fd);
+            if (simpleLimitNumRows == analyzer.getSimpleLimitStatus().second) {
+              break;
+            }
+          }
+        } else {
+          fileDescs = partition.getFileDescriptors();
+        }
       }
 
       if (sampledFiles != null) {
@@ -861,15 +892,7 @@ public class HdfsScanNode extends ScanNode {
       analyzer.getDescTbl().addReferencedPartition(tbl_, partition.getId());
       fileFormats_.add(partition.getFileFormat());
       Preconditions.checkState(partition.getId() >= 0);
-      // Missing disk id accounting is only done for file systems that support 
the notion
-      // of disk/storage ids.
-      FileSystem partitionFs;
-      try {
-        partitionFs = partition.getLocationPath().getFileSystem(CONF);
-      } catch (IOException e) {
-        throw new ImpalaRuntimeException("Error determining partition fs 
type", e);
-      }
-      boolean fsHasBlocks = FileSystemUtil.supportsStorageIds(partitionFs);
+
       if (!fsHasBlocks) {
         // Limit the scan range length if generating scan ranges (and we're not
         // short-circuiting the scan for a partition key scan).
@@ -923,6 +946,12 @@ public class HdfsScanNode extends ScanNode {
         updateMaxScanRangeNumRows(
             partitionNumRows, partitionBytes, partitionMaxScanRangeBytes);
       }
+      if (isSimpleLimit && simpleLimitNumRows ==
+          analyzer.getSimpleLimitStatus().second) {
+        // for the simple limit case if the estimated rows has already reached 
the limit
+        // there's no need to process more partitions
+        break;
+      }
     }
     if (totalFilesPerFs_.isEmpty() || sumValues(totalFilesPerFs_) == 0) {
       maxScanRangeNumRows_ = 0;
diff --git a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java 
b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
index 8ce0b40..bb00470 100644
--- a/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
+++ b/fe/src/main/java/org/apache/impala/planner/SingleNodePlanner.java
@@ -68,8 +68,6 @@ import org.apache.impala.catalog.FeKuduTable;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsFileFormat;
-import org.apache.impala.catalog.HdfsPartition;
-import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.common.AnalysisException;
@@ -1182,6 +1180,8 @@ public class SingleNodePlanner {
     // On-clause of an outer join may be pushed into the inline view as well.
     migrateConjunctsToInlineView(analyzer, inlineViewRef);
 
+    migrateSimpleLimitToInlineView(analyzer, inlineViewRef);
+
     // Turn a constant select into a UnionNode that materializes the exprs.
     // TODO: unify this with createConstantSelectPlan(), this is basically the
     // same thing
@@ -1311,6 +1311,23 @@ public class SingleNodePlanner {
     }
   }
 
+  public void migrateSimpleLimitToInlineView(final Analyzer analyzer,
+    final InlineViewRef inlineViewRef) {
+    Pair<Boolean, Long> outerStatus = analyzer.getSimpleLimitStatus();
+    if (outerStatus == null || !outerStatus.first
+        || inlineViewRef.isTableMaskingView()) {
+      return;
+    }
+    Pair<Boolean, Long> viewStatus =
+        inlineViewRef.getAnalyzer().getSimpleLimitStatus();
+    // if the view already has a limit, we leave it as-is otherwise we
+    // apply the outer limit
+    if (viewStatus != null && !viewStatus.first) {
+      inlineViewRef.getAnalyzer().setSimpleLimitStatus(new Pair<>(new 
Boolean(true),
+          outerStatus.second));
+    }
+  }
+
   /**
    * Migrates unassigned conjuncts into an inline view. Conjuncts are not
    * migrated into the inline view if the view has a LIMIT/OFFSET clause or if 
the
@@ -1470,7 +1487,7 @@ public class SingleNodePlanner {
     // end up removing some predicates.
     HdfsPartitionPruner pruner = new HdfsPartitionPruner(tupleDesc);
     Pair<List<? extends FeFsPartition>, List<Expr>> pair =
-        pruner.prunePartitions(analyzer, conjuncts, false);
+        pruner.prunePartitions(analyzer, conjuncts, false, hdfsTblRef);
     List<? extends FeFsPartition> partitions = pair.first;
 
     // Mark all slots referenced by the remaining conjuncts as materialized.
diff --git a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java 
b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
index 339cb22..b244337 100644
--- a/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/ParserTest.java
@@ -3491,7 +3491,7 @@ public class ParserTest extends FrontendTestBase {
         "                           ^\n" +
         "Encountered: EOF\n" +
         "Expected: CASE, CAST, DATE, DEFAULT, EXISTS, FALSE, GROUPING, IF, 
INTERVAL, " +
-        "LEFT, NOT, NULL, REPLACE, RIGHT, TRUNCATE, TRUE, IDENTIFIER");
+        "LEFT, NOT, NULL, REPLACE, RIGHT, STRAIGHT_JOIN, TRUNCATE, TRUE, 
IDENTIFIER");
 
     // missing predicate in where clause (group by)
     ParserError("select c, b, c from t where group by a, b",
@@ -3500,7 +3500,7 @@ public class ParserTest extends FrontendTestBase {
         "                            ^\n" +
         "Encountered: GROUP\n" +
         "Expected: CASE, CAST, DATE, DEFAULT, EXISTS, FALSE, GROUPING, IF, 
INTERVAL, " +
-        "LEFT, NOT, NULL, REPLACE, RIGHT, TRUNCATE, TRUE, IDENTIFIER");
+        "LEFT, NOT, NULL, REPLACE, RIGHT, STRAIGHT_JOIN, TRUNCATE, TRUE, 
IDENTIFIER");
 
     // unmatched string literal starting with "
     ParserError("select c, \"b, c from t",
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java 
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 96d3b19..c812430 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1100,4 +1100,15 @@ public class PlannerTest extends PlannerTestBase {
     runPlannerTestFile("outer-to-inner-joins", options,
         ImmutableSet.of(PlannerTestOption.VALIDATE_CARDINALITY));
   }
+
+  /**
+   * Test simple limit optimization
+   */
+  @Test
+  public void testSimpleLimitOptimization() {
+    TQueryOptions options = new TQueryOptions();
+    options.setOptimize_simple_limit(true);
+    runPlannerTestFile("optimize-simple-limit", options);
+  }
+
 }
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/optimize-simple-limit.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/optimize-simple-limit.test
new file mode 100644
index 0000000..b46089b
--- /dev/null
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/optimize-simple-limit.test
@@ -0,0 +1,258 @@
+# IMPALA-10314
+# Basic simple limit optimization
+select * from functional.alltypes_date_partition limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypes_date_partition]
+   HDFS partitions=10/55 files=10 size=3.97KB
+   limit: 10
+   row-size=65B cardinality=10
+====
+# Sanity check with simple limit optimization disabled
+select * from functional.alltypes_date_partition limit 10;
+---- QUERYOPTIONS
+OPTIMIZE_SIMPLE_LIMIT=false
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypes_date_partition]
+   HDFS partitions=55/55 files=55 size=27.27KB
+   limit: 10
+   row-size=65B cardinality=10
+====
+# limit on both sides of a union all
+select * from functional.alltypes_date_partition limit 10
+union all
+select * from functional.alltypes_date_partition limit 5
+---- PLAN
+PLAN-ROOT SINK
+|
+00:UNION
+|  pass-through-operands: all
+|  row-size=65B cardinality=15
+|
+|--02:SCAN HDFS [functional.alltypes_date_partition]
+|     HDFS partitions=5/55 files=5 size=1.29KB
+|     limit: 5
+|     row-size=65B cardinality=5
+|
+01:SCAN HDFS [functional.alltypes_date_partition]
+   HDFS partitions=10/55 files=10 size=3.97KB
+   limit: 10
+   row-size=65B cardinality=10
+====
+# limit in outer block, WHERE clause in inline view
+# along with an explicit hint
+with dp_view as
+(select * from functional.alltypes_date_partition
+where /* +always_true */ date_col = cast(timestamp_col as date))
+select * from dp_view limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypes_date_partition]
+   HDFS partitions=10/55 files=10 size=3.97KB
+   predicates: date_col = CAST(timestamp_col AS DATE)
+   limit: 10
+   row-size=65B cardinality=8
+====
+# limit and WHERE with a wrongly specified hint (missing
+# underscore)
+select * from functional.alltypes_date_partition
+where /* +alwaystrue */ date_col = cast(timestamp_col as date)
+limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypes_date_partition]
+   HDFS partitions=55/55 files=55 size=27.27KB
+   predicates: date_col = CAST(timestamp_col AS DATE)
+   limit: 10
+   row-size=65B cardinality=10
+====
+# WHERE with hint in alternative format '[]' and uppercase
+select * from functional.alltypes_date_partition
+where [ALWAYS_TRUE] date_col = cast(timestamp_col as date)
+limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypes_date_partition]
+   HDFS partitions=10/55 files=10 size=3.97KB
+   predicates: date_col = CAST(timestamp_col AS DATE)
+   limit: 10
+   row-size=65B cardinality=8
+====
+# WHERE with more conjuncts
+select * from functional.alltypes_date_partition
+where [ALWAYS_TRUE] date_col = cast(timestamp_col as date)
+and int_col >= 0
+limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypes_date_partition]
+   HDFS partitions=10/55 files=10 size=3.97KB
+   predicates: int_col >= 0, date_col = CAST(timestamp_col AS DATE)
+   limit: 10
+   row-size=65B cardinality=8
+====
+# WHERE with subquery; should not apply optimization
+# since the subquery is converted to a join
+select * from functional.alltypes_date_partition
+where [ALWAYS_TRUE] int_col in (select int_col from functional.alltypes)
+limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+03:HASH JOIN [LEFT SEMI JOIN]
+|  hash predicates: int_col = functional.alltypes.int_col
+|  runtime filters: RF000 <- functional.alltypes.int_col
+|  limit: 10
+|  row-size=65B cardinality=10
+|
+|--02:AGGREGATE [FINALIZE]
+|  |  group by: functional.alltypes.int_col
+|  |  row-size=4B cardinality=10
+|  |
+|  01:SCAN HDFS [functional.alltypes]
+|     HDFS partitions=24/24 files=24 size=478.45KB
+|     row-size=4B cardinality=7.30K
+|
+00:SCAN HDFS [functional.alltypes_date_partition]
+   HDFS partitions=55/55 files=55 size=27.27KB
+   runtime filters: RF000 -> int_col
+   row-size=65B cardinality=500
+====
+# limit in outer block, WHERE clause in inline view
+# but without the hint: should not apply optimization
+with dp_view as
+(select * from functional.alltypes_date_partition
+where date_col = cast(timestamp_col as date))
+select * from dp_view limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypes_date_partition]
+   HDFS partitions=55/55 files=55 size=27.27KB
+   predicates: date_col = CAST(timestamp_col AS DATE)
+   limit: 10
+   row-size=65B cardinality=10
+====
+# limit and partition pruning via range constant propagation:
+# should not apply optimization since outer query predicate
+# is not always true
+with dp_view as
+(select * from functional.alltypes_date_partition
+where /* +always_true */ date_col = cast(timestamp_col as date))
+select * from dp_view
+where timestamp_col between '2009-01-01' and '2009-02-01'
+limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypes_date_partition]
+   partition predicates: date_col >= DATE '2009-01-01' AND date_col <= DATE 
'2009-02-01'
+   HDFS partitions=32/55 files=32 size=15.99KB
+   predicates: functional.alltypes_date_partition.timestamp_col <= TIMESTAMP 
'2009-02-01 00:00:00', functional.alltypes_date_partition.timestamp_col >= 
TIMESTAMP '2009-01-01 00:00:00', date_col = CAST(timestamp_col AS DATE)
+   limit: 10
+   row-size=65B cardinality=10
+====
+# limit in outer block, WHERE and LIMIT clause in
+# inline view
+with dp_view as
+(select * from functional.alltypes_date_partition
+where /* +always_true */ date_col = cast(timestamp_col as date) limit 20)
+select * from dp_view limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypes_date_partition]
+   HDFS partitions=20/55 files=20 size=9.42KB
+   predicates: date_col = CAST(timestamp_col AS DATE)
+   limit: 10
+   row-size=65B cardinality=10
+====
+# Join with limit: optimization should not be applied
+select * from functional.alltypes_date_partition t1
+inner join functional.alltypes t2
+on t1.bigint_col = t2.bigint_col
+limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+02:HASH JOIN [INNER JOIN]
+|  hash predicates: t2.bigint_col = t1.bigint_col
+|  runtime filters: RF000 <- t1.bigint_col
+|  limit: 10
+|  row-size=154B cardinality=10
+|
+|--00:SCAN HDFS [functional.alltypes_date_partition t1]
+|     HDFS partitions=55/55 files=55 size=27.27KB
+|     row-size=65B cardinality=500
+|
+01:SCAN HDFS [functional.alltypes t2]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   runtime filters: RF000 -> t2.bigint_col
+   row-size=89B cardinality=7.30K
+====
+# Order-by with limit: optimization should not be applied
+select * from functional.alltypes_date_partition
+order by bigint_col limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+01:TOP-N [LIMIT=10]
+|  order by: bigint_col ASC
+|  row-size=65B cardinality=10
+|
+00:SCAN HDFS [functional.alltypes_date_partition]
+   HDFS partitions=55/55 files=55 size=27.27KB
+   row-size=65B cardinality=500
+====
+# Order-by with limit in outer query, limit also present in
+# from clause subquery: optimization should be applied
+select * from (select * from functional.alltypes_date_partition limit 10) t
+order by int_col limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+01:TOP-N [LIMIT=10]
+|  order by: int_col ASC
+|  row-size=65B cardinality=10
+|
+00:SCAN HDFS [functional.alltypes_date_partition]
+   HDFS partitions=10/55 files=10 size=3.97KB
+   limit: 10
+   row-size=65B cardinality=10
+====
+# limit query on table with multiple files per partition
+select * from functional.alltypesaggmultifiles limit 10;
+---- PLAN
+PLAN-ROOT SINK
+|
+00:SCAN HDFS [functional.alltypesaggmultifiles]
+   HDFS partitions=3/11 files=10 size=183.75KB
+   limit: 10
+   row-size=84B cardinality=10
+====
+# Special case of EXISTS subquery that gets transformed to have a
+# limit 1 and then simple limit optimization should be applied
+select * from functional.alltypestiny
+where exists (select int_col from functional.alltypes_date_partition)
+---- PLAN
+PLAN-ROOT SINK
+|
+02:NESTED LOOP JOIN [LEFT SEMI JOIN]
+|  row-size=89B cardinality=8
+|
+|--01:SCAN HDFS [functional.alltypes_date_partition]
+|     HDFS partitions=1/55 files=1 size=257B
+|     limit: 1
+|     row-size=0B cardinality=1
+|
+00:SCAN HDFS [functional.alltypestiny]
+   HDFS partitions=4/4 files=4 size=460B
+   row-size=89B cardinality=8
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/range-constant-propagation.test
 
b/testdata/workloads/functional-query/queries/QueryTest/range-constant-propagation.test
index 4db6e2b..7f59d57 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/range-constant-propagation.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/range-constant-propagation.test
@@ -23,3 +23,17 @@ and timestamp_col <= '2009-02-01';
 ---- TYPES
 BIGINT, BIGINT
 ====
+---- QUERY
+# IMPALA-10314
+# Simple limit in outer query referencing a with clause subquery.
+# WHERE clause has an always_true hint.
+set optimize_simple_limit=true;
+with dp_view as
+(select * from alltypes_date_partition
+where /* +always_true */ date_col = cast(timestamp_col as date))
+select count(*) from (select * from dp_view limit 10) t;
+---- RESULTS
+10
+---- TYPES
+BIGINT
+====

Reply via email to