This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 27f3359ddb DRILL-8528: Hbase Limit Push Down (#3000)
27f3359ddb is described below

commit 27f3359ddb764cad7856828c37646ae99be843ed
Author: shfshihuafeng <[email protected]>
AuthorDate: Mon Jul 21 04:48:21 2025 +0800

    DRILL-8528: Hbase Limit Push Down (#3000)
---
 .../drill/exec/store/hbase/HBaseGroupScan.java     | 61 +++++++++++++++++++---
 .../exec/store/hbase/HBasePushFilterIntoScan.java  |  2 +-
 .../drill/exec/store/hbase/HBaseRecordReader.java  |  4 +-
 .../exec/store/hbase/HBaseScanBatchCreator.java    |  2 +-
 .../drill/exec/store/hbase/HBaseStoragePlugin.java |  2 +-
 .../drill/exec/store/hbase/HBaseSubScan.java       | 16 ++++--
 .../drill/hbase/TestHBaseFilterPushDown.java       | 48 +++++++++++++++++
 7 files changed, 121 insertions(+), 14 deletions(-)
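
In short, the patch threads a "maxRecords" value from HBaseGroupScan through
HBaseSubScan into HBaseRecordReader, where it becomes Scan.setLimit() on the
HBase client scan, so a SQL LIMIT stops fetching rows at the region server
instead of discarding them in Drill. A minimal client-side sketch of that
mechanism (not part of the commit; the class and table name are hypothetical):

    // Sketch only: the Scan.setLimit(int) call this patch wires up, shown
    // outside of Drill. Table name "TestTable" is hypothetical.
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;

    public class LimitScanSketch {
      public static void main(String[] args) throws Exception {
        int maxRecords = 3; // the value HBaseGroupScan.applyLimit(3) carries down
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = conn.getTable(TableName.valueOf("TestTable"));
             ResultScanner scanner = table.getScanner(new Scan().setLimit(maxRecords))) {
          int rows = 0;
          for (Result ignored : scanner) {
            rows++; // the server stops returning rows once the limit is reached
          }
          System.out.println("rows returned: " + rows); // at most maxRecords
        }
      }
    }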

diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index cec24bf0ad..d4867da24c 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -35,6 +35,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -99,22 +100,26 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
 
   private long scanSizeInBytes = 0;
 
+  private final int maxRecords;
+
   @JsonCreator
   public HBaseGroupScan(@JsonProperty("userName") String userName,
                         @JsonProperty("hbaseScanSpec") HBaseScanSpec 
hbaseScanSpec,
                         @JsonProperty("storage") HBaseStoragePluginConfig 
storagePluginConfig,
                         @JsonProperty("columns") List<SchemaPath> columns,
+                        @JsonProperty("maxRecords") int maxRecords,
                         @JacksonInject StoragePluginRegistry pluginRegistry) 
throws IOException, ExecutionSetupException {
-    this (userName, pluginRegistry.resolve(storagePluginConfig, 
HBaseStoragePlugin.class), hbaseScanSpec, columns);
+    this (userName, pluginRegistry.resolve(storagePluginConfig, 
HBaseStoragePlugin.class), hbaseScanSpec, columns, maxRecords);
   }
 
   public HBaseGroupScan(String userName, HBaseStoragePlugin storagePlugin, HBaseScanSpec scanSpec,
-      List<SchemaPath> columns) {
+      List<SchemaPath> columns, int maxRecords) {
     super(userName);
     this.storagePlugin = storagePlugin;
     this.storagePluginConfig = storagePlugin.getConfig();
     this.hbaseScanSpec = scanSpec;
     this.columns = columns == null ? ALL_COLUMNS : columns;
+    this.maxRecords = maxRecords;
     init();
   }
 
@@ -134,6 +139,22 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     this.filterPushedDown = that.filterPushedDown;
     this.statsCalculator = that.statsCalculator;
     this.scanSizeInBytes = that.scanSizeInBytes;
+    this.maxRecords = that.maxRecords;
+  }
+
+  private HBaseGroupScan(HBaseGroupScan that, int maxRecords) {
+    super(that);
+    this.columns = that.columns == null ? ALL_COLUMNS : that.columns;
+    this.hbaseScanSpec = that.hbaseScanSpec;
+    this.endpointFragmentMapping = that.endpointFragmentMapping;
+    this.regionsToScan = that.regionsToScan;
+    this.storagePlugin = that.storagePlugin;
+    this.storagePluginConfig = that.storagePluginConfig;
+    this.hTableDesc = that.hTableDesc;
+    this.filterPushedDown = that.filterPushedDown;
+    this.statsCalculator = that.statsCalculator;
+    this.scanSizeInBytes = that.scanSizeInBytes;
+    this.maxRecords = maxRecords;
   }
 
   @Override
@@ -329,7 +350,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     assert minorFragmentId < endpointFragmentMapping.size() : String.format(
         "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
         minorFragmentId);
-    return new HBaseSubScan(getUserName(), storagePlugin, endpointFragmentMapping.get(minorFragmentId), columns);
+    return new HBaseSubScan(getUserName(), storagePlugin, endpointFragmentMapping.get(minorFragmentId), columns, maxRecords);
   }
 
   @Override
@@ -380,9 +401,11 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
 
   @Override
   public String toString() {
-    return "HBaseGroupScan [HBaseScanSpec="
-        + hbaseScanSpec + ", columns="
-        + columns + "]";
+    return new PlanStringBuilder(this)
+        .field("hbaseScanSpec", hbaseScanSpec)
+        .field("columns", columns)
+        .field("maxRecords", maxRecords)
+        .toString();
   }
 
   @JsonProperty("storage")
@@ -396,6 +419,31 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     return columns;
   }
 
+  @JsonProperty("maxRecords")
+  public int getMaxRecords() {
+    return maxRecords;
+  }
+
+  /**
+   * HBase scans support limit pushdown via {@code Scan#setLimit(int)}.
+   */
+  @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
+  /**
+   * Returns a copy of this scan carrying the new limit, or null when the
+   * limit is unchanged and the planner should keep the existing scan.
+   */
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    if (maxRecords == this.maxRecords) {
+      return null;
+    }
+    return new HBaseGroupScan(this, maxRecords);
+  }
+
   @JsonProperty
   public HBaseScanSpec getHBaseScanSpec() {
     return hbaseScanSpec;
@@ -423,6 +471,7 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
   @VisibleForTesting
   public HBaseGroupScan() {
     super((String)null);
+    maxRecords = -1;
   }
 
   /**
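
A note on the contract above before the next file: applyLimit() returns null
when the limit is unchanged (nothing new to plan) and otherwise a copy of the
scan carrying the new limit; the -1 passed by HBaseStoragePlugin below means
"no limit", matching the Scan.setLimit() default. A hedged sketch of a caller,
with hypothetical names (this is not Drill's actual planner rule):

    import org.apache.drill.exec.physical.base.GroupScan;

    final class LimitPushdownSketch {
      // Hypothetical caller (not Drill's committed rule code): how a planner
      // rule can use the GroupScan limit API exercised by this patch.
      static GroupScan pushLimitIntoScan(GroupScan scan, int offset, int fetch) {
        if (!scan.supportsLimitPushdown()) {
          return null; // plugin opted out; leave the plan unchanged
        }
        // Push offset + fetch: the scan must return enough rows for the
        // Limit operator above it to skip `offset` rows and emit `fetch`.
        return scan.applyLimit(offset + fetch); // null => nothing changed
      }
    }
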
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
index abe0a45678..835d36a8a1 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
@@ -122,7 +122,7 @@ public abstract class HBasePushFilterIntoScan extends StoragePluginOptimizerRule
     }
 
     final HBaseGroupScan newGroupsScan = new HBaseGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
-        newScanSpec, groupScan.getColumns());
+        newScanSpec, groupScan.getColumns(), groupScan.getMaxRecords());
     newGroupsScan.setFilterPushedDown(true);
 
     final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable());
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 4421166bf1..4f3195f367 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -84,13 +84,15 @@ public class HBaseRecordReader extends AbstractRecordReader implements DrillHBas
 
   private final Connection connection;
 
-  public HBaseRecordReader(Connection connection, HBaseSubScan.HBaseSubScanSpec subScanSpec, List<SchemaPath> projectedColumns) {
+  public HBaseRecordReader(Connection connection, HBaseSubScan.HBaseSubScanSpec subScanSpec, List<SchemaPath> projectedColumns, int maxRecords) {
     this.connection = connection;
     hbaseTableName = TableName.valueOf(
         Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan spec").getTableName());
     hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
     hbaseScanColumnsOnly = new Scan();
+    // Set the limit of rows for this scan. We will terminate the scan if the number of returned rows reaches this value.
     hbaseScan
+        .setLimit(maxRecords)
         .setFilter(subScanSpec.getScanFilter())
         .setCaching(TARGET_RECORD_COUNT);
 
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 35657ba9e4..9c875b97d9 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -44,7 +44,7 @@ public class HBaseScanBatchCreator implements BatchCreator<HBaseSubScan> {
         if ((columns = subScan.getColumns())==null) {
           columns = GroupScan.ALL_COLUMNS;
         }
-        readers.add(new HBaseRecordReader(subScan.getStorageEngine().getConnection(), scanSpec, columns));
+        readers.add(new HBaseRecordReader(subScan.getStorageEngine().getConnection(), scanSpec, columns, subScan.getMaxRecords()));
       } catch (Exception e1) {
         throw new ExecutionSetupException(e1);
       }
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index f7c15ebde9..48c4fe4d6d 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -61,7 +61,7 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin {
   @Override
   public HBaseGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
     HBaseScanSpec scanSpec = selection.getListWith(new TypeReference<HBaseScanSpec>() {});
-    return new HBaseGroupScan(userName, this, scanSpec, null);
+    return new HBaseGroupScan(userName, this, scanSpec, null, -1);
   }
 
   @Override
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 6dd4a7bb93..ab3ba39a8d 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -50,27 +50,30 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
   private final HBaseStoragePlugin hbaseStoragePlugin;
   private final List<HBaseSubScanSpec> regionScanSpecList;
   private final List<SchemaPath> columns;
+  private final int maxRecords;
 
   @JsonCreator
   public HBaseSubScan(@JacksonInject StoragePluginRegistry registry,
                       @JsonProperty("userName") String userName,
                       @JsonProperty("hbaseStoragePluginConfig") 
HBaseStoragePluginConfig hbaseStoragePluginConfig,
                       @JsonProperty("regionScanSpecList") 
LinkedList<HBaseSubScanSpec> regionScanSpecList,
-                      @JsonProperty("columns") List<SchemaPath> columns) 
throws ExecutionSetupException {
+                      @JsonProperty("columns") List<SchemaPath> columns,
+                      @JsonProperty("maxRecords") int maxRecords) throws 
ExecutionSetupException {
     this(userName,
         registry.resolve(hbaseStoragePluginConfig, HBaseStoragePlugin.class),
         regionScanSpecList,
-        columns);
+        columns, maxRecords);
   }
 
   public HBaseSubScan(String userName,
                       HBaseStoragePlugin hbaseStoragePlugin,
                       List<HBaseSubScanSpec> regionInfoList,
-                      List<SchemaPath> columns) {
+                      List<SchemaPath> columns, int maxRecords) {
     super(userName);
     this.hbaseStoragePlugin = hbaseStoragePlugin;
     this.regionScanSpecList = regionInfoList;
     this.columns = columns;
+    this.maxRecords = maxRecords;
   }
 
   @JsonProperty
@@ -98,6 +101,11 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
     return hbaseStoragePlugin;
   }
 
+  @JsonIgnore
+  public int getMaxRecords() {
+    return maxRecords;
+  }
+
   @Override
   public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
     return physicalVisitor.visitSubScan(this, value);
@@ -106,7 +114,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new HBaseSubScan(getUserName(), hbaseStoragePlugin, regionScanSpecList, columns);
+    return new HBaseSubScan(getUserName(), hbaseStoragePlugin, regionScanSpecList, columns, maxRecords);
   }
 
   @Override
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 7d69d7b1f7..2e74fabba6 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -44,6 +44,54 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
     PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan, excludedPlan);
   }
 
+  @Test
+  public void testLimitPushDown() throws Exception {
+    final String sql = "SELECT\n"
+        + "  *\n"
+        + "FROM\n"
+        + "  hbase.`[TABLE_NAME]` tableName\n"
+        + "LIMIT 3\n";
+
+    runHBaseSQLVerifyCount(sql, 3);
+
+    final String[] expectedPlan = {"Limit\\(fetch\\=\\[3\\]\\)", "maxRecords\\=3"};
+    final String[] excludedPlan = {};
+    final String sqlHBase = canonizeHBaseSQL(sql);
+    PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan, excludedPlan);
+  }
+
+  @Test
+  public void testLimitPushDownWithSpecial() throws Exception {
+    final String sql = "SELECT\n"
+        + "  *\n"
+        + "FROM\n"
+        + "  hbase.`[TABLE_NAME]` tableName\n"
+        + "LIMIT 0\n";
+
+    runHBaseSQLVerifyCount(sql, 0);
+
+    final String[] expectedPlan = {"Limit\\(fetch\\=\\[0\\]\\)", "Limit\\(offset\\=\\[0\\]\\, fetch\\=\\[0\\]\\)", "maxRecords\\=0"};
+    final String[] excludedPlan = {};
+    final String sqlHBase = canonizeHBaseSQL(sql);
+    PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan, excludedPlan);
+  }
+
+  @Test
+  public void testLimitPushDownWithOffset() throws Exception {
+    final String sql = "SELECT\n"
+        + "  *\n"
+        + "FROM\n"
+        + "  hbase.`[TABLE_NAME]` tableName\n"
+        + "LIMIT 2 offset 2\n";
+
+    runHBaseSQLVerifyCount(sql, 2);
+
+    final String[] expectedPlan = {"Limit\\(offset\\=\\[2\\]\\, fetch\\=\\[2\\]\\)", "maxRecords\\=4"};
+    final String[] excludedPlan = {};
+    final String sqlHBase = canonizeHBaseSQL(sql);
+    PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan, excludedPlan);
+  }
+
   @Test
   public void testFilterPushDownRowKeyNotEqual() throws Exception {
     setColumnWidths(new int[] {8, 38, 38});

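One detail worth calling out in testLimitPushDownWithOffset above: the scan
cannot apply OFFSET by itself, so for LIMIT 2 OFFSET 2 the pushed-down limit
is offset + fetch (hence "maxRecords=4" in the expected plan), and the
Limit(offset=[2], fetch=[2]) operator above the scan discards the first two
rows. The arithmetic in miniature, with hypothetical names:

    // Sketch: expected pushed-down limit for "LIMIT <fetch> OFFSET <offset>".
    static int pushedMaxRecords(int offset, int fetch) {
      return offset + fetch; // HBase must return offset + fetch rows in total
    }
    // pushedMaxRecords(2, 2) == 4, matching "maxRecords\\=4" in the test above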