This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 27f3359ddb DRILL-8528: Hbase Limit Push Down (#3000)
27f3359ddb is described below
commit 27f3359ddb764cad7856828c37646ae99be843ed
Author: shfshihuafeng <[email protected]>
AuthorDate: Mon Jul 21 04:48:21 2025 +0800
DRILL-8528: Hbase Limit Push Down (#3000)
---
.../drill/exec/store/hbase/HBaseGroupScan.java | 61 +++++++++++++++++++---
.../exec/store/hbase/HBasePushFilterIntoScan.java | 2 +-
.../drill/exec/store/hbase/HBaseRecordReader.java | 4 +-
.../exec/store/hbase/HBaseScanBatchCreator.java | 2 +-
.../drill/exec/store/hbase/HBaseStoragePlugin.java | 2 +-
.../drill/exec/store/hbase/HBaseSubScan.java | 16 ++++--
.../drill/hbase/TestHBaseFilterPushDown.java | 48 +++++++++++++++++
7 files changed, 121 insertions(+), 14 deletions(-)
diff --git
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index cec24bf0ad..d4867da24c 100644
---
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -35,6 +35,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -99,22 +100,26 @@ public class HBaseGroupScan extends AbstractGroupScan
implements DrillHBaseConst
private long scanSizeInBytes = 0;
+ private final int maxRecords;
+
@JsonCreator
public HBaseGroupScan(@JsonProperty("userName") String userName,
@JsonProperty("hbaseScanSpec") HBaseScanSpec
hbaseScanSpec,
@JsonProperty("storage") HBaseStoragePluginConfig
storagePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("maxRecords") int maxRecords,
@JacksonInject StoragePluginRegistry pluginRegistry)
throws IOException, ExecutionSetupException {
- this (userName, pluginRegistry.resolve(storagePluginConfig,
HBaseStoragePlugin.class), hbaseScanSpec, columns);
+ this (userName, pluginRegistry.resolve(storagePluginConfig,
HBaseStoragePlugin.class), hbaseScanSpec, columns, maxRecords);
}
public HBaseGroupScan(String userName, HBaseStoragePlugin storagePlugin,
HBaseScanSpec scanSpec,
- List<SchemaPath> columns) {
+ List<SchemaPath> columns, int maxRecords) {
super(userName);
this.storagePlugin = storagePlugin;
this.storagePluginConfig = storagePlugin.getConfig();
this.hbaseScanSpec = scanSpec;
this.columns = columns == null ? ALL_COLUMNS : columns;
+ this.maxRecords = maxRecords;
init();
}
@@ -134,6 +139,22 @@ public class HBaseGroupScan extends AbstractGroupScan
implements DrillHBaseConst
this.filterPushedDown = that.filterPushedDown;
this.statsCalculator = that.statsCalculator;
this.scanSizeInBytes = that.scanSizeInBytes;
+ this.maxRecords = that.maxRecords;
+ }
+
+ private HBaseGroupScan(HBaseGroupScan that, int maxRecords) {
+ super(that);
+ this.columns = that.columns == null ? ALL_COLUMNS : that.columns;
+ this.hbaseScanSpec = that.hbaseScanSpec;
+ this.endpointFragmentMapping = that.endpointFragmentMapping;
+ this.regionsToScan = that.regionsToScan;
+ this.storagePlugin = that.storagePlugin;
+ this.storagePluginConfig = that.storagePluginConfig;
+ this.hTableDesc = that.hTableDesc;
+ this.filterPushedDown = that.filterPushedDown;
+ this.statsCalculator = that.statsCalculator;
+ this.scanSizeInBytes = that.scanSizeInBytes;
+ this.maxRecords = maxRecords;
}
@Override
@@ -329,7 +350,7 @@ public class HBaseGroupScan extends AbstractGroupScan
implements DrillHBaseConst
assert minorFragmentId < endpointFragmentMapping.size() : String.format(
"Mappings length [%d] should be greater than minor fragment id [%d]
but it isn't.", endpointFragmentMapping.size(),
minorFragmentId);
- return new HBaseSubScan(getUserName(), storagePlugin,
endpointFragmentMapping.get(minorFragmentId), columns);
+ return new HBaseSubScan(getUserName(), storagePlugin,
endpointFragmentMapping.get(minorFragmentId), columns, maxRecords);
}
@Override
@@ -380,9 +401,11 @@ public class HBaseGroupScan extends AbstractGroupScan
implements DrillHBaseConst
@Override
public String toString() {
- return "HBaseGroupScan [HBaseScanSpec="
- + hbaseScanSpec + ", columns="
- + columns + "]";
+ return new PlanStringBuilder(this)
+ .field("hbaseScanSpec", hbaseScanSpec)
+ .field("columns", columns)
+ .field("maxRecords", maxRecords)
+ .toString();
}
@JsonProperty("storage")
@@ -396,6 +419,31 @@ public class HBaseGroupScan extends AbstractGroupScan
implements DrillHBaseConst
return columns;
}
+ @JsonProperty("maxRecords")
+ public int getMaxRecords() {
+ return maxRecords;
+ }
+
+  /**
+   * HBase supports limit pushdown; the limit itself is applied in {@link #applyLimit(int)}.
+   */
+ @Override
+ public boolean supportsLimitPushdown() {
+ return true;
+ }
+
+  /**
+   * Applies the limit by returning a copy of this scan carrying the new maxRecords.
+   * Returns null when maxRecords equals the current limit, signalling the planner that no new scan is needed.
+   */
+ @Override
+ public GroupScan applyLimit(int maxRecords) {
+ if (maxRecords == this.maxRecords){
+ return null;
+ }
+ return new HBaseGroupScan(this, maxRecords);
+ }
+
@JsonProperty
public HBaseScanSpec getHBaseScanSpec() {
return hbaseScanSpec;
@@ -423,6 +471,7 @@ public class HBaseGroupScan extends AbstractGroupScan
implements DrillHBaseConst
@VisibleForTesting
public HBaseGroupScan() {
super((String)null);
+ maxRecords = -1;
}
/**
diff --git
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
index abe0a45678..835d36a8a1 100644
---
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
+++
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
@@ -122,7 +122,7 @@ public abstract class HBasePushFilterIntoScan extends
StoragePluginOptimizerRule
}
final HBaseGroupScan newGroupsScan = new
HBaseGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
- newScanSpec, groupScan.getColumns());
+ newScanSpec, groupScan.getColumns(), groupScan.getMaxRecords());
newGroupsScan.setFilterPushedDown(true);
final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(),
filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable());
diff --git
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 4421166bf1..4f3195f367 100644
---
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -84,13 +84,15 @@ public class HBaseRecordReader extends AbstractRecordReader
implements DrillHBas
private final Connection connection;
- public HBaseRecordReader(Connection connection,
HBaseSubScan.HBaseSubScanSpec subScanSpec, List<SchemaPath> projectedColumns) {
+ public HBaseRecordReader(Connection connection,
HBaseSubScan.HBaseSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
int maxRecords) {
this.connection = connection;
hbaseTableName = TableName.valueOf(
Preconditions.checkNotNull(subScanSpec, "HBase reader needs a sub-scan
spec").getTableName());
hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
hbaseScanColumnsOnly = new Scan();
+ // Set the limit of rows for this scan. We will terminate the scan if the
number of returned rows reaches this value.
hbaseScan
+ .setLimit(maxRecords)
.setFilter(subScanSpec.getScanFilter())
.setCaching(TARGET_RECORD_COUNT);
diff --git
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
index 35657ba9e4..9c875b97d9 100644
---
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
+++
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseScanBatchCreator.java
@@ -44,7 +44,7 @@ public class HBaseScanBatchCreator implements
BatchCreator<HBaseSubScan> {
if ((columns = subScan.getColumns())==null) {
columns = GroupScan.ALL_COLUMNS;
}
- readers.add(new
HBaseRecordReader(subScan.getStorageEngine().getConnection(), scanSpec,
columns));
+ readers.add(new
HBaseRecordReader(subScan.getStorageEngine().getConnection(), scanSpec,
columns, subScan.getMaxRecords()));
} catch (Exception e1) {
throw new ExecutionSetupException(e1);
}
diff --git
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index f7c15ebde9..48c4fe4d6d 100644
---
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -61,7 +61,7 @@ public class HBaseStoragePlugin extends AbstractStoragePlugin
{
@Override
public HBaseGroupScan getPhysicalScan(String userName, JSONOptions
selection) throws IOException {
HBaseScanSpec scanSpec = selection.getListWith(new
TypeReference<HBaseScanSpec>() {});
- return new HBaseGroupScan(userName, this, scanSpec, null);
+ return new HBaseGroupScan(userName, this, scanSpec, null, -1);
}
@Override
diff --git
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 6dd4a7bb93..ab3ba39a8d 100644
---
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -50,27 +50,30 @@ public class HBaseSubScan extends AbstractBase implements
SubScan {
private final HBaseStoragePlugin hbaseStoragePlugin;
private final List<HBaseSubScanSpec> regionScanSpecList;
private final List<SchemaPath> columns;
+ private final int maxRecords;
@JsonCreator
public HBaseSubScan(@JacksonInject StoragePluginRegistry registry,
@JsonProperty("userName") String userName,
@JsonProperty("hbaseStoragePluginConfig")
HBaseStoragePluginConfig hbaseStoragePluginConfig,
@JsonProperty("regionScanSpecList")
LinkedList<HBaseSubScanSpec> regionScanSpecList,
- @JsonProperty("columns") List<SchemaPath> columns)
throws ExecutionSetupException {
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("maxRecords") int maxRecords) throws
ExecutionSetupException {
this(userName,
registry.resolve(hbaseStoragePluginConfig, HBaseStoragePlugin.class),
regionScanSpecList,
- columns);
+ columns, maxRecords);
}
public HBaseSubScan(String userName,
HBaseStoragePlugin hbaseStoragePlugin,
List<HBaseSubScanSpec> regionInfoList,
- List<SchemaPath> columns) {
+ List<SchemaPath> columns, int maxRecords) {
super(userName);
this.hbaseStoragePlugin = hbaseStoragePlugin;
this.regionScanSpecList = regionInfoList;
this.columns = columns;
+ this.maxRecords = maxRecords;
}
@JsonProperty
@@ -98,6 +101,11 @@ public class HBaseSubScan extends AbstractBase implements
SubScan {
return hbaseStoragePlugin;
}
+ @JsonIgnore
+ public int getMaxRecords() {
+ return maxRecords;
+ }
+
@Override
public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E>
physicalVisitor, X value) throws E {
return physicalVisitor.visitSubScan(this, value);
@@ -106,7 +114,7 @@ public class HBaseSubScan extends AbstractBase implements
SubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
- return new HBaseSubScan(getUserName(), hbaseStoragePlugin,
regionScanSpecList, columns);
+ return new HBaseSubScan(getUserName(), hbaseStoragePlugin,
regionScanSpecList, columns, maxRecords);
}
@Override
diff --git
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
index 7d69d7b1f7..2e74fabba6 100644
---
a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
+++
b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java
@@ -44,6 +44,54 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest {
PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan,
excludedPlan);
}
+ @Test
+ public void testLimitPushDown() throws Exception {
+ final String sql = "SELECT\n"
+ + " *\n"
+ + "FROM\n"
+ + " hbase.`[TABLE_NAME]` tableName\n"
+ + "LIMIT 3\n";
+
+ runHBaseSQLVerifyCount(sql, 3);
+
+ final String[] expectedPlan = {"Limit\\(fetch\\=\\[3\\]\\)",
"maxRecords\\=3"};
+ final String[] excludedPlan ={};
+ final String sqlHBase = canonizeHBaseSQL(sql);
+ PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan,
excludedPlan);
+ }
+
+ @Test
+ public void testLimitPushDownWithSpecial() throws Exception {
+ final String sql = "SELECT\n"
+ + " *\n"
+ + "FROM\n"
+ + " hbase.`[TABLE_NAME]` tableName\n"
+ + "LIMIT 0\n";
+
+ runHBaseSQLVerifyCount(sql, 0);
+
+ final String[] expectedPlan = {"Limit\\(fetch\\=\\[0\\]\\)",
"Limit\\(offset\\=\\[0\\]\\, fetch\\=\\[0\\]\\)", "maxRecords\\=0"};
+ final String[] excludedPlan ={};
+ final String sqlHBase = canonizeHBaseSQL(sql);
+ PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan,
excludedPlan);
+ }
+
+ @Test
+ public void testLimitPushDownWithOffset() throws Exception {
+ final String sql = "SELECT\n"
+ + " *\n"
+ + "FROM\n"
+ + " hbase.`[TABLE_NAME]` tableName\n"
+ + "LIMIT 2 offset 2\n";
+
+ runHBaseSQLVerifyCount(sql, 2);
+
+ final String[] expectedPlan = {"Limit\\(offset\\=\\[2\\]\\,
fetch\\=\\[2\\]\\)", "maxRecords\\=4"};
+ final String[] excludedPlan ={};
+ final String sqlHBase = canonizeHBaseSQL(sql);
+ PlanTestBase.testPlanMatchingPatterns(sqlHBase, expectedPlan,
excludedPlan);
+ }
+
@Test
public void testFilterPushDownRowKeyNotEqual() throws Exception {
setColumnWidths(new int[] {8, 38, 38});