This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 6cea55defa DRILL-8526: Hive Predicate Push Down for ORC and Parquet (#2995)
6cea55defa is described below
commit 6cea55defad03d158e1f958abc65fdbe9d970550
Author: shfshihuafeng <[email protected]>
AuthorDate: Thu Aug 7 23:09:40 2025 +0800
DRILL-8526: Hive Predicate Push Down for ORC and Parquet (#2995)
---
.../org/apache/drill/exec/store/hive/HiveScan.java | 35 ++-
.../drill/exec/store/hive/HiveStoragePlugin.java | 3 +
.../filter/HiveCompareFunctionsProcessor.java | 266 +++++++++++++++++++++
.../exec/store/hive/readers/filter/HiveFilter.java | 71 ++++++
.../hive/readers/filter/HiveFilterBuilder.java | 212 ++++++++++++++++
.../readers/filter/HivePushFilterIntoScan.java | 171 +++++++++++++
.../apache/drill/exec/TestHiveFilterPushDown.java | 124 ++++++++++
.../exec/store/hive/HiveTestDataGenerator.java | 13 +
8 files changed, 886 insertions(+), 9 deletions(-)
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index f26f68eb25..9ec05d9136 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -39,11 +40,13 @@ import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.hive.HiveMetadataProvider.HiveStats;
import org.apache.drill.exec.store.hive.HiveMetadataProvider.LogicalInputSplit;
import org.apache.drill.exec.store.hive.HiveTableWrapper.HivePartitionWrapper;
+import org.apache.drill.exec.store.hive.readers.filter.HiveFilter;
import org.apache.drill.exec.util.Utilities;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +57,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import static org.apache.drill.exec.store.hive.HiveUtilities.createPartitionWithSpecColumns;
+import static org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg.SARG_PUSHDOWN;
@JsonTypeName("hive-scan")
public class HiveScan extends AbstractGroupScan {
@@ -70,7 +74,7 @@ public class HiveScan extends AbstractGroupScan {
private List<LogicalInputSplit> inputSplits;
protected List<SchemaPath> columns;
-
+ private boolean filterPushedDown = false;
@JsonCreator
public HiveScan(@JsonProperty("userName") final String userName,
                  @JsonProperty("hiveReadEntry") final HiveReadEntry hiveReadEntry,
@@ -171,6 +175,16 @@ public class HiveScan extends AbstractGroupScan {
return !(partitionKeys == null || partitionKeys.size() == 0);
}
+ @JsonIgnore
+ public void setFilterPushedDown(boolean isPushedDown) {
+ this.filterPushedDown = isPushedDown;
+ }
+
+ @JsonIgnore
+ public boolean isFilterPushedDown() {
+ return filterPushedDown;
+ }
+
@Override
public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) {
mappings = new ArrayList<>();
@@ -295,14 +309,17 @@ public class HiveScan extends AbstractGroupScan {
public String toString() {
List<HivePartitionWrapper> partitions = hiveReadEntry.getHivePartitionWrappers();
int numPartitions = partitions == null ? 0 : partitions.size();
- return "HiveScan [table=" + hiveReadEntry.getHiveTableWrapper()
- + ", columns=" + columns
- + ", numPartitions=" + numPartitions
- + ", partitions= " + partitions
- + ", inputDirectories=" +
metadataProvider.getInputDirectories(hiveReadEntry)
- + ", confProperties=" + confProperties
- + ", maxRecords=" + maxRecords
- + "]";
+ String SearchArgumentString = confProperties.get(SARG_PUSHDOWN);
+    SearchArgument searchArgument = SearchArgumentString == null ? null : HiveFilter.create(SearchArgumentString);
+
+ return new PlanStringBuilder(this)
+ .field("table", hiveReadEntry.getHiveTableWrapper())
+ .field("columns", columns)
+ .field("numPartitions", numPartitions)
+ .field("inputDirectories",
metadataProvider.getInputDirectories(hiveReadEntry))
+ .field("confProperties", confProperties)
+ .field("SearchArgument", searchArgument)
+ .toString();
}
@Override
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index a85159fe1e..f18d005e7f 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -48,6 +48,7 @@ import org.apache.drill.exec.server.options.SessionOptionManager;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.hive.readers.filter.HivePushFilterIntoScan;
import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory;
import com.google.common.collect.ImmutableSet;
@@ -200,6 +201,8 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
          options.getBoolean(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER)) {
ruleBuilder.add(ConvertHiveParquetScanToDrillParquetScan.INSTANCE);
}
+ ruleBuilder.add(HivePushFilterIntoScan.FILTER_ON_PROJECT);
+ ruleBuilder.add(HivePushFilterIntoScan.FILTER_ON_SCAN);
return ruleBuilder.build();
}
default:
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveCompareFunctionsProcessor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveCompareFunctionsProcessor.java
new file mode 100644
index 0000000000..6a76e7aca4
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveCompareFunctionsProcessor.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.filter;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+
+import java.sql.Timestamp;
+
+public class HiveCompareFunctionsProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+
+ private static final ImmutableSet<String> IS_FUNCTIONS_SET;
+
+ private Object value;
+ private PredicateLeaf.Type valueType;
+ private boolean success;
+ private SchemaPath path;
+ private String functionName;
+
+ public HiveCompareFunctionsProcessor(String functionName) {
+ this.success = false;
+ this.functionName = functionName;
+ }
+
+ static {
+ ImmutableSet.Builder<String> builder = ImmutableSet.builder();
+ IS_FUNCTIONS_SET = builder
+ .add(FunctionNames.IS_NOT_NULL)
+ .add("isNotNull")
+ .add("is not null")
+ .add(FunctionNames.IS_NULL)
+ .add("isNull")
+ .add("is null")
+ .add(FunctionNames.IS_TRUE)
+ .add(FunctionNames.IS_NOT_TRUE)
+ .add(FunctionNames.IS_FALSE)
+ .add(FunctionNames.IS_NOT_FALSE)
+ .build();
+
+ }
+
+  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+ static {
+ ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+ COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+ // binary functions
+ .put(FunctionNames.LIKE, FunctionNames.LIKE)
+ .put(FunctionNames.EQ, FunctionNames.EQ)
+ .put(FunctionNames.NE, FunctionNames.NE)
+ .put(FunctionNames.GE, FunctionNames.LE)
+ .put(FunctionNames.GT, FunctionNames.LT)
+ .put(FunctionNames.LE, FunctionNames.GE)
+ .put(FunctionNames.LT, FunctionNames.GT)
+ .build();
+ }
+
+  private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+ static {
+    ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
+ VALUE_EXPRESSION_CLASSES = builder
+ .add(BooleanExpression.class)
+ .add(DateExpression.class)
+ .add(DoubleExpression.class)
+ .add(FloatExpression.class)
+ .add(IntExpression.class)
+ .add(LongExpression.class)
+ .add(QuotedString.class)
+ .add(TimeExpression.class)
+ .build();
+ }
+
+ public static boolean isCompareFunction(final String functionName) {
+ return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
+ }
+
+ public static boolean isIsFunction(final String funcName) {
+ return IS_FUNCTIONS_SET.contains(funcName);
+ }
+
+ // shows whether function is simplified IS FALSE
+ public static boolean isNot(final FunctionCall call, final String funcName) {
+ return !call.args().isEmpty()
+ && FunctionNames.NOT.equals(funcName);
+ }
+
+  public static HiveCompareFunctionsProcessor createFunctionsProcessorInstance(final FunctionCall call) {
+ String functionName = call.getName();
+    HiveCompareFunctionsProcessor evaluator = new HiveCompareFunctionsProcessor(functionName);
+
+ return createFunctionsProcessorInstanceInternal(call, evaluator);
+ }
+
+  protected static <T extends HiveCompareFunctionsProcessor> T createFunctionsProcessorInstanceInternal(FunctionCall call, T evaluator) {
+ LogicalExpression nameArg = call.arg(0);
+ LogicalExpression valueArg = call.argCount() >= 2 ? call.arg(1) : null;
+ if (valueArg != null) { // binary function
+ if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+ LogicalExpression swapArg = valueArg;
+ valueArg = nameArg;
+ nameArg = swapArg;
+        evaluator.setFunctionName(COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(evaluator.getFunctionName()));
+ }
+ evaluator.setSuccess(nameArg.accept(evaluator, valueArg));
+ } else if (call.arg(0) instanceof SchemaPath) {
+ evaluator.setPath((SchemaPath) nameArg);
+ }
+ evaluator.setSuccess(true);
+ return evaluator;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ protected void setSuccess(boolean success) {
+ this.success = success;
+ }
+
+ public SchemaPath getPath() {
+ return path;
+ }
+
+ protected void setPath(SchemaPath path) {
+ this.path = path;
+ }
+
+ public String getFunctionName() {
+ return functionName;
+ }
+
+ protected void setFunctionName(String functionName) {
+ this.functionName = functionName;
+ }
+
+ public Object getValue() {
+ return value;
+ }
+
+ public void setValue(Object value) {
+ this.value = value;
+ }
+
+ public PredicateLeaf.Type getValueType() {
+ return valueType;
+ }
+
+ public void setValueType(PredicateLeaf.Type valueType) {
+ this.valueType = valueType;
+ }
+
+ @Override
+  public Boolean visitCastExpression(CastExpression e, LogicalExpression valueArg) throws RuntimeException {
+    if (e.getInput() instanceof CastExpression || e.getInput() instanceof SchemaPath) {
+ return e.getInput().accept(this, valueArg);
+ }
+ return false;
+ }
+
+ @Override
+  public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
+ return false;
+ }
+
+ @Override
+  public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
+ if (valueArg instanceof QuotedString) {
+ this.value = ((QuotedString) valueArg).getString();
+ this.path = path;
+ this.valueType = PredicateLeaf.Type.STRING;
+ return true;
+ }
+
+ if (valueArg instanceof IntExpression) {
+ int expValue = ((IntExpression) valueArg).getInt();
+ this.value = ((Integer) expValue).longValue();
+ this.path = path;
+ this.valueType = PredicateLeaf.Type.LONG;
+ return true;
+ }
+
+ if (valueArg instanceof LongExpression) {
+ this.value = ((LongExpression) valueArg).getLong();
+ this.path = path;
+ this.valueType = PredicateLeaf.Type.LONG;
+ return true;
+ }
+
+ if (valueArg instanceof FloatExpression) {
+ this.value = ((FloatExpression) valueArg).getFloat();
+ this.path = path;
+ this.valueType = PredicateLeaf.Type.FLOAT;
+ return true;
+ }
+
+ if (valueArg instanceof DoubleExpression) {
+ this.value = ((DoubleExpression) valueArg).getDouble();
+ this.path = path;
+ this.valueType = PredicateLeaf.Type.FLOAT;
+ return true;
+ }
+
+ if (valueArg instanceof BooleanExpression) {
+ this.value = ((BooleanExpression) valueArg).getBoolean();
+ this.path = path;
+ this.valueType = PredicateLeaf.Type.BOOLEAN;
+ return true;
+ }
+
+ if (valueArg instanceof DateExpression) {
+ this.value = ((DateExpression) valueArg).getDate();
+ this.path = path;
+ this.valueType = PredicateLeaf.Type.LONG;
+ return true;
+ }
+
+ if (valueArg instanceof TimeStampExpression) {
+ long timeStamp = ((TimeStampExpression) valueArg).getTimeStamp();
+ this.value = new Timestamp(timeStamp);
+ this.path = path;
+ this.valueType = PredicateLeaf.Type.TIMESTAMP;
+ return true;
+ }
+
+ if (valueArg instanceof ValueExpressions.VarDecimalExpression) {
+      double v = ((ValueExpressions.VarDecimalExpression) valueArg).getBigDecimal().doubleValue();
+ this.value = new HiveDecimalWritable(String.valueOf(v));
+ this.path = path;
+ this.valueType = PredicateLeaf.Type.DECIMAL;
+ return true;
+ }
+ return false;
+ }
+}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilter.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilter.java
new file mode 100644
index 0000000000..46d973696b
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.filter;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl;
+import org.apache.hive.com.esotericsoftware.kryo.Kryo;
+import org.apache.hive.com.esotericsoftware.kryo.io.Input;
+import org.apache.hive.com.esotericsoftware.kryo.io.Output;
+/**
+ * Wrapper around a <a href="http://en.wikipedia.org/wiki/Sargable">
+ * SearchArgument</a>, the subset of predicates
+ * that can be pushed down to the RecordReader. Each SearchArgument consists
+ * of a series of SearchClauses that must each be true for the row to be
+ * accepted by the filter.
+ *
+ * This requires that the filter be normalized into conjunctive normal form
+ * (<a href="http://en.wikipedia.org/wiki/Conjunctive_normal_form">CNF</a>).
+ */
+public class HiveFilter {
+
+ private final SearchArgument searchArgument;
+ private final static ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() {
+ protected Kryo initialValue() { return new Kryo(); }
+ };
+
+ public HiveFilter(SearchArgument searchArgument) {
+ this.searchArgument = searchArgument;
+ }
+
+ private static String toKryo(SearchArgument sarg) {
+ Output out = new Output(4 * 1024, 10 * 1024 * 1024);
+ new Kryo().writeObject(out, sarg);
+ out.close();
+ return Base64.encodeBase64String(out.toBytes());
+ }
+
+ @VisibleForTesting
+ public static SearchArgument create(String kryo) {
+ return create(Base64.decodeBase64(kryo));
+ }
+
+ private static SearchArgument create(byte[] kryoBytes) {
+    return kryo.get().readObject(new Input(kryoBytes), SearchArgumentImpl.class);
+ }
+
+ public SearchArgument getSearchArgument() {
+ return searchArgument;
+ }
+
+ public String getSearchArgumentString() {
+ return toKryo(searchArgument);
+ }
+}
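
A quick round trip illustrates how this class is used by the rest of the patch: HivePushFilterIntoScan serializes a SearchArgument to the Kryo/Base64 string stored under SARG_PUSHDOWN, and HiveScan.toString() deserializes it again for plan output. A minimal sketch under those assumptions (the column name "key" and the literal are illustrative only):

    import org.apache.drill.exec.store.hive.readers.filter.HiveFilter;
    import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
    import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;

    public class SearchArgumentRoundTrip {
      public static void main(String[] args) {
        // Build a SearchArgument equivalent to "key = 1".
        SearchArgument sarg = SearchArgumentFactory.newBuilder()
            .startAnd()
            .equals("key", PredicateLeaf.Type.LONG, 1L)
            .end()
            .build();

        // Serialize it the way doPushFilterToScan stores it in the scan's conf properties ...
        String serialized = new HiveFilter(sarg).getSearchArgumentString();

        // ... and restore it the way HiveScan.toString() does for EXPLAIN output.
        SearchArgument restored = HiveFilter.create(serialized);
        System.out.println(restored);   // e.g. "leaf-0 = (EQUALS key 1), expr = leaf-0"
      }
    }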
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilterBuilder.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilterBuilder.java
new file mode 100644
index 0000000000..92c23b67cc
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HiveFilterBuilder.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.filter;
+
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+
+public class HiveFilterBuilder extends AbstractExprVisitor<SearchArgument.Builder, Void, RuntimeException> {
+
+  private static final Logger logger = LoggerFactory.getLogger(HiveFilterBuilder.class);
+
+ private final LogicalExpression le;
+ private final SearchArgument.Builder builder;
+ private final HashMap<String, SqlTypeName> dataTypeMap;
+
+ private boolean allExpressionsConverted = false;
+
+  HiveFilterBuilder(final LogicalExpression le, final HashMap<String, SqlTypeName> dataTypeMap) {
+ this.le = le;
+ this.dataTypeMap = dataTypeMap;
+ this.builder = SearchArgumentFactory.newBuilder();
+ }
+
+ public HiveFilter parseTree() {
+ SearchArgument.Builder accept = le.accept(this, null);
+ SearchArgument searchArgument = builder.build();
+ if (accept != null) {
+ searchArgument = accept.build();
+ }
+ return new HiveFilter(searchArgument);
+ }
+
+ public boolean isAllExpressionsConverted() {
+ return allExpressionsConverted;
+ }
+
+ @Override
+  public SearchArgument.Builder visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
+ allExpressionsConverted = false;
+ return null;
+ }
+
+ @Override
+  public SearchArgument.Builder visitSchemaPath(SchemaPath path, Void value) throws RuntimeException {
+ if (path instanceof FieldReference) {
+ String fieldName = path.getAsNamePart().getName();
+ SqlTypeName sqlTypeName = dataTypeMap.get(fieldName);
+ switch (sqlTypeName) {
+ case BOOLEAN:
+ PredicateLeaf.Type valueType = convertLeafType(sqlTypeName);
+ builder.startNot().equals(fieldName, valueType, false).end();
+ break;
+ default:
+          // otherwise, we don't know what to do so make it a maybe or do nothing
+ builder.literal(SearchArgument.TruthValue.YES_NO_NULL);
+ }
+ }
+ return builder;
+ }
+
+ @Override
+  public SearchArgument.Builder visitBooleanOperator(BooleanOperator op, Void value) throws RuntimeException {
+ return visitFunctionCall(op, value);
+ }
+
+ @Override
+  public SearchArgument.Builder visitFunctionCall(FunctionCall call, Void value) throws RuntimeException {
+ String functionName = call.getName();
+ List<LogicalExpression> args = call.args();
+    if (HiveCompareFunctionsProcessor.isCompareFunction(functionName) || HiveCompareFunctionsProcessor.isIsFunction(functionName) || HiveCompareFunctionsProcessor.isNot(call, functionName)) {
+      HiveCompareFunctionsProcessor processor = HiveCompareFunctionsProcessor.createFunctionsProcessorInstance(call);
+ if (processor.isSuccess()) {
+ return buildSearchArgument(processor);
+ }
+ } else {
+ switch (functionName) {
+ case FunctionNames.AND:
+ builder.startAnd();
+ break;
+ case FunctionNames.OR:
+ builder.startOr();
+ break;
+ default:
+ logger.warn("Unsupported logical operator:{} for push down",
functionName);
+ return builder;
+ }
+ for (LogicalExpression arg : args) {
+ arg.accept(this, null);
+ }
+ builder.end();
+ }
+ return builder;
+ }
+
+  private SearchArgument.Builder buildSearchArgument(final HiveCompareFunctionsProcessor processor) {
+ String functionName = processor.getFunctionName();
+ SchemaPath field = processor.getPath();
+ Object fieldValue = processor.getValue();
+ PredicateLeaf.Type valueType = processor.getValueType();
+ SqlTypeName sqlTypeName = dataTypeMap.get(field.getAsNamePart().getName());
+ if (fieldValue == null) {
+ valueType = convertLeafType(sqlTypeName);
+ }
+ if (valueType == null) {
+ return builder;
+ }
+ switch (functionName) {
+ case FunctionNames.EQ:
+        builder.startAnd().equals(field.getAsNamePart().getName(), valueType, fieldValue).end();
+ break;
+ case FunctionNames.NE:
+        builder.startNot().equals(field.getAsNamePart().getName(), valueType, fieldValue).end();
+ break;
+ case FunctionNames.GE:
+        builder.startNot().lessThan(field.getAsNamePart().getName(), valueType, fieldValue).end();
+ break;
+ case FunctionNames.GT:
+        builder.startNot().lessThanEquals(field.getAsNamePart().getName(), valueType, fieldValue).end();
+ break;
+ case FunctionNames.LE:
+        builder.startAnd().lessThanEquals(field.getAsNamePart().getName(), valueType, fieldValue).end();
+ break;
+ case FunctionNames.LT:
+        builder.startAnd().lessThan(field.getAsNamePart().getName(), valueType, fieldValue).end();
+ break;
+ case FunctionNames.IS_NULL:
+ case "isNull":
+ case "is null":
+        builder.startAnd().isNull(field.getAsNamePart().getName(), valueType).end();
+ break;
+ case FunctionNames.IS_NOT_NULL:
+ case "isNotNull":
+ case "is not null":
+        builder.startNot().isNull(field.getAsNamePart().getName(), valueType).end();
+ break;
+ case FunctionNames.IS_NOT_TRUE:
+ case FunctionNames.IS_FALSE:
+ case FunctionNames.NOT:
+        builder.startNot().equals(field.getAsNamePart().getName(), valueType, true).end();
+ break;
+ case FunctionNames.IS_TRUE:
+ case FunctionNames.IS_NOT_FALSE:
+        builder.startNot().equals(field.getAsNamePart().getName(), valueType, false).end();
+ break;
+ }
+ return builder;
+ }
+
+ private PredicateLeaf.Type convertLeafType(SqlTypeName sqlTypeName) {
+ PredicateLeaf.Type type = null;
+ switch (sqlTypeName) {
+ case BOOLEAN:
+ type = PredicateLeaf.Type.BOOLEAN;
+ break;
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ type = PredicateLeaf.Type.LONG;
+ break;
+ case DOUBLE:
+ case FLOAT:
+ type = PredicateLeaf.Type.FLOAT;
+ break;
+ case DECIMAL:
+ type = PredicateLeaf.Type.DECIMAL;
+ break;
+ case DATE:
+ type = PredicateLeaf.Type.DATE;
+ break;
+ case TIMESTAMP:
+ type = PredicateLeaf.Type.TIMESTAMP;
+ break;
+ case CHAR:
+ case VARCHAR:
+ type = PredicateLeaf.Type.STRING;
+ break;
+ default:
+ logger.warn("Not support push down type:" + sqlTypeName.getName());
+ }
+ return type;
+ }
+}
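
Note how buildSearchArgument maps Drill's comparison functions onto the leaf operations the SearchArgument builder actually offers (equals, lessThan, lessThanEquals, isNull): >=, > and <> are expressed as negations of lessThan, lessThanEquals and equals, which is why the ORC plans in the tests print expressions like "(not leaf-0)". A sketch of the equivalent builder calls for "key >= 10", with an illustrative column name and literal:

    // key >= 10 is rewritten as NOT (key < 10), mirroring the FunctionNames.GE case above.
    SearchArgument geSarg = SearchArgumentFactory.newBuilder()
        .startNot()
        .lessThan("key", PredicateLeaf.Type.LONG, 10L)
        .end()
        .build();
    // geSarg.toString() reads roughly: "leaf-0 = (LESS_THAN key 10), expr = (not leaf-0)"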
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HivePushFilterIntoScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HivePushFilterIntoScan.java
new file mode 100644
index 0000000000..f372521ec7
--- /dev/null
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/readers/filter/HivePushFilterIntoScan.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.readers.filter;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.hive.HiveScan;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg.SARG_PUSHDOWN;
+
+public abstract class HivePushFilterIntoScan extends StoragePluginOptimizerRule {
+
+  private HivePushFilterIntoScan(RelOptRuleOperand operand, String description) {
+ super(operand, description);
+ }
+
+ public static final StoragePluginOptimizerRule FILTER_ON_SCAN =
+ new HivePushFilterIntoScan(RelOptHelper.some(FilterPrel.class,
+          RelOptHelper.any(ScanPrel.class)), "HivePushFilterIntoScan:Filter_On_Scan") {
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final ScanPrel scan = call.rel(1);
+ final FilterPrel filter = call.rel(0);
+ final RexNode condition = filter.getCondition();
+
+ HiveScan groupScan = (HiveScan) scan.getGroupScan();
+ if (groupScan.isFilterPushedDown()) {
+ /*
+         * The rule can get triggered again due to the transformed "scan => filter" sequence
+         * created by the earlier execution of this rule when we could not do a complete
+         * conversion of Optiq Filter's condition to Hive Filter. In such cases, we rely upon
+         * this flag to not do a re-processing of the rule on the already transformed call.
+ */
+ return;
+ }
+
+ doPushFilterToScan(call, filter, null, scan, groupScan, condition);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final ScanPrel scan = call.rel(1);
+ if (scan.getGroupScan() instanceof HiveScan) {
+ return super.matches(call);
+ }
+ return false;
+ }
+ };
+
+ public static final StoragePluginOptimizerRule FILTER_ON_PROJECT =
+ new HivePushFilterIntoScan(RelOptHelper.some(FilterPrel.class,
+          RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))),
+ "HivePushFilterIntoScan:Filter_On_Project") {
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final ScanPrel scan = call.rel(2);
+ final ProjectPrel project = call.rel(1);
+ final FilterPrel filter = call.rel(0);
+
+ HiveScan groupScan = (HiveScan) scan.getGroupScan();
+ if (groupScan.isFilterPushedDown()) {
+ /*
+         * The rule can get triggered again due to the transformed "scan => filter" sequence
+         * created by the earlier execution of this rule when we could not do a complete
+         * conversion of Optiq Filter's condition to Hive Filter. In such cases, we rely upon
+         * this flag to not do a re-processing of the rule on the already transformed call.
+ */
+ return;
+ }
+
+ // convert the filter to one that references the child of the project
+        final RexNode condition = RelOptUtil.pushPastProject(filter.getCondition(), project);
+
+ doPushFilterToScan(call, filter, project, scan, groupScan, condition);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ final ScanPrel scan = call.rel(2);
+ if (scan.getGroupScan() instanceof HiveScan) {
+ return super.matches(call);
+ }
+ return false;
+ }
+ };
+
+
+  protected void doPushFilterToScan(final RelOptRuleCall call, final FilterPrel filter,
+ final ProjectPrel project, final ScanPrel scan, final HiveScan groupScan,
+ final RexNode condition) {
+
+ HashMap<String, SqlTypeName> dataTypeMap = new HashMap<>();
+ List<RelDataTypeField> fieldList = scan.getRowType().getFieldList();
+ for (RelDataTypeField relDataTypeField : fieldList) {
+ String name = relDataTypeField.getName();
+ SqlTypeName sqlTypeName = relDataTypeField.getValue().getSqlTypeName();
+ dataTypeMap.put(name, sqlTypeName);
+ }
+
+    final LogicalExpression conditionExp = DrillOptiq.toDrill(
+        new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
+    final HiveFilterBuilder orcFilterBuilder = new HiveFilterBuilder(conditionExp, dataTypeMap);
+ final HiveFilter newScanSpec = orcFilterBuilder.parseTree();
+
+ if (newScanSpec == null) {
+ return; //no filter pushdown ==> No transformation.
+ }
+
+ String searchArgument = newScanSpec.getSearchArgumentString();
+ Map<String, String> confProperties = groupScan.getConfProperties();
+ confProperties.put(SARG_PUSHDOWN, searchArgument);
+ HiveScan newGroupsScan = null;
+ try {
+      newGroupsScan = new HiveScan(groupScan.getUserName(), groupScan.getHiveReadEntry(),
+          groupScan.getStoragePlugin(), groupScan.getColumns(), null, confProperties, groupScan.getMaxRecords());
+ } catch (ExecutionSetupException e) {
+      // HiveScan's constructor declares ExecutionSetupException; wrap it in an unchecked exception here.
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ newGroupsScan.setFilterPushedDown(true);
+
+    final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(), filter.getTraitSet(),
+ newGroupsScan, scan.getRowType(), scan.getTable());
+    // Depending on whether there is a project in the middle, assign either the scan or a copy of
+    // the project to childRel.
+    final RelNode childRel = project == null ? newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of(newScanPrel));
+    /*
+     * Keep the original filter on top of the new scan, since we may not have converted the
+     * entire filter condition expression into a Hive ORC filter.
+     */
+    call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel)));
+ }
+}
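
The rule itself only records the predicate: doPushFilterToScan serializes the SearchArgument into the scan's confProperties under SARG_PUSHDOWN and keeps the original Filter on top, so results stay correct even when the pushed predicate covers only part of the condition. The reader side is not part of this diff; a sketch of how such a property is typically consumed, assuming Hive's ConvertAstToSearchArg.createFromConf (which looks up the same "sarg.pushdown" key):

    // Hypothetical reader-side fragment, not from this patch.
    SearchArgument sarg = SearchArgumentFactory.newBuilder()
        .startAnd().equals("key", PredicateLeaf.Type.LONG, 1L).end().build();
    String serialized = new HiveFilter(sarg).getSearchArgumentString(); // what doPushFilterToScan stores
    Configuration conf = new Configuration();
    conf.set(ConvertAstToSearchArg.SARG_PUSHDOWN, serialized);
    SearchArgument pushedDown = ConvertAstToSearchArg.createFromConf(conf); // null if nothing was pushed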
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveFilterPushDown.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveFilterPushDown.java
new file mode 100644
index 0000000000..b821ae5a3b
--- /dev/null
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveFilterPushDown.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec;
+
+import org.apache.drill.categories.HiveStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.hive.HiveTestBase;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({SlowTest.class, HiveStorageTest.class})
+public class TestHiveFilterPushDown extends HiveTestBase {
+
+ @BeforeClass
+ public static void init() {
+    // disable the native Parquet reader so predicate push down is tested through the Hive reader
+    setSessionOption(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER, false);
+ setSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
+ }
+
+ @AfterClass
+ public static void cleanup() {
+    resetSessionOption(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER);
+ resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
+ }
+
+ @Test
+ public void testPushDownWithEqualForOrc() throws Exception {
+ String query = "select * from hive.orc_push_down where key = 1";
+
+ int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 4, actualRowCount);
+
+ testPlanMatchingPatterns(query,
+ new String[]{"Filter\\(condition=\\[=\\($0, 1\\)\\]\\)",
+ "SearchArgument=leaf-0 = \\(EQUALS key 1\\), expr =
leaf-0"},
+ new String[]{});
+ }
+
+ @Test
+ public void testPushDownWithNotEqualForOrc() throws Exception {
+ String query = "select * from hive.orc_push_down where key <> 1";
+
+ int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 2, actualRowCount);
+
+ testPlanMatchingPatterns(query,
+ new String[]{"Filter\\(condition=\\[=\\<\\>\\($0, 1\\)\\]\\)",
+ "SearchArgument=leaf-0 = \\(EQUALS key 1\\), expr =
\\(not leaf-0\\)"},
+ new String[]{});
+ }
+
+ @Test
+ public void testPushDownWithGreaterThanForOrc() throws Exception {
+ String query = "select * from hive.orc_push_down where key > 1";
+
+ int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 2, actualRowCount);
+
+ testPlanMatchingPatterns(query,
+ new String[]{"Filter\\(condition=\\[=\\>\\($0, 1\\)\\]\\)",
+ "SearchArgument=leaf-0 = \\(LESS_THAN_EQUALS key 1\\),
expr = \\(not leaf-0\\)"},
+ new String[]{});
+ }
+
+ @Test
+ public void testPushDownWithLessThanForOrc() throws Exception {
+ String query = "select * from hive.orc_push_down where key < 2";
+
+ int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 4, actualRowCount);
+
+ testPlanMatchingPatterns(query,
+ new String[]{"Filter\\(condition=\\[=\\<\\($0, 2\\)\\]\\)",
+ "SearchArgument=leaf-0 = \\(LESS_THAN key 2\\), expr =
leaf-0"},
+ new String[]{});
+ }
+
+ @Test
+ public void testPushDownWithAndForOrc() throws Exception {
+    String query = "select * from hive.orc_push_down where key = 2 and var_key = 'var_6'";
+
+ int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 1, actualRowCount);
+
+ testPlanMatchingPatterns(query,
+ new String[]{"Filter\\(condition=\\[AND\\(=\\($0, 2\\),
=\\($2, 'var_6'\\)\\)\\]\\)",
+ "SearchArgument=leaf-0 = \\(EQUALS key 2\\), leaf-1 =
\\(EQUALS var_key var_6\\), expr = \\(and leaf-0 leaf-1\\)"},
+ new String[]{});
+ }
+
+ @Test
+ public void testPushDownWithOrForOrc() throws Exception {
+    String query = "select * from hive.orc_push_down where key = 2 or var_key = 'var_1'";
+
+ int actualRowCount = testSql(query);
+    assertEquals("Expected and actual row count should match", 3, actualRowCount);
+
+ testPlanMatchingPatterns(query,
+ new String[]{"Filter\\(condition=\\[OR\\(=\\($0, 2\\), =\\($2,
'var_1'\\)\\)\\]\\)",
+ "SearchArgument=leaf-0 = \\(EQUALS key 2\\), leaf-1 =
\\(EQUALS var_key var_1\\), expr = \\(or leaf-0 leaf-1\\)"},
+ new String[]{});
+ }
+}
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index 106fb22963..0a8b7bc3d9 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -454,8 +454,21 @@ public class HiveTestDataGenerator {
"INNER JOIN kv ON kv.key = dk.key");
createTableWithEmptyParquet(hiveDriver);
+ createTestDataForDrillORCPushDownReaderTests(hiveDriver);
}
+  private void createTestDataForDrillORCPushDownReaderTests(Driver hiveDriver) {
+    // Hive managed table that has data qualified for Drill orc filter push down
+ executeQuery(hiveDriver, "create table orc_push_down(key int, int_key int,
var_key varchar(10), dec_key decimal(5, 2), boolean_key boolean) stored as
orc");
+ // each insert is created in separate file
+ executeQuery(hiveDriver, "insert into table orc_push_down values (1, 1,
'var_1', 1.11,true), (1, 2, 'var_2', 2.22,false)");
+ executeQuery(hiveDriver, "insert into table orc_push_down values (1, 3,
'var_3', 3.33,true), (1, 4, 'var_4', 4.44,true)");
+ executeQuery(hiveDriver, "insert into table orc_push_down values (2, 5,
'var_5', 5.55,false), (2, 6, 'var_6', 6.66,true)");
+ executeQuery(hiveDriver, "insert into table orc_push_down values (null, 7,
'var_7', 7.77,false), (null, 8, 'var_8', 8.88,false)");
+ }
+
+
+
  private void createTestDataForDrillNativeParquetReaderTests(Driver hiveDriver) {
    // Hive managed table that has data qualified for Drill native filter push down
executeQuery(hiveDriver, "create table kv_native(key int, int_key int,
var_key varchar(10), dec_key decimal(5, 2)) stored as parquet");
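
For reference, the four inserts into orc_push_down seed eight rows with key values 1 (four rows), 2 (two rows) and NULL (two rows), which is exactly what TestHiveFilterPushDown asserts: key = 1 matches 4 rows; key <> 1 and key > 1 each match 2 (NULL keys never satisfy a comparison); key < 2 matches 4; key = 2 and var_key = 'var_6' matches 1; key = 2 or var_key = 'var_1' matches 3.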