This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 78a52087bb DRILL-8527: Hive Limit Push Down (#2997)
78a52087bb is described below
commit 78a52087bb85bcc7034157544193155b87503111
Author: shfshihuafeng <[email protected]>
AuthorDate: Thu Jul 10 22:28:49 2025 +0800
DRILL-8527: Hive Limit Push Down (#2997)
---
.../org/apache/drill/exec/store/hive/HiveScan.java | 41 +++++++++++++++++---
.../drill/exec/store/hive/HiveStoragePlugin.java | 2 +-
.../apache/drill/exec/store/hive/HiveSubScan.java | 12 +++++-
.../hive/readers/HiveDefaultRecordReader.java | 8 ++--
.../store/hive/readers/HiveTextRecordReader.java | 4 +-
.../store/hive/readers/ReadersInitializer.java | 9 ++---
.../apache/drill/exec/hive/TestHivePushDown.java | 45 ++++++++++++++++++++++
7 files changed, 103 insertions(+), 18 deletions(-)
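
In short: the planner's row limit ("maxRecords") is now captured by HiveScan, carried
through HiveSubScan into the Hive record readers, and used there to cap the per-batch
read loop. For orientation, a minimal sketch (illustrative only, not part of this
commit; the method names match the hooks added below) of how Drill's limit-pushdown
rule is expected to drive the new GroupScan methods:

    // Illustrative only: how a planner rule consumes the hooks added in HiveScan.
    // A null return from applyLimit() means "this limit is already applied, do not
    // transform the plan again"; a non-null return replaces the scan in the plan.
    GroupScan pushLimitIntoScan(GroupScan scan, int limit) {
      if (!scan.supportsLimitPushdown()) {
        return scan;                              // plugin opted out of pushdown
      }
      GroupScan limited = scan.applyLimit(limit); // may return null
      return limited != null ? limited : scan;
    }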
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 3fd1258d02..f26f68eb25 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -65,7 +65,7 @@ public class HiveScan extends AbstractGroupScan {
private final HiveReadEntry hiveReadEntry;
private final HiveMetadataProvider metadataProvider;
private final Map<String, String> confProperties;
-
+ private final int maxRecords;
private List<List<LogicalInputSplit>> mappings;
private List<LogicalInputSplit> inputSplits;
@@ -77,21 +77,23 @@ public class HiveScan extends AbstractGroupScan {
@JsonProperty("hiveStoragePluginConfig") final
HiveStoragePluginConfig hiveStoragePluginConfig,
@JsonProperty("columns") final List<SchemaPath> columns,
@JsonProperty("confProperties") final Map<String, String>
confProperties,
+ @JsonProperty("maxRecords") final int maxRecords,
@JacksonInject final StoragePluginRegistry pluginRegistry)
throws ExecutionSetupException {
this(userName,
hiveReadEntry,
pluginRegistry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class),
columns,
- null, confProperties);
+ null, confProperties, maxRecords);
}
public HiveScan(final String userName, final HiveReadEntry hiveReadEntry,
final HiveStoragePlugin hiveStoragePlugin,
- final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider, final Map<String, String> confProperties) throws ExecutionSetupException {
+ final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider, final Map<String, String> confProperties, int maxRecords) throws ExecutionSetupException {
super(userName);
this.hiveReadEntry = hiveReadEntry;
this.columns = columns;
this.hiveStoragePlugin = hiveStoragePlugin;
this.confProperties = confProperties;
+ this.maxRecords = maxRecords;
if (metadataProvider == null) {
this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, getHiveConf());
} else {
@@ -106,10 +108,20 @@ public class HiveScan extends AbstractGroupScan {
this.hiveStoragePlugin = that.hiveStoragePlugin;
this.metadataProvider = that.metadataProvider;
this.confProperties = that.confProperties;
+ this.maxRecords = that.maxRecords;
+ }
+ public HiveScan(final HiveScan that, int maxRecords) {
+ super(that);
+ this.columns = that.columns;
+ this.hiveReadEntry = that.hiveReadEntry;
+ this.hiveStoragePlugin = that.hiveStoragePlugin;
+ this.metadataProvider = that.metadataProvider;
+ this.confProperties = that.confProperties;
+ this.maxRecords = maxRecords;
}
public HiveScan clone(final HiveReadEntry hiveReadEntry) throws ExecutionSetupException {
- return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider, confProperties);
+ return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider, confProperties, maxRecords);
}
@JsonProperty
@@ -133,6 +145,11 @@ public class HiveScan extends AbstractGroupScan {
return confProperties;
}
+ @JsonProperty
+ public int getMaxRecords() {
+ return maxRecords;
+ }
+
@JsonIgnore
public HiveStoragePlugin getStoragePlugin() {
return hiveStoragePlugin;
@@ -167,6 +184,19 @@ public class HiveScan extends AbstractGroupScan {
}
}
+ @Override
+ public GroupScan applyLimit(int maxRecords) {
+ if (maxRecords == this.maxRecords) {
+ return null;
+ }
+ return new HiveScan(this, maxRecords);
+ }
+
+ @Override
+ public boolean supportsLimitPushdown() {
+ return true;
+ }
+
@Override
public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException {
try {
@@ -189,7 +219,7 @@ public class HiveScan extends AbstractGroupScan {
}
final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.getTableWrapper(), parts);
- return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin, confProperties);
+ return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin, maxRecords, confProperties);
} catch (IOException | ReflectiveOperationException e) {
throw new ExecutionSetupException(e);
}
@@ -271,6 +301,7 @@ public class HiveScan extends AbstractGroupScan {
+ ", partitions= " + partitions
+ ", inputDirectories=" +
metadataProvider.getInputDirectories(hiveReadEntry)
+ ", confProperties=" + confProperties
+ + ", maxRecords=" + maxRecords
+ "]";
}
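
A note on the applyLimit() contract above: returning null when the requested limit
equals the stored one is what keeps the planner rule from firing endlessly on an
already-limited scan. An illustrative sequence (not from the commit):

    HiveScan scan = ...;                      // maxRecords == -1 after planning
    GroupScan first = scan.applyLimit(10);    // new HiveScan with maxRecords = 10
    GroupScan second = first.applyLimit(10);  // null: nothing left to push down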
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index 997d0e39cb..a85159fe1e 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -113,7 +113,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
}
}
- return new HiveScan(userName, hiveReadEntry, this, columns, null, confProperties);
+ return new HiveScan(userName, hiveReadEntry, this, columns, null, confProperties, -1);
} catch (ExecutionSetupException e) {
throw new IOException(e);
}
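
The -1 passed here is the "no limit" sentinel: it fails the maxRecords > 0 check in
HiveDefaultRecordReader below, so a freshly planned scan keeps reading full-size
batches until applyLimit() installs a real cap.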
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index b3ec4de509..f49fcd0dd0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -58,7 +58,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
private final List<HivePartition> partitions;
private final List<SchemaPath> columns;
private final Map<String, String> confProperties;
-
+ private final int maxRecords;
@JsonCreator
public HiveSubScan(@JacksonInject StoragePluginRegistry registry,
@JsonProperty("userName") String userName,
@@ -67,6 +67,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
@JsonProperty("splitClasses") List<String> splitClasses,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("hiveStoragePluginConfig")
HiveStoragePluginConfig hiveStoragePluginConfig,
+ @JsonProperty("maxRecords") int maxRecords,
@JsonProperty("confProperties") Map<String, String>
confProperties)
throws IOException, ExecutionSetupException,
ReflectiveOperationException {
this(userName,
@@ -75,6 +76,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
splitClasses,
columns,
registry.resolve(hiveStoragePluginConfig, HiveStoragePlugin.class),
+ maxRecords,
confProperties);
}
@@ -84,6 +86,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
final List<String> splitClasses,
final List<SchemaPath> columns,
final HiveStoragePlugin hiveStoragePlugin,
+ final Integer maxRecords,
final Map<String, String> confProperties)
throws IOException, ReflectiveOperationException {
super(userName);
@@ -94,6 +97,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
this.splitClasses = splitClasses;
this.columns = columns;
this.hiveStoragePlugin = hiveStoragePlugin;
+ this.maxRecords = maxRecords;
this.confProperties = confProperties;
for (int i = 0; i < splits.size(); i++) {
@@ -121,6 +125,10 @@ public class HiveSubScan extends AbstractBase implements SubScan {
return columns;
}
+ public int getMaxRecords() {
+ return maxRecords;
+ }
+
@JsonProperty
public HiveStoragePluginConfig getHiveStoragePluginConfig() {
return hiveStoragePlugin.getConfig();
@@ -164,7 +172,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
try {
- return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin, confProperties);
+ return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin, maxRecords, confProperties);
} catch (IOException | ReflectiveOperationException e) {
throw new ExecutionSetupException(e);
}
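
Serialization note: the @JsonCreator parameter binds "maxRecords" on deserialization,
and although getMaxRecords() above carries no @JsonProperty annotation, Jackson's
default getter auto-detection should still write it out, so the limit survives the
trip from the foreman to the minor fragments. A hedged round-trip sketch, assuming
Drill's physical-plan ObjectMapper uses default visibility rules:

    ObjectMapper mapper = ...;                          // Drill's plan mapper
    String json = mapper.writeValueAsString(subScan);   // should include "maxRecords"
    HiveSubScan copy = mapper.readValue(json, HiveSubScan.class);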
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
index 0e5d54ef13..0605b42b92 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveDefaultRecordReader.java
@@ -217,7 +217,7 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
*/
private StructField[] selectedStructFieldRefs;
-
+ private final int maxRecords;
/**
* Readers constructor called by initializer.
*
@@ -231,7 +231,7 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
*/
public HiveDefaultRecordReader(HiveTableWithColumnCache table, HivePartition partition,
Collection<InputSplit> inputSplits,
List<SchemaPath> projectedColumns,
- FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi) {
+ FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi, int maxRecords) {
this.hiveTable = table;
this.partition = partition;
this.hiveConf = hiveConf;
@@ -243,6 +243,7 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
this.partitionValues = new Object[0];
setColumns(projectedColumns);
this.fragmentContext = context;
+ this.maxRecords = maxRecords;
}
@Override
@@ -396,7 +397,8 @@ public class HiveDefaultRecordReader extends AbstractRecordReader {
try {
int recordCount;
- for (recordCount = 0; (recordCount < TARGET_RECORD_COUNT && hasNextValue(valueHolder)); recordCount++) {
+ int record = maxRecords > 0 ? maxRecords : TARGET_RECORD_COUNT;
+ for (recordCount = 0; (recordCount < record && hasNextValue(valueHolder)); recordCount++) {
Object deserializedHiveRecord = partitionToTableSchemaConverter.convert(partitionDeserializer.deserialize((Writable) valueHolder));
outputWriter.setPosition(recordCount);
readHiveRecordAndInsertIntoRecordBatch(deserializedHiveRecord);
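
Reader semantics: with the change above, each reader caps a batch at maxRecords
instead of TARGET_RECORD_COUNT when a limit was pushed down. The cap is per reader
and per batch, so a scan over several splits can still surface more than maxRecords
rows in total; the LIMIT operator that remains in the plan (see the test below)
enforces the exact count. Note too that a pushed-down limit larger than
TARGET_RECORD_COUNT raises the per-batch row cap accordingly.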
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java
index dadd3bd8a5..cb873c78b0 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/HiveTextRecordReader.java
@@ -58,8 +58,8 @@ public class HiveTextRecordReader extends HiveDefaultRecordReader {
*/
public HiveTextRecordReader(HiveTableWithColumnCache table, HivePartition partition,
Collection<InputSplit> inputSplits,
List<SchemaPath> projectedColumns,
- FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi) {
- super(table, partition, inputSplits, projectedColumns, context, hiveConf, proxyUgi);
+ FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi, int maxRecords) {
+ super(table, partition, inputSplits, projectedColumns, context, hiveConf, proxyUgi, maxRecords);
}
@Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java
index fc3d548086..15faccfecc 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/ReadersInitializer.java
@@ -56,10 +56,10 @@ public class ReadersInitializer {
final UserGroupInformation proxyUgi = ImpersonationUtil.createProxyUgi(config.getUserName(), ctx.getQueryUserName());
final List<List<InputSplit>> inputSplits = config.getInputSplits();
final HiveConf hiveConf = config.getHiveConf();
-
+ final int maxRecords = config.getMaxRecords();
if (inputSplits.isEmpty()) {
return Collections.singletonList(
- readerFactory.createReader(config.getTable(), null /*partition*/, null /*split*/, config.getColumns(), ctx, hiveConf, proxyUgi)
+ readerFactory.createReader(config.getTable(), null /*partition*/, null /*split*/, config.getColumns(), ctx, hiveConf, proxyUgi, maxRecords)
);
} else {
IndexedPartitions partitions = getPartitions(config);
@@ -70,7 +70,7 @@ public class ReadersInitializer {
partitions.get(idx),
inputSplits.get(idx),
config.getColumns(),
- ctx, hiveConf, proxyUgi))
+ ctx, hiveConf, proxyUgi, maxRecords))
.collect(Collectors.toList());
}
}
@@ -109,8 +109,7 @@ public class ReadersInitializer {
RecordReader createReader(HiveTableWithColumnCache table, HivePartition partition,
Collection<InputSplit> inputSplits,
List<SchemaPath> projectedColumns,
- FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi);
-
+ FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi, int maxRecords);
}
/**
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHivePushDown.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHivePushDown.java
new file mode 100644
index 0000000000..097bc8d863
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHivePushDown.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.hive;
+
+import org.apache.drill.categories.HiveStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+
+@Category({SlowTest.class, HiveStorageTest.class})
+public class TestHivePushDown extends HiveTestBase {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testLimitPushDown() throws Exception {
+ String query = "SELECT * FROM hive.`default`.kv LIMIT 1";
+
+ int actualRowCount = testSql(query);
+ assertEquals("Expected and actual row count should match", 1,
actualRowCount);
+
+ testPlanMatchingPatterns(query, new String[]{"LIMIT"}, new
String[]{"maxRecords=1"});
+ }
+}
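
To run just this test locally, something like the standard Maven Surefire invocation
below should work (module path taken from the diff above):

    mvn -pl contrib/storage-hive/core test -Dtest=TestHivePushDown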