This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 306f2bb03e6 [HUDI-6077] Add more partition push down filters (#8452) 306f2bb03e6 is described below commit 306f2bb03e6a42e2bca5f4e0db0379a87ad58a66 Author: Rex(Hui) An <bonean...@gmail.com> AuthorDate: Mon Jul 24 18:43:10 2023 +0800 [HUDI-6077] Add more partition push down filters (#8452) --- .../org/apache/spark/sql/hudi/SparkAdapter.scala | 7 +- .../functional/TestHoodieBackedMetadata.java | 2 +- .../io/storage/row/TestHoodieRowCreateHandle.java | 11 +- .../org/apache/hudi/BaseHoodieTableFileIndex.java | 22 ++ .../java/org/apache/hudi/expression/ArrayData.java | 21 +- .../apache/hudi/expression/BinaryExpression.java | 37 +- .../org/apache/hudi/expression/BindVisitor.java | 187 ++++++++++ .../org/apache/hudi/expression/BoundReference.java | 32 +- .../org/apache/hudi/expression/Comparators.java | 52 +++ .../org/apache/hudi}/expression/Expression.java | 32 +- .../apache/hudi}/expression/ExpressionVisitor.java | 16 +- .../apache/hudi}/expression/LeafExpression.java | 11 +- .../java/org/apache/hudi/expression/Literal.java | 121 ++++++ .../org/apache/hudi/expression/NameReference.java | 20 +- .../apache/hudi/expression/PartialBindVisitor.java | 153 ++++++++ .../java/org/apache/hudi/expression/Predicate.java | 29 +- .../org/apache/hudi/expression/Predicates.java | 413 +++++++++++++++++++++ .../org/apache/hudi/expression/StructLike.java | 13 +- .../java/org/apache/hudi/internal/schema/Type.java | 80 +++- .../org/apache/hudi/internal/schema/Types.java | 37 +- .../hudi/internal/schema/utils/Conversions.java | 49 +++ .../hudi/metadata/AbstractHoodieTableMetadata.java | 96 +++++ .../apache/hudi/metadata/BaseTableMetadata.java | 24 +- .../metadata/FileSystemBackedTableMetadata.java | 111 +++++- .../hudi/metadata/HoodieBackedTableMetadata.java | 23 ++ .../apache/hudi/metadata/HoodieTableMetadata.java | 10 + .../hudi/expression/TestPartialBindVisitor.java | 83 +++++ 
.../schema/utils/TestAvroSchemaEvolutionUtils.java | 2 +- .../TestFileSystemBackedTableMetadata.java | 12 +- .../scala/org/apache/hudi/SparkFilterHelper.scala | 122 ++++++ .../apache/hudi/SparkHoodieTableFileIndex.scala | 92 +++-- .../org/apache/hudi/TestHoodieFileIndex.scala | 72 ++++ .../org/apache/hudi/TestSparkFilterHelper.scala | 185 +++++++++ .../sql/hudi/TestHoodieInternalRowUtils.scala | 2 +- .../sql/hudi/TestLazyPartitionPathFetching.scala | 30 ++ .../TestPartitionPushDownWhenListingPaths.scala | 109 ++++++ .../apache/spark/sql/adapter/Spark2Adapter.scala | 15 +- .../spark/sql/adapter/BaseSpark3Adapter.scala | 7 +- .../apache/hudi/hive/util/FilterGenVisitor.java | 84 +++-- .../hudi/hive/util/PartitionFilterGenerator.java | 64 ++-- .../hudi/utilities/HoodieDataTableValidator.java | 2 +- .../apache/hudi/utilities/HoodieRepairTool.java | 2 +- 42 files changed, 2260 insertions(+), 232 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index cc72cf23a6b..782a49ac189 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.parser.HoodieExtendedParserInterface -import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.{DataType, Metadata, StructType} import org.apache.spark.storage.StorageLevel @@ -203,4 +203,9 @@ trait SparkAdapter extends Serializable { * Converts instance of [[StorageLevel]] to a corresponding string */ def 
convertStorageLevelToString(level: StorageLevel): String + + /** + * Tries to translate a Catalyst Expression into data source Filter + */ + def translateFilter(predicate: Expression, supportNestedPredicatePushdown: Boolean = false): Option[Filter] } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index 56fabb362b6..5288947bb19 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -3204,7 +3204,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); // Partitions should match - FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(engineContext, + FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(engineContext, metaClient.getTableConfig(), new SerializableConfiguration(hadoopConf), config.getBasePath(), config.shouldAssumeDatePartitioning()); List<String> fsPartitions = fsBackedTableMetadata.getAllPartitionPaths(); List<String> metadataPartitions = tableMetadata.getAllPartitionPaths(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java index 6dc90af78ae..42e84ebe7fd 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowCreateHandle.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.model.HoodieWriteStat; 
import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.exception.HoodieInsertException; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.table.HoodieSparkTable; import org.apache.hudi.table.HoodieTable; @@ -190,16 +189,8 @@ public class TestHoodieRowCreateHandle extends HoodieClientTestHarness { HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient); new HoodieRowCreateHandle(table, cfg, " def", UUID.randomUUID().toString(), "001", RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(), SparkDatasetTestUtils.STRUCT_TYPE); fail("Should have thrown exception"); - } catch (HoodieInsertException ioe) { - // expected without metadata table - if (enableMetadataTable) { - fail("Should have thrown TableNotFoundException"); - } } catch (TableNotFoundException e) { - // expected with metadata table - if (!enableMetadataTable) { - fail("Should have thrown HoodieInsertException"); - } + // expected throw failure } } diff --git a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java index c5adafa38e2..3a24ef4dd2f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java +++ b/hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java @@ -36,7 +36,9 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.expression.Expression; import org.apache.hudi.hadoop.CachingPath; +import org.apache.hudi.internal.schema.Types; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataUtil; @@ -270,6 +272,26 @@ public abstract class BaseHoodieTableFileIndex implements 
AutoCloseable { )); } + protected List<PartitionPath> listPartitionPaths(List<String> relativePartitionPaths, + Types.RecordType partitionFields, + Expression partitionColumnPredicates) { + List<String> matchedPartitionPaths; + try { + matchedPartitionPaths = tableMetadata.getPartitionPathWithPathPrefixUsingFilterExpression(relativePartitionPaths, + partitionFields, partitionColumnPredicates); + } catch (IOException e) { + throw new HoodieIOException("Error fetching partition paths", e); + } + + // Convert partition's path into partition descriptor + return matchedPartitionPaths.stream() + .map(partitionPath -> { + Object[] partitionColumnValues = parsePartitionColumnValues(partitionColumns, partitionPath); + return new PartitionPath(partitionPath, partitionColumnValues); + }) + .collect(Collectors.toList()); + } + protected List<PartitionPath> listPartitionPaths(List<String> relativePartitionPaths) { List<String> matchedPartitionPaths; try { diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java b/hudi-common/src/main/java/org/apache/hudi/expression/ArrayData.java similarity index 69% copy from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java copy to hudi-common/src/main/java/org/apache/hudi/expression/ArrayData.java index 3be19330a19..3f7830e996c 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java +++ b/hudi-common/src/main/java/org/apache/hudi/expression/ArrayData.java @@ -16,22 +16,25 @@ * limitations under the License. 
*/ -package org.apache.hudi.hive.expression; +package org.apache.hudi.expression; -public class AttributeReferenceExpression extends LeafExpression { +import java.util.List; - private final String name; +public class ArrayData implements StructLike { - public AttributeReferenceExpression(String name) { - this.name = name; + private final List<Object> data; + + public ArrayData(List<Object> data) { + this.data = data; } - public String getName() { - return name; + @Override + public int numFields() { + return data.size(); } @Override - public <T> T accept(ExpressionVisitor<T> exprVisitor) { - return exprVisitor.visitAttribute(this); + public <T> T get(int pos, Class<T> classTag) { + return classTag.cast(data.get(pos)); } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/BinaryOperator.java b/hudi-common/src/main/java/org/apache/hudi/expression/BinaryExpression.java similarity index 56% rename from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/BinaryOperator.java rename to hudi-common/src/main/java/org/apache/hudi/expression/BinaryExpression.java index d2bc83e9452..a0afdc18928 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/BinaryOperator.java +++ b/hudi-common/src/main/java/org/apache/hudi/expression/BinaryExpression.java @@ -16,46 +16,26 @@ * limitations under the License. */ -package org.apache.hudi.hive.expression; +package org.apache.hudi.expression; import java.util.Arrays; +import java.util.List; /** * The expression that accept two child expressions. 
*/ -public class BinaryOperator extends Expression { +public abstract class BinaryExpression implements Expression { private final Operator operator; private final Expression left; private final Expression right; - private BinaryOperator(Expression left, Operator operator, Expression right) { - super(Arrays.asList(left, right)); + BinaryExpression(Expression left, Operator operator, Expression right) { this.left = left; this.operator = operator; this.right = right; } - public static BinaryOperator and(Expression left, Expression right) { - return new BinaryOperator(left, Operator.AND, right); - } - - public static BinaryOperator or(Expression left, Expression right) { - return new BinaryOperator(left, Operator.OR, right); - } - - public static BinaryOperator eq(Expression left, Expression right) { - return new BinaryOperator(left, Operator.EQ, right); - } - - public static BinaryOperator gteq(Expression left, Expression right) { - return new BinaryOperator(left, Operator.GT_EQ, right); - } - - public static BinaryOperator lteq(Expression left, Expression right) { - return new BinaryOperator(left, Operator.LT_EQ, right); - } - public Operator getOperator() { return operator; } @@ -69,7 +49,12 @@ public class BinaryOperator extends Expression { } @Override - public <T> T accept(ExpressionVisitor<T> exprVisitor) { - return exprVisitor.visitBinaryOperator(this); + public List<Expression> getChildren() { + return Arrays.asList(left, right); + } + + @Override + public String toString() { + return left.toString() + " " + operator.symbol + " " + right.toString(); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/expression/BindVisitor.java b/hudi-common/src/main/java/org/apache/hudi/expression/BindVisitor.java new file mode 100644 index 00000000000..2b7e589af21 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/expression/BindVisitor.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.expression; + +import org.apache.hudi.internal.schema.Types; + +import java.util.List; +import java.util.stream.Collectors; + +public class BindVisitor implements ExpressionVisitor<Expression> { + + protected final Types.RecordType recordType; + protected final boolean caseSensitive; + + public BindVisitor(Types.RecordType recordType, boolean caseSensitive) { + this.recordType = recordType; + this.caseSensitive = caseSensitive; + } + + @Override + public Expression alwaysTrue() { + return Predicates.TrueExpression.get(); + } + + @Override + public Expression alwaysFalse() { + return Predicates.FalseExpression.get(); + } + + @Override + public Expression visitAnd(Predicates.And and) { + if (and.getLeft() instanceof Predicates.FalseExpression + || and.getRight() instanceof Predicates.FalseExpression) { + return alwaysFalse(); + } + + Expression left = and.getLeft().accept(this); + Expression right = and.getRight().accept(this); + if (left instanceof Predicates.FalseExpression + || right instanceof Predicates.FalseExpression) { + return alwaysFalse(); + } + + if (left instanceof Predicates.TrueExpression + && right instanceof Predicates.TrueExpression) { + return alwaysTrue(); + } + + if (left instanceof Predicates.TrueExpression) { + 
return right; + } + + if (right instanceof Predicates.TrueExpression) { + return left; + } + + return Predicates.and(left, right); + } + + @Override + public Expression visitOr(Predicates.Or or) { + if (or.getLeft() instanceof Predicates.TrueExpression + || or.getRight() instanceof Predicates.TrueExpression) { + return alwaysTrue(); + } + + Expression left = or.getLeft().accept(this); + Expression right = or.getRight().accept(this); + if (left instanceof Predicates.TrueExpression + || right instanceof Predicates.TrueExpression) { + return alwaysTrue(); + } + + if (left instanceof Predicates.FalseExpression + && right instanceof Predicates.FalseExpression) { + return alwaysFalse(); + } + + if (left instanceof Predicates.FalseExpression) { + return right; + } + + if (right instanceof Predicates.FalseExpression) { + return left; + } + + return Predicates.or(left, right); + } + + @Override + public Expression visitLiteral(Literal literal) { + return literal; + } + + @Override + public Expression visitNameReference(NameReference attribute) { + Types.Field field = caseSensitive + ? 
recordType.fieldByName(attribute.getName()) + : recordType.fieldByNameCaseInsensitive(attribute.getName()); + + if (field == null) { + throw new IllegalArgumentException("The attribute " + attribute + + " cannot be bound from schema " + recordType); + } + + return new BoundReference(field.fieldId(), field.type()); + } + + @Override + public Expression visitBoundReference(BoundReference boundReference) { + return boundReference; + } + + @Override + public Expression visitPredicate(Predicate predicate) { + if (predicate instanceof Predicates.Not) { + Expression expr = ((Predicates.Not) predicate).child.accept(this); + if (expr instanceof Predicates.TrueExpression) { + return alwaysFalse(); + } + if (expr instanceof Predicates.FalseExpression) { + return alwaysTrue(); + } + + return Predicates.not(expr); + } + + if (predicate instanceof Predicates.BinaryComparison) { + Predicates.BinaryComparison binaryExp = (Predicates.BinaryComparison) predicate; + Expression left = binaryExp.getLeft().accept(this); + Expression right = binaryExp.getRight().accept(this); + return new Predicates.BinaryComparison(left, binaryExp.getOperator(), right); + } + + if (predicate instanceof Predicates.In) { + Predicates.In in = ((Predicates.In) predicate); + Expression valueExpression = in.value.accept(this); + List<Expression> validValues = in.validValues.stream() + .map(validValue -> validValue.accept(this)) + .collect(Collectors.toList()); + + return Predicates.in(valueExpression, validValues); + } + + if (predicate instanceof Predicates.IsNull) { + Predicates.IsNull isNull = (Predicates.IsNull) predicate; + return Predicates.isNull(isNull.child.accept(this)); + } + + if (predicate instanceof Predicates.IsNotNull) { + Predicates.IsNotNull isNotNull = (Predicates.IsNotNull) predicate; + return Predicates.isNotNull(isNotNull.child.accept(this)); + } + + if (predicate instanceof Predicates.StringStartsWith) { + Predicates.StringStartsWith contains = (Predicates.StringStartsWith) predicate; + 
Expression left = contains.getLeft().accept(this); + Expression right = contains.getRight().accept(this); + return Predicates.startsWith(left, right); + } + + if (predicate instanceof Predicates.StringContains) { + Predicates.StringContains contains = (Predicates.StringContains) predicate; + Expression left = contains.getLeft().accept(this); + Expression right = contains.getRight().accept(this); + return Predicates.contains(left, right); + } + + throw new IllegalArgumentException("The expression " + this + "cannot be visited as predicate"); + } +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java b/hudi-common/src/main/java/org/apache/hudi/expression/BoundReference.java similarity index 60% copy from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java copy to hudi-common/src/main/java/org/apache/hudi/expression/BoundReference.java index f84756a6a85..f3a7e0fac51 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java +++ b/hudi-common/src/main/java/org/apache/hudi/expression/BoundReference.java @@ -16,28 +16,38 @@ * limitations under the License. 
*/ -package org.apache.hudi.hive.expression; +package org.apache.hudi.expression; -public class Literal extends LeafExpression { +import org.apache.hudi.internal.schema.Type; - private final String value; - private final String type; +public class BoundReference extends LeafExpression { - public Literal(String value, String type) { - this.value = value; + private final int ordinal; + private final Type type; + + public BoundReference(int ordinal, Type type) { + this.ordinal = ordinal; this.type = type; } - public String getValue() { - return value; + @Override + public Type getDataType() { + return type; } - public String getType() { - return type; + @Override + public Object eval(StructLike data) { + return data.get(ordinal, this.type.typeId().getClassTag()); } @Override public <T> T accept(ExpressionVisitor<T> exprVisitor) { - return exprVisitor.visitLiteral(this); + return exprVisitor.visitBoundReference(this); + } + + @Override + public String toString() { + return "boundReference[ordinal: " + ordinal + ", type: " + + type.typeId().getName() + "]"; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/expression/Comparators.java b/hudi-common/src/main/java/org/apache/hudi/expression/Comparators.java new file mode 100644 index 00000000000..14e208197f1 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/expression/Comparators.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.expression; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.internal.schema.Type; +import org.apache.hudi.internal.schema.Types; + +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; + +public class Comparators { + + private static final Map<Type, Comparator<?>> COMPARATORS = Collections.unmodifiableMap( + new HashMap<Type, Comparator<?>>() { + { + put(Types.BooleanType.get(), Comparator.naturalOrder()); + put(Types.IntType.get(), Comparator.naturalOrder()); + put(Types.LongType.get(), Comparator.naturalOrder()); + put(Types.FloatType.get(), Comparator.naturalOrder()); + put(Types.DoubleType.get(), Comparator.naturalOrder()); + put(Types.DateType.get(), Comparator.naturalOrder()); + put(Types.TimeType.get(), Comparator.naturalOrder()); + put(Types.TimestampType.get(), Comparator.naturalOrder()); + put(Types.StringType.get(), Comparator.naturalOrder()); + put(Types.UUIDType.get(), Comparator.naturalOrder()); + } + }); + + public static <T> Comparator<T> forType(Type.PrimitiveType type) { + return (Comparator<T>) Option.ofNullable(COMPARATORS.get(type)) + .orElseThrow(() -> new UnsupportedOperationException("The desired type " + type + " doesn't support comparator yet")); + } +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Expression.java b/hudi-common/src/main/java/org/apache/hudi/expression/Expression.java similarity index 64% rename from 
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Expression.java rename to hudi-common/src/main/java/org/apache/hudi/expression/Expression.java index 8a64237ae1b..5052a35b852 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Expression.java +++ b/hudi-common/src/main/java/org/apache/hudi/expression/Expression.java @@ -16,20 +16,31 @@ * limitations under the License. */ -package org.apache.hudi.hive.expression; +package org.apache.hudi.expression; +import org.apache.hudi.internal.schema.Type; + +import java.io.Serializable; import java.util.List; -public abstract class Expression { +public interface Expression extends Serializable { - public enum Operator { + enum Operator { + TRUE("TRUE", "TRUE"), + FALSE("FALSE", "FALSE"), AND("AND", "&&"), OR("OR", "||"), GT(">", ">"), LT("<", "<"), EQ("=", "="), GT_EQ(">=", ">="), - LT_EQ("<=", "<="); + LT_EQ("<=", "<="), + STARTS_WITH(null, null), + CONTAINS(null, null), + IS_NULL(null, null), + IS_NOT_NULL(null, null), + IN("IN", "IN"), + NOT("NOT", "NOT"); public final String sqlOperator; public final String symbol; @@ -40,14 +51,19 @@ public abstract class Expression { } } - private final List<Expression> children; + List<Expression> getChildren(); + + Type getDataType(); - public Expression(List<Expression> children) { - this.children = children; + default Object eval(StructLike data) { + throw new UnsupportedOperationException("Cannot evaluate expression " + this); } /** * Traverses the expression with the provided {@link ExpressionVisitor} */ - public abstract <T> T accept(ExpressionVisitor<T> exprVisitor); + <T> T accept(ExpressionVisitor<T> exprVisitor); + + @Override + String toString(); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/ExpressionVisitor.java b/hudi-common/src/main/java/org/apache/hudi/expression/ExpressionVisitor.java similarity index 76% rename from 
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/ExpressionVisitor.java rename to hudi-common/src/main/java/org/apache/hudi/expression/ExpressionVisitor.java index 9f6ea7b1597..10a0362c786 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/ExpressionVisitor.java +++ b/hudi-common/src/main/java/org/apache/hudi/expression/ExpressionVisitor.java @@ -16,16 +16,26 @@ * limitations under the License. */ -package org.apache.hudi.hive.expression; +package org.apache.hudi.expression; /** * Visitor used to travers the expression. */ public interface ExpressionVisitor<T> { - T visitBinaryOperator(BinaryOperator binaryOperator); + T alwaysTrue(); + + T alwaysFalse(); T visitLiteral(Literal literal); - T visitAttribute(AttributeReferenceExpression attribute); + T visitNameReference(NameReference attribute); + + T visitBoundReference(BoundReference boundReference); + + T visitAnd(Predicates.And and); + + T visitOr(Predicates.Or or); + + T visitPredicate(Predicate predicate); } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java b/hudi-common/src/main/java/org/apache/hudi/expression/LeafExpression.java similarity index 81% copy from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java copy to hudi-common/src/main/java/org/apache/hudi/expression/LeafExpression.java index 318a79116ca..c85f2095a4f 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java +++ b/hudi-common/src/main/java/org/apache/hudi/expression/LeafExpression.java @@ -16,14 +16,17 @@ * limitations under the License. */ -package org.apache.hudi.hive.expression; +package org.apache.hudi.expression; + +import java.util.List; /** * Expression that without any child expressions. 
*/ -public abstract class LeafExpression extends Expression { +public abstract class LeafExpression implements Expression { - public LeafExpression() { - super(null); + @Override + public List<Expression> getChildren() { + return null; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/expression/Literal.java b/hudi-common/src/main/java/org/apache/hudi/expression/Literal.java new file mode 100644 index 00000000000..01fbdb1a1c8 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/expression/Literal.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.expression; + +import org.apache.hudi.internal.schema.Type; +import org.apache.hudi.internal.schema.Types; + +import javax.xml.bind.DatatypeConverter; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.UUID; + +public class Literal<T> extends LeafExpression { + + public static <V> Literal from(V value) { + if (value instanceof Integer || value instanceof Short) { + return new Literal<>(value, Types.IntType.get()); + } + + if (value instanceof Byte) { + return new Literal<>(((Byte)value).intValue(), Types.IntType.get()); + } + + if (value instanceof Long) { + return new Literal<>(value, Types.LongType.get()); + } + + if (value instanceof Boolean) { + return new Literal<>(value, Types.BooleanType.get()); + } + + if (value instanceof Double) { + return new Literal<>(value, Types.DoubleType.get()); + } + + if (value instanceof Float) { + return new Literal<>(value, Types.FloatType.get()); + } + + if (value instanceof BigDecimal) { + BigDecimal decimal = (BigDecimal) value; + return new Literal<>(value, Types.DecimalType.get(decimal.precision(), decimal.scale())); + } + + if (value instanceof CharSequence) { + return new Literal<>(value, Types.StringType.get()); + } + + if (value instanceof byte[]) { + byte[] bytes = (byte[]) value; + return new Literal<>(ByteBuffer.wrap(bytes), Types.FixedType.getFixed(bytes.length)); + } + + if (value instanceof ByteBuffer) { + return new Literal<>(value, Types.BinaryType.get()); + } + + if (value instanceof UUID) { + return new Literal<>(value, Types.UUIDType.get()); + } + + throw new IllegalArgumentException("Cannot convert value from class " + + value.getClass().getName() + " to Literal"); + } + + private final T value; + private final Type type; + + public Literal(T value, Type type) { + this.value = value; + this.type = type; + } + + public T getValue() { + return value; + } + + @Override + public Type getDataType() { + return type; + } + + @Override + public Object 
eval(StructLike data) { + return value; + } + + @Override + public <T> T accept(ExpressionVisitor<T> exprVisitor) { + return exprVisitor.visitLiteral(this); + } + + @Override + public String toString() { + if (value == null) { + return "null"; + } + + if (value instanceof ByteBuffer) { + return DatatypeConverter.printHexBinary(((ByteBuffer)value).array()); + } + + return value.toString(); + } +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java b/hudi-common/src/main/java/org/apache/hudi/expression/NameReference.java similarity index 69% rename from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java rename to hudi-common/src/main/java/org/apache/hudi/expression/NameReference.java index 3be19330a19..7d41a61791f 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/AttributeReferenceExpression.java +++ b/hudi-common/src/main/java/org/apache/hudi/expression/NameReference.java @@ -16,13 +16,15 @@ * limitations under the License. 
*/ -package org.apache.hudi.hive.expression; +package org.apache.hudi.expression; -public class AttributeReferenceExpression extends LeafExpression { +import org.apache.hudi.internal.schema.Type; + +public class NameReference extends LeafExpression { private final String name; - public AttributeReferenceExpression(String name) { + public NameReference(String name) { this.name = name; } @@ -30,8 +32,18 @@ public class AttributeReferenceExpression extends LeafExpression { return name; } + @Override + public Type getDataType() { + throw new UnsupportedOperationException("NameReference is not bound yet"); + } + @Override public <T> T accept(ExpressionVisitor<T> exprVisitor) { - return exprVisitor.visitAttribute(this); + return exprVisitor.visitNameReference(this); + } + + @Override + public String toString() { + return "NameReference(name=" + name + ")"; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/expression/PartialBindVisitor.java b/hudi-common/src/main/java/org/apache/hudi/expression/PartialBindVisitor.java new file mode 100644 index 00000000000..cece36291df --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/expression/PartialBindVisitor.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.expression; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.internal.schema.Types; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Will try to bind all references, and convert unresolved references to AlwaysTrue. + * + * e.g. `year=2023 AND day=12`, if year and day both are provided to `recordType`, + * then the expression won't change, if day is not provided, the expression will be + * transformed to `year=2023 AND True`, which will be optimized to `year=2023`. + */ +public class PartialBindVisitor extends BindVisitor { + + public PartialBindVisitor(Types.RecordType recordType, boolean caseSensitive) { + super(recordType, caseSensitive); + } + + /** + * If the attribute cannot find from the schema, directly return null, visitPredicate + * will handle it. + */ + @Override + public Expression visitNameReference(NameReference attribute) { + Types.Field field = caseSensitive + ? recordType.fieldByName(attribute.getName()) + : recordType.fieldByNameCaseInsensitive(attribute.getName()); + + if (field == null) { + return null; + } + + return new BoundReference(field.fieldId(), field.type()); + } + + /** + * If an expression is null after accept method, which means it cannot be bounded from + * schema, we'll directly return {@link Predicates.TrueExpression}. 
+ */ + @Override + public Expression visitPredicate(Predicate predicate) { + + if (predicate instanceof Predicates.BinaryComparison) { + Predicates.BinaryComparison binaryExp = (Predicates.BinaryComparison) predicate; + Expression left = binaryExp.getLeft().accept(this); + if (left == null) { + return alwaysTrue(); + } else { + Expression right = binaryExp.getRight().accept(this); + if (right == null) { + return alwaysTrue(); + } + + return new Predicates.BinaryComparison(left, binaryExp.getOperator(), right); + } + } + + if (predicate instanceof Predicates.Not) { + Expression expr = ((Predicates.Not) predicate).child.accept(this); + if (expr instanceof Predicates.TrueExpression) { + return alwaysFalse(); + } + if (expr instanceof Predicates.FalseExpression) { + return alwaysTrue(); + } + + return Predicates.not(expr); + } + + if (predicate instanceof Predicates.In) { + Predicates.In in = ((Predicates.In) predicate); + Expression valueExpression = in.value.accept(this); + if (valueExpression == null) { + return alwaysTrue(); + } + List<Expression> validValues = in.validValues.stream() + .map(validValue -> validValue.accept(this)) + .collect(Collectors.toList()); + if (validValues.stream().anyMatch(Objects::isNull)) { + return alwaysTrue(); + } + return Predicates.in(valueExpression, validValues); + } + + if (predicate instanceof Predicates.IsNull) { + Predicates.IsNull isNull = (Predicates.IsNull) predicate; + return Option.ofNullable(isNull.child.accept(this)) + .map(expr -> (Expression)Predicates.isNull(expr)) + .orElse(alwaysTrue()); + } + + if (predicate instanceof Predicates.IsNotNull) { + Predicates.IsNotNull isNotNull = (Predicates.IsNotNull) predicate; + return Option.ofNullable(isNotNull.child.accept(this)) + .map(expr -> (Expression)Predicates.isNotNull(expr)) + .orElse(alwaysTrue()); + } + + if (predicate instanceof Predicates.StringStartsWith) { + Predicates.StringStartsWith startsWith = (Predicates.StringStartsWith) predicate; + Expression left = 
startsWith.getLeft().accept(this); + if (left == null) { + return alwaysTrue(); + } else { + Expression right = startsWith.getRight().accept(this); + if (right == null) { + return alwaysTrue(); + } + + return Predicates.startsWith(left, right); + } + } + + if (predicate instanceof Predicates.StringContains) { + Predicates.StringContains contains = (Predicates.StringContains) predicate; + Expression left = contains.getLeft().accept(this); + if (left == null) { + return alwaysTrue(); + } else { + Expression right = contains.getRight().accept(this); + if (right == null) { + return alwaysTrue(); + } + + return Predicates.contains(left, right); + } + } + + throw new IllegalArgumentException("The expression " + predicate + " cannot be visited as predicate"); + } +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java b/hudi-common/src/main/java/org/apache/hudi/expression/Predicate.java similarity index 65% rename from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java rename to hudi-common/src/main/java/org/apache/hudi/expression/Predicate.java index f84756a6a85..220267870dc 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/Literal.java +++ b/hudi-common/src/main/java/org/apache/hudi/expression/Predicate.java @@ -16,28 +16,25 @@ * limitations under the License. */ -package org.apache.hudi.hive.expression; +package org.apache.hudi.expression; -public class Literal extends LeafExpression { +import org.apache.hudi.internal.schema.Type; +import org.apache.hudi.internal.schema.Types; - private final String value; - private final String type; - - public Literal(String value, String type) { - this.value = value; - this.type = type; - } +/** + * An expression that returns a Boolean value. 
+ */ +public interface Predicate extends Expression { - public String getValue() { - return value; + @Override + default Type getDataType() { + return Types.BooleanType.get(); } - public String getType() { - return type; - } + Operator getOperator(); @Override - public <T> T accept(ExpressionVisitor<T> exprVisitor) { - return exprVisitor.visitLiteral(this); + default <T> T accept(ExpressionVisitor<T> exprVisitor) { + return exprVisitor.visitPredicate(this); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/expression/Predicates.java b/hudi-common/src/main/java/org/apache/hudi/expression/Predicates.java new file mode 100644 index 00000000000..11c4f39507f --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/expression/Predicates.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.expression; + +import org.apache.hudi.internal.schema.Type; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +public class Predicates { + + public static TrueExpression alwaysTrue() { + return TrueExpression.get(); + } + + public static FalseExpression alwaysFalse() { + return FalseExpression.get(); + } + + public static And and(Expression left, Expression right) { + return new And(left, right); + } + + public static Or or(Expression left, Expression right) { + return new Or(left, right); + } + + public static BinaryComparison gt(Expression left, Expression right) { + return new BinaryComparison(left, Expression.Operator.GT, right); + } + + public static BinaryComparison lt(Expression left, Expression right) { + return new BinaryComparison(left, Expression.Operator.LT, right); + } + + public static BinaryComparison eq(Expression left, Expression right) { + return new BinaryComparison(left, Expression.Operator.EQ, right); + } + + public static BinaryComparison gteq(Expression left, Expression right) { + return new BinaryComparison(left, Expression.Operator.GT_EQ, right); + } + + public static BinaryComparison lteq(Expression left, Expression right) { + return new BinaryComparison(left, Expression.Operator.LT_EQ, right); + } + + public static StringStartsWith startsWith(Expression left, Expression right) { + return new StringStartsWith(left, right); + } + + public static StringContains contains(Expression left, Expression right) { + return new StringContains(left, right); + } + + public static In in(Expression left, List<Expression> validExpressions) { + return new In(left, validExpressions); + } + + public static IsNull isNull(Expression child) { + return new IsNull(child); + } + + public static IsNotNull isNotNull(Expression child) { + return new IsNotNull(child); + } + + public static Not not(Expression 
expr) { + return new Not(expr); + } + + public static class TrueExpression extends LeafExpression implements Predicate { + + private static final TrueExpression INSTANCE = new TrueExpression(); + + public static TrueExpression get() { + return INSTANCE; + } + + @Override + public Boolean eval(StructLike data) { + return true; + } + + @Override + public Operator getOperator() { + return Operator.TRUE; + } + + @Override + public <T> T accept(ExpressionVisitor<T> exprVisitor) { + return exprVisitor.alwaysTrue(); + } + + @Override + public String toString() { + return "TRUE"; + } + } + + public static class FalseExpression extends LeafExpression implements Predicate { + + private static final FalseExpression INSTANCE = new FalseExpression(); + + public static FalseExpression get() { + return INSTANCE; + } + + @Override + public Boolean eval(StructLike data) { + return false; + } + + @Override + public Operator getOperator() { + return Operator.FALSE; + } + + @Override + public <T> T accept(ExpressionVisitor<T> exprVisitor) { + return exprVisitor.alwaysFalse(); + } + + @Override + public String toString() { + return "FALSE"; + } + } + + public static class And extends BinaryExpression implements Predicate { + + public And(Expression left, Expression right) { + super(left, Operator.AND, right); + } + + @Override + public Boolean eval(StructLike data) { + if (getLeft() instanceof FalseExpression || getRight() instanceof FalseExpression) { + return false; + } + Object left = getLeft().eval(data); + if (left != null && !(Boolean) left) { + return false; + } else { + Object right = getRight().eval(data); + if (right != null && !(Boolean) right) { + return false; + } else { + if (left != null && right != null) { + return true; + } else { + return false; + } + } + } + } + + @Override + public <T> T accept(ExpressionVisitor<T> exprVisitor) { + return exprVisitor.visitAnd(this); + } + + @Override + public String toString() { + return "(" + getLeft() + " " + getOperator().symbol 
+ " " + getRight() + ")"; + } + } + + public static class Or extends BinaryExpression implements Predicate { + + public Or(Expression left, Expression right) { + super(left, Operator.OR, right); + } + + @Override + public Boolean eval(StructLike data) { + if (getLeft() instanceof TrueExpression || getRight() instanceof TrueExpression) { + return true; + } + + Object left = getLeft().eval(data); + + if (left == null) { + return false; + } + + if ((Boolean) left) { + return true; + } else { + Object right = getRight().eval(data); + return right != null && (Boolean) right; + } + } + + @Override + public <T> T accept(ExpressionVisitor<T> exprVisitor) { + return exprVisitor.visitOr(this); + } + + @Override + public String toString() { + return "(" + getLeft() + " " + getOperator().symbol + " " + getRight() + ")"; + } + } + + public static class StringStartsWith extends BinaryExpression implements Predicate { + + StringStartsWith(Expression left, Expression right) { + super(left, Operator.STARTS_WITH, right); + } + + @Override + public String toString() { + return getLeft().toString() + ".startWith(" + getRight().toString() + ")"; + } + + @Override + public Object eval(StructLike data) { + return getLeft().eval(data).toString().startsWith(getRight().eval(data).toString()); + } + } + + public static class StringContains extends BinaryExpression implements Predicate { + + StringContains(Expression left, Expression right) { + super(left, Operator.CONTAINS, right); + } + + @Override + public String toString() { + return getLeft().toString() + ".contains(" + getRight().toString() + ")"; + } + + @Override + public Object eval(StructLike data) { + return getLeft().eval(data).toString().contains(getRight().eval(data).toString()); + } + } + + public static class In implements Predicate { + + protected final Expression value; + protected final List<Expression> validValues; + + public In(Expression value, List<Expression> validValues) { + this.value = value; + this.validValues = 
validValues; + } + + @Override + public List<Expression> getChildren() { + ArrayList<Expression> children = new ArrayList<>(validValues.size() + 1); + children.add(value); + children.addAll(validValues); + return children; + } + + @Override + public Boolean eval(StructLike data) { + Set<Object> values = validValues.stream() + .map(validValue -> validValue.eval(data)) + .collect(Collectors.toSet()); + return values.contains(value.eval(data)); + } + + @Override + public Operator getOperator() { + return Operator.IN; + } + + @Override + public String toString() { + return value.toString() + " " + getOperator().symbol + " " + + validValues.stream().map(Expression::toString).collect(Collectors.joining(",", "(", ")")); + } + } + + public static class IsNull implements Predicate { + + protected final Expression child; + + public IsNull(Expression child) { + this.child = child; + } + + @Override + public List<Expression> getChildren() { + return Collections.singletonList(child); + } + + @Override + public Boolean eval(StructLike data) { + return child.eval(data) == null; + } + + @Override + public Operator getOperator() { + return Operator.IS_NULL; + } + + @Override + public String toString() { + return child.toString() + " IS NULL"; + } + } + + public static class IsNotNull implements Predicate { + + protected final Expression child; + + public IsNotNull(Expression child) { + this.child = child; + } + + @Override + public List<Expression> getChildren() { + return Collections.singletonList(child); + } + + @Override + public Boolean eval(StructLike data) { + return child.eval(data) != null; + } + + @Override + public Operator getOperator() { + return Operator.IS_NOT_NULL; + } + + @Override + public String toString() { + return child.toString() + " IS NOT NULL"; + } + } + + public static class Not implements Predicate { + + Expression child; + + public Not(Expression child) { + this.child = child; + } + + @Override + public List<Expression> getChildren() { + return 
Collections.singletonList(child); + } + + @Override + public Boolean eval(StructLike data) { + return ! (Boolean) child.eval(data); + } + + @Override + public Operator getOperator() { + return Operator.NOT; + } + + @Override + public String toString() { + return "NOT " + child; + } + } + + public static class BinaryComparison extends BinaryExpression implements Predicate { + + public BinaryComparison(Expression left, Operator operator, Expression right) { + super(left, operator, right); + } + + @Override + public Boolean eval(StructLike data) { + if (getLeft().getDataType().isNestedType()) { + throw new IllegalArgumentException("The nested type doesn't support binary comparison"); + } + Comparator<Object> comparator = Comparators.forType((Type.PrimitiveType) getLeft().getDataType()); + switch (getOperator()) { + case EQ: + return comparator.compare(getLeft().eval(data), getRight().eval(data)) == 0; + case GT: + return comparator.compare(getLeft().eval(data), getRight().eval(data)) > 0; + case GT_EQ: + return comparator.compare(getLeft().eval(data), getRight().eval(data)) >= 0; + case LT: + return comparator.compare(getLeft().eval(data), getRight().eval(data)) < 0; + case LT_EQ: + return comparator.compare(getLeft().eval(data), getRight().eval(data)) <= 0; + default: + throw new IllegalArgumentException("The operation " + getOperator() + " doesn't support binary comparison"); + } + } + } +} diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java b/hudi-common/src/main/java/org/apache/hudi/expression/StructLike.java similarity index 79% rename from hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java rename to hudi-common/src/main/java/org/apache/hudi/expression/StructLike.java index 318a79116ca..d6346b8c657 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/expression/LeafExpression.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/expression/StructLike.java @@ -16,14 +16,11 @@ * limitations under the License. */ -package org.apache.hudi.hive.expression; +package org.apache.hudi.expression; -/** - * Expression that without any child expressions. - */ -public abstract class LeafExpression extends Expression { +public interface StructLike { + + int numFields(); - public LeafExpression() { - super(null); - } + <T> T get(int pos, Class<T> classTag); } diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java index d51ab37719a..bc8b89004d6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Type.java @@ -18,10 +18,22 @@ package org.apache.hudi.internal.schema; +import org.apache.hudi.common.util.PartitionPathEncodeUtils; + import java.io.Serializable; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Objects; +import java.util.UUID; /** * The type of a schema, reference avro schema. @@ -29,21 +41,45 @@ import java.util.Objects; * to do add support for localTime if avro version is updated */ public interface Type extends Serializable { + + OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); + LocalDate EPOCH_DAY = EPOCH.toLocalDate(); + /** * Enums for type names. 
*/ enum TypeID { - RECORD, ARRAY, MAP, FIXED, STRING, BINARY, - INT, LONG, FLOAT, DOUBLE, DATE, BOOLEAN, TIME, TIMESTAMP, DECIMAL, UUID; - private String name; - - TypeID() { + RECORD(Types.RecordType.class), + ARRAY(List.class), + MAP(Map.class), + FIXED(ByteBuffer.class), + STRING(String.class), + BINARY(ByteBuffer.class), + INT(Integer.class), + LONG(Long.class), + FLOAT(Float.class), + DOUBLE(Double.class), + DATE(Integer.class), + BOOLEAN(Boolean.class), + TIME(Long.class), + TIMESTAMP(Long.class), + DECIMAL(BigDecimal.class), + UUID(UUID.class); + private final String name; + private final Class<?> classTag; + + TypeID(Class<?> classTag) { this.name = this.name().toLowerCase(Locale.ROOT); + this.classTag = classTag; } public String getName() { return name; } + + public Class<?> getClassTag() { + return classTag; + } } static TypeID fromValue(String value) { @@ -54,6 +90,40 @@ public interface Type extends Serializable { } } + static Object fromPartitionString(String partitionValue, Type type) { + if (partitionValue == null + || PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH.equals(partitionValue) + || PartitionPathEncodeUtils.DEPRECATED_DEFAULT_PARTITION_PATH.equals(partitionValue)) { + return null; + } + + switch (type.typeId()) { + case INT: + return Integer.parseInt(partitionValue); + case LONG: + return Long.parseLong(partitionValue); + case BOOLEAN: + return Boolean.parseBoolean(partitionValue); + case FLOAT: + return Float.parseFloat(partitionValue); + case DECIMAL: + return new BigDecimal(partitionValue); + case DOUBLE: + return Double.parseDouble(partitionValue); + case UUID: + return UUID.fromString(partitionValue); + case DATE: + // TODO Support different date format + return Math.toIntExact(ChronoUnit.DAYS.between( + EPOCH_DAY, LocalDate.parse(partitionValue, DateTimeFormatter.ISO_LOCAL_DATE))); + case STRING: + return partitionValue; + default: + throw new UnsupportedOperationException("Cast value " + partitionValue + + " to type " + type + " is 
not supported yet"); + } + } + TypeID typeId(); default boolean isNestedType() { diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java index 3d2774bc19b..ed03a7349cb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/Types.java @@ -23,7 +23,6 @@ import org.apache.hudi.internal.schema.Type.PrimitiveType; import java.io.Serializable; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -511,6 +510,7 @@ public class Types { private final Field[] fields; private transient Map<String, Field> nameToFields = null; + private transient Map<String, Field> lowercaseNameToFields = null; private transient Map<Integer, Field> idToFields = null; private RecordType(List<Field> fields, String name) { @@ -523,30 +523,43 @@ public class Types { return Arrays.asList(fields); } - public Field field(String name) { + /** + * Case-sensitive get field by name + */ + public Field fieldByName(String name) { if (nameToFields == null) { - nameToFields = new HashMap<>(); - for (Field field : fields) { - nameToFields.put(field.name().toLowerCase(Locale.ROOT), field); - } + nameToFields = Arrays.stream(fields) + .collect(Collectors.toMap( + Field::name, + field -> field)); + } + return nameToFields.get(name); + } + + public Field fieldByNameCaseInsensitive(String name) { + if (lowercaseNameToFields == null) { + lowercaseNameToFields = Arrays.stream(fields) + .collect(Collectors.toMap( + field -> field.name.toLowerCase(Locale.ROOT), + field -> field)); } - return nameToFields.get(name.toLowerCase(Locale.ROOT)); + return lowercaseNameToFields.get(name.toLowerCase(Locale.ROOT)); } @Override public Field field(int id) { if (idToFields == null) { - idToFields = new HashMap<>(); - for (Field field : fields) { - 
idToFields.put(field.fieldId(), field); - } + idToFields = Arrays.stream(fields) + .collect(Collectors.toMap( + Field::fieldId, + field -> field)); } return idToFields.get(id); } @Override public Type fieldType(String name) { - Field field = field(name); + Field field = fieldByNameCaseInsensitive(name); if (field != null) { return field.type(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/Conversions.java b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/Conversions.java new file mode 100644 index 00000000000..67adff88ac7 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/Conversions.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.internal.schema.utils; + +import org.apache.hudi.internal.schema.Type; +import org.apache.hudi.internal.schema.Types; + +import java.util.Arrays; +import java.util.HashSet; + +public class Conversions { + + private static final HashSet<Type.TypeID> SUPPORTED_PARTITION_TYPES = new HashSet<>( + Arrays.asList(Type.TypeID.INT, + Type.TypeID.LONG, + Type.TypeID.BOOLEAN, + Type.TypeID.FLOAT, + Type.TypeID.DECIMAL, + Type.TypeID.DOUBLE, + Type.TypeID.UUID, + Type.TypeID.DATE, + Type.TypeID.STRING)); + + public static boolean isPartitionSchemaSupportedConversion(Types.RecordType schema) { + for (Types.Field field: schema.fields()) { + if (!SUPPORTED_PARTITION_TYPES.contains(field.type().typeId())) { + return false; + } + } + + return true; + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java new file mode 100644 index 00000000000..f62786e9517 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.metadata; + +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.util.PartitionPathEncodeUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.expression.ArrayData; +import org.apache.hudi.hadoop.CachingPath; +import org.apache.hudi.hadoop.SerializablePath; +import org.apache.hudi.internal.schema.Type; +import org.apache.hudi.internal.schema.Types; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public abstract class AbstractHoodieTableMetadata implements HoodieTableMetadata { + + protected transient HoodieEngineContext engineContext; + + protected final SerializableConfiguration hadoopConf; + protected final SerializablePath dataBasePath; + + // TODO get this from HoodieConfig + protected final boolean caseSensitive = false; + + public AbstractHoodieTableMetadata(HoodieEngineContext engineContext, SerializableConfiguration conf, String dataBasePath) { + this.engineContext = engineContext; + this.hadoopConf = conf; + this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath)); + } + + protected static int getPathPartitionLevel(Types.RecordType partitionFields, String path) { + if (StringUtils.isNullOrEmpty(path) || partitionFields == null) { + return 0; + } + + int level = 1; + for (int i = 1; i < path.length() - 1; i++) { + if (path.charAt(i) == Path.SEPARATOR_CHAR) { + level++; + } + } + if (path.startsWith(Path.SEPARATOR)) { + level--; + } + if (path.endsWith(Path.SEPARATOR)) { + level--; + } + return level; + } + + protected static ArrayData extractPartitionValues(Types.RecordType partitionFields, + String relativePartitionPath, + boolean urlEncodePartitioningEnabled) { + if (partitionFields.fields().size() == 1) { + // SinglePartPartitionValue, which might contain slashes. 
+ String partitionValue = relativePartitionPath.split("=")[1]; + return new ArrayData(Collections.singletonList(Type.fromPartitionString( + urlEncodePartitioningEnabled ? PartitionPathEncodeUtils.unescapePathName(partitionValue) : partitionValue, + partitionFields.field(0).type()))); + } + + List<Object> partitionValues; + String[] partitionFragments = relativePartitionPath.split("/"); + partitionValues = IntStream.range(0, partitionFragments.length) + .mapToObj(idx -> { + String partitionValue = partitionFragments[idx].split("=")[1]; + return Type.fromPartitionString( + urlEncodePartitioningEnabled ? PartitionPathEncodeUtils.unescapePathName(partitionValue) : partitionValue, + partitionFields.field(idx).type()); + }).collect(Collectors.toList()); + + return new ArrayData(partitionValues); + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java index 076eb4bf1dc..7e1acf3a87c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java @@ -29,8 +29,8 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.HoodieRecordGlobalLocation; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.model.HoodieRecordGlobalLocation; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; @@ -41,8 +41,6 @@ import org.apache.hudi.common.util.hash.FileIndexID; import org.apache.hudi.common.util.hash.PartitionIndexID; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; -import 
org.apache.hudi.hadoop.CachingPath; -import org.apache.hudi.hadoop.SerializablePath; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -68,25 +66,33 @@ import java.util.stream.Collectors; /** * Abstract class for implementing common table metadata operations. */ -public abstract class BaseTableMetadata implements HoodieTableMetadata { +public abstract class BaseTableMetadata extends AbstractHoodieTableMetadata { private static final Logger LOG = LoggerFactory.getLogger(BaseTableMetadata.class); - protected transient HoodieEngineContext engineContext; - protected final SerializablePath dataBasePath; + protected static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024; + // NOTE: Buffer-size is deliberately set pretty low, since MT internally is relying + // on HFile (serving as persisted binary key-value mapping) to do caching + protected static final int BUFFER_SIZE = 10 * 1024; // 10Kb + protected final HoodieTableMetaClient dataMetaClient; protected final Option<HoodieMetadataMetrics> metrics; protected final HoodieMetadataConfig metadataConfig; protected boolean isMetadataTableInitialized; + protected final boolean hiveStylePartitioningEnabled; + protected final boolean urlEncodePartitioningEnabled; protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String dataBasePath) { - this.engineContext = engineContext; - this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath)); + super(engineContext, engineContext.getHadoopConf(), dataBasePath); + this.dataMetaClient = HoodieTableMetaClient.builder() - .setConf(engineContext.getHadoopConf().get()) + .setConf(hadoopConf.get()) .setBasePath(dataBasePath) .build(); + + this.hiveStylePartitioningEnabled = Boolean.parseBoolean(dataMetaClient.getTableConfig().getHiveStylePartitioningEnable()); + this.urlEncodePartitioningEnabled = Boolean.parseBoolean(dataMetaClient.getTableConfig().getUrlEncodePartitioning()); 
this.metadataConfig = metadataConfig; this.isMetadataTableInitialized = dataMetaClient.getTableConfig().isMetadataTableAvailable(); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java index 9ce83a7d953..10ed196714d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/FileSystemBackedTableMetadata.java @@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodiePartitionMetadata; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.model.HoodieRecordGlobalLocation; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; @@ -33,6 +34,12 @@ import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.exception.TableNotFoundException; +import org.apache.hudi.expression.BindVisitor; +import org.apache.hudi.expression.Expression; +import org.apache.hudi.expression.PartialBindVisitor; +import org.apache.hudi.expression.Predicates; +import org.apache.hudi.internal.schema.Types; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -52,20 +59,36 @@ import java.util.stream.Stream; /** * Implementation of {@link HoodieTableMetadata} based file-system-backed table metadata. 
*/ -public class FileSystemBackedTableMetadata implements HoodieTableMetadata { +public class FileSystemBackedTableMetadata extends AbstractHoodieTableMetadata { private static final int DEFAULT_LISTING_PARALLELISM = 1500; - private final transient HoodieEngineContext engineContext; - private final SerializableConfiguration hadoopConf; - private final String datasetBasePath; private final boolean assumeDatePartitioning; - public FileSystemBackedTableMetadata(HoodieEngineContext engineContext, SerializableConfiguration conf, String datasetBasePath, + private final boolean hiveStylePartitioningEnabled; + private final boolean urlEncodePartitioningEnabled; + + public FileSystemBackedTableMetadata(HoodieEngineContext engineContext, HoodieTableConfig tableConfig, + SerializableConfiguration conf, String datasetBasePath, boolean assumeDatePartitioning) { - this.engineContext = engineContext; - this.hadoopConf = conf; - this.datasetBasePath = datasetBasePath; + super(engineContext, conf, datasetBasePath); + + this.hiveStylePartitioningEnabled = Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable()); + this.urlEncodePartitioningEnabled = Boolean.parseBoolean(tableConfig.getUrlEncodePartitioning()); + this.assumeDatePartitioning = assumeDatePartitioning; + } + + public FileSystemBackedTableMetadata(HoodieEngineContext engineContext, + SerializableConfiguration conf, String datasetBasePath, + boolean assumeDatePartitioning) { + super(engineContext, conf, datasetBasePath); + + FileSystem fs = FSUtils.getFs(dataBasePath.get(), conf.get()); + Path metaPath = new Path(dataBasePath.get(), HoodieTableMetaClient.METAFOLDER_NAME); + TableNotFoundException.checkTableValidity(fs, this.dataBasePath.get(), metaPath); + HoodieTableConfig tableConfig = new HoodieTableConfig(fs, metaPath.toString(), null, null); + this.hiveStylePartitioningEnabled = Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable()); + this.urlEncodePartitioningEnabled = 
Boolean.parseBoolean(tableConfig.getUrlEncodePartitioning()); this.assumeDatePartitioning = assumeDatePartitioning; } @@ -77,15 +100,29 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata { @Override public List<String> getAllPartitionPaths() throws IOException { - Path basePath = new Path(datasetBasePath); + Path basePath = dataBasePath.get(); if (assumeDatePartitioning) { FileSystem fs = basePath.getFileSystem(hadoopConf.get()); - return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, datasetBasePath); + return FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, dataBasePath.toString()); } return getPartitionPathWithPathPrefixes(Collections.singletonList("")); } + @Override + public List<String> getPartitionPathWithPathPrefixUsingFilterExpression(List<String> relativePathPrefixes, + Types.RecordType partitionFields, + Expression expression) throws IOException { + return relativePathPrefixes.stream().flatMap(relativePathPrefix -> { + try { + return getPartitionPathWithPathPrefixUsingFilterExpression(relativePathPrefix, + partitionFields, expression).stream(); + } catch (IOException e) { + throw new HoodieIOException("Error fetching partition paths with relative path: " + relativePathPrefix, e); + } + }).collect(Collectors.toList()); + } + @Override public List<String> getPartitionPathWithPathPrefixes(List<String> relativePathPrefixes) { return relativePathPrefixes.stream().flatMap(relativePathPrefix -> { @@ -98,11 +135,35 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata { } private List<String> getPartitionPathWithPathPrefix(String relativePathPrefix) throws IOException { + return getPartitionPathWithPathPrefixUsingFilterExpression(relativePathPrefix, null, null); + } + + private List<String> getPartitionPathWithPathPrefixUsingFilterExpression(String relativePathPrefix, + Types.RecordType partitionFields, + Expression pushedExpr) throws IOException { List<Path> pathsToList = new CopyOnWriteArrayList<>(); 
pathsToList.add(StringUtils.isNullOrEmpty(relativePathPrefix) - ? new Path(datasetBasePath) : new Path(datasetBasePath, relativePathPrefix)); + ? dataBasePath.get() : new Path(dataBasePath.get(), relativePathPrefix)); List<String> partitionPaths = new CopyOnWriteArrayList<>(); + int currentPartitionLevel = -1; + boolean needPushDownExpressions; + Expression fullBoundExpr; + // Not like `HoodieBackedTableMetadata`, since we don't know the exact partition levels here, + // given it's possible that partition values contains `/`, which could affect + // the result to get right `partitionValue` when listing paths, here we have + // to make it more strict that `urlEncodePartitioningEnabled` must be enabled. + // TODO better enable urlEncodePartitioningEnabled if hiveStylePartitioningEnabled is enabled? + if (hiveStylePartitioningEnabled && urlEncodePartitioningEnabled + && pushedExpr != null && partitionFields != null) { + currentPartitionLevel = getPathPartitionLevel(partitionFields, relativePathPrefix); + needPushDownExpressions = true; + fullBoundExpr = pushedExpr.accept(new BindVisitor(partitionFields, caseSensitive)); + } else { + fullBoundExpr = Predicates.alwaysTrue(); + needPushDownExpressions = false; + } + while (!pathsToList.isEmpty()) { // TODO: Get the parallelism from HoodieWriteConfig int listingParallelism = Math.min(DEFAULT_LISTING_PARALLELISM, pathsToList.size()); @@ -116,7 +177,7 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata { List<Pair<Option<String>, Option<Path>>> result = engineContext.flatMap(pathsToList, path -> { FileSystem fileSystem = path.getFileSystem(hadoopConf.get()); if (HoodiePartitionMetadata.hasPartitionMetadata(fileSystem, path)) { - return Stream.of(Pair.of(Option.of(FSUtils.getRelativePartitionPath(new Path(datasetBasePath), path)), Option.empty())); + return Stream.of(Pair.of(Option.of(FSUtils.getRelativePartitionPath(dataBasePath.get(), path)), Option.empty())); } return 
Arrays.stream(fileSystem.listStatus(path, p -> { try { @@ -129,10 +190,34 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata { }, listingParallelism); pathsToList.clear(); - partitionPaths.addAll(result.stream().filter(entry -> entry.getKey().isPresent()).map(entry -> entry.getKey().get()) + partitionPaths.addAll(result.stream().filter(entry -> entry.getKey().isPresent()) + .map(entry -> entry.getKey().get()) + .filter(relativePartitionPath -> fullBoundExpr instanceof Predicates.TrueExpression + || (Boolean) fullBoundExpr.eval( + extractPartitionValues(partitionFields, relativePartitionPath, urlEncodePartitioningEnabled))) .collect(Collectors.toList())); + Expression partialBoundExpr; + // If partitionPaths is nonEmpty, we're already at the last path level, and all paths + // are filtered already. + if (needPushDownExpressions && partitionPaths.isEmpty()) { + // Here we assume the path level matches the number of partition columns, so we'll rebuild + // new schema based on current path level. + // e.g. partition columns are <region, date, hh>, if we're listing the second level, then + // currentSchema would be <region, date> + // `PartialBindVisitor` will bind reference if it can be found from `currentSchema`, otherwise + // will change the expression to `alwaysTrue`. Can see `PartialBindVisitor` for details. 
+ Types.RecordType currentSchema = Types.RecordType.get(partitionFields.fields().subList(0, ++currentPartitionLevel)); + PartialBindVisitor partialBindVisitor = new PartialBindVisitor(currentSchema, caseSensitive); + partialBoundExpr = pushedExpr.accept(partialBindVisitor); + } else { + partialBoundExpr = Predicates.alwaysTrue(); + } + pathsToList.addAll(result.stream().filter(entry -> entry.getValue().isPresent()).map(entry -> entry.getValue().get()) + .filter(path -> partialBoundExpr instanceof Predicates.TrueExpression + || (Boolean) partialBoundExpr.eval( + extractPartitionValues(partitionFields, FSUtils.getRelativePartitionPath(dataBasePath.get(), path), urlEncodePartitioningEnabled))) .collect(Collectors.toList())); } return partitionPaths; diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index b0e9bcaaab2..9a27d2cfbca 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -46,6 +46,9 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.TableNotFoundException; +import org.apache.hudi.expression.BindVisitor; +import org.apache.hudi.expression.Expression; +import org.apache.hudi.internal.schema.Types; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.io.storage.HoodieSeekingFileReader; import org.apache.hudi.util.Transient; @@ -143,6 +146,26 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { return Option.ofNullable(recordsByKeys.get(key)); } + @Override + public List<String> getPartitionPathWithPathPrefixUsingFilterExpression(List<String> relativePathPrefixes, + Types.RecordType partitionFields, + Expression expression) 
throws IOException { + Expression boundedExpr = expression.accept(new BindVisitor(partitionFields, caseSensitive)); + List<String> selectedPartitionPaths = getPartitionPathWithPathPrefixes(relativePathPrefixes); + + // Can only prune partitions if the number of partition levels matches partition fields. + // Check the first selected partition (if any; get(0) on an empty list would throw) to see whether the numbers match. + if (hiveStylePartitioningEnabled && !selectedPartitionPaths.isEmpty() + && getPathPartitionLevel(partitionFields, selectedPartitionPaths.get(0)) == partitionFields.fields().size()) { + return selectedPartitionPaths.stream() + .filter(p -> + (boolean) boundedExpr.eval(extractPartitionValues(partitionFields, p, urlEncodePartitioningEnabled))) + .collect(Collectors.toList()); + } + + return selectedPartitionPaths; + } + @Override public List<String> getPartitionPathWithPathPrefixes(List<String> relativePathPrefixes) throws IOException { // TODO: consider skipping this method for non-partitioned table and simplify the checks diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index ac17eed001b..0ba197a5c68 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -33,6 +33,8 @@ import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hudi.expression.Expression; +import org.apache.hudi.internal.schema.Types; import java.io.IOException; import java.io.Serializable; @@ -146,6 +148,14 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable { */ FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException; + /** + * Retrieve the paths of partitions under the provided sub-directories, + * and try to filter these partitions using the provided {@link Expression}. 
+ */ + List<String> getPartitionPathWithPathPrefixUsingFilterExpression(List<String> relativePathPrefixes, + Types.RecordType partitionFields, + Expression expression) throws IOException; + /** * Fetches all partition paths that are the sub-directories of the list of provided (relative) paths. * <p> diff --git a/hudi-common/src/test/java/org/apache/hudi/expression/TestPartialBindVisitor.java b/hudi-common/src/test/java/org/apache/hudi/expression/TestPartialBindVisitor.java new file mode 100644 index 00000000000..c7e75711823 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/expression/TestPartialBindVisitor.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.expression; + +import org.apache.hudi.internal.schema.Types; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; + +public class TestPartialBindVisitor { + + private static Types.RecordType schema; + @BeforeAll + public static void init() { + ArrayList<Types.Field> fields = new ArrayList<>(5); + fields.add(Types.Field.get(0, true, "a", Types.StringType.get())); + fields.add(Types.Field.get(1, true, "b", Types.DateType.get())); + fields.add(Types.Field.get(2, true, "c", Types.IntType.get())); + fields.add(Types.Field.get(3, true, "d", Types.LongType.get())); + fields.add(Types.Field.get(4, false, "f", Types.BooleanType.get())); + schema = Types.RecordType.get(fields, "schema"); + } + + @Test + public void testPartialBindIfAllExisting() { + PartialBindVisitor partialBindVisitor = new PartialBindVisitor(schema, false); + + Predicates.BinaryComparison eq = Predicates.eq(new NameReference("a"), + Literal.from("Jane")); + Predicates.BinaryComparison gt = Predicates.gt(new NameReference("c"), + Literal.from(10)); + Predicates.In in = Predicates.in(new NameReference("d"), + Arrays.asList(Literal.from(10L), Literal.from(13L))); + + Predicates.And expr = Predicates.and(eq, Predicates.or(gt, in)); + Expression binded = expr.accept(partialBindVisitor); + + Assertions.assertTrue((Boolean) binded.eval(new ArrayData(Arrays.asList("Jane", "2023-04-02", 15, 5L, false)))); + Assertions.assertTrue((Boolean) binded.eval(new ArrayData(Arrays.asList("Jane", "2023-04-02", 5, 10L, false)))); + Assertions.assertFalse((Boolean) binded.eval(new ArrayData(Arrays.asList("Lone", "2023-04-02", 15, 5L, false)))); + Assertions.assertFalse((Boolean) binded.eval(new ArrayData(Arrays.asList("Lone", "2023-04-02", 10, 5L, false)))); + } + + @Test + public void testPartialBindIfFieldMissing() { + PartialBindVisitor partialBindVisitor = new 
PartialBindVisitor(schema, false); + + Predicates.BinaryComparison eq = Predicates.eq(new NameReference("a"), + Literal.from("Jane")); + Predicates.BinaryComparison lt = Predicates.lt(new NameReference("m"), + Literal.from(10)); + Predicates.BinaryComparison gteq = Predicates.gteq(new NameReference("d"), + Literal.from(10L)); + + Predicates.And expr = Predicates.and(eq, Predicates.or(lt, gteq)); + // Since Attribute m does not exist in the schema, so the OR expression is always true, + // the expression is optimized to only consider the EQ expression + Expression binded = expr.accept(partialBindVisitor); + + Assertions.assertTrue((Boolean) binded.eval(new ArrayData(Arrays.asList("Jane", "2023-04-02", 15, 5L, false)))); + Assertions.assertFalse((Boolean) binded.eval(new ArrayData(Arrays.asList("Lone", "2023-04-02", 15, 5L, false)))); + Assertions.assertFalse((Boolean) binded.eval(new ArrayData(Arrays.asList("Lone", "2023-04-02", 10, 5L, false)))); + } +} diff --git a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java index 338e892f48c..6c5fcb7049c 100644 --- a/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java @@ -331,7 +331,7 @@ public class TestAvroSchemaEvolutionUtils { avroRecord.put("preferences", preferencesRecord); // fill mapType Map<String, GenericData.Record> locations = new HashMap<>(); - Schema mapSchema = AvroInternalSchemaConverter.convert(((Types.MapType)record.field("locations").type()).valueType(), "test1.locations"); + Schema mapSchema = AvroInternalSchemaConverter.convert(((Types.MapType)record.fieldByNameCaseInsensitive("locations").type()).valueType(), "test1.locations"); GenericData.Record locationsValue = new GenericData.Record(mapSchema); 
locationsValue.put("lat", 1.2f); locationsValue.put("long", 1.4f); diff --git a/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java b/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java index db7608cea96..799ff7e7d23 100644 --- a/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java +++ b/hudi-common/src/test/java/org/apache/hudi/metadata/TestFileSystemBackedTableMetadata.java @@ -71,7 +71,7 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { hoodieTestTable.addCommit("100").withBaseFilesInPartition(DEFAULT_PARTITION, IntStream.range(0, 10).toArray()); HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); FileSystemBackedTableMetadata fileSystemBackedTableMetadata = - new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); + new FileSystemBackedTableMetadata(localEngineContext, metaClient.getTableConfig(), new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); Assertions.assertEquals(0, fileSystemBackedTableMetadata.getAllPartitionPaths().size()); Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath)).length); Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartitions( @@ -96,7 +96,7 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { }); HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); FileSystemBackedTableMetadata fileSystemBackedTableMetadata = - new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, true); + new FileSystemBackedTableMetadata(localEngineContext, metaClient.getTableConfig(), new SerializableConfiguration(metaClient.getHadoopConf()), 
basePath, true); Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size()); Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + DATE_PARTITIONS.get(0))).length); @@ -127,7 +127,7 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { }); HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); FileSystemBackedTableMetadata fileSystemBackedTableMetadata = - new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); + new FileSystemBackedTableMetadata(localEngineContext, metaClient.getTableConfig(), new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size()); List<String> fullPartitionPaths = DATE_PARTITIONS.stream().map(p -> basePath + "/" + p).collect(Collectors.toList()); @@ -152,7 +152,7 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { }); HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); FileSystemBackedTableMetadata fileSystemBackedTableMetadata = - new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); + new FileSystemBackedTableMetadata(localEngineContext, metaClient.getTableConfig(), new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size()); Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + ONE_LEVEL_PARTITIONS.get(0))).length); @@ -178,7 +178,7 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { }); HoodieLocalEngineContext localEngineContext = new 
HoodieLocalEngineContext(metaClient.getHadoopConf()); FileSystemBackedTableMetadata fileSystemBackedTableMetadata = - new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); + new FileSystemBackedTableMetadata(localEngineContext, metaClient.getTableConfig(), new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size()); Assertions.assertEquals(10, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length); @@ -203,7 +203,7 @@ public class TestFileSystemBackedTableMetadata extends HoodieCommonTestHarness { }); HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); FileSystemBackedTableMetadata fileSystemBackedTableMetadata = - new FileSystemBackedTableMetadata(localEngineContext, new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); + new FileSystemBackedTableMetadata(localEngineContext, metaClient.getTableConfig(), new SerializableConfiguration(metaClient.getHadoopConf()), basePath, false); Assertions.assertEquals(3, fileSystemBackedTableMetadata.getAllPartitionPaths().size()); Assertions.assertEquals(0, fileSystemBackedTableMetadata.getAllFilesInPartition(new Path(basePath + "/" + MULTI_LEVEL_PARTITIONS.get(0))).length); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkFilterHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkFilterHelper.scala new file mode 100644 index 00000000000..5a9bc29089e --- /dev/null +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkFilterHelper.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi + +import org.apache.hudi.expression.{Predicates, Expression, Literal, NameReference} +import org.apache.hudi.internal.schema.{Type, Types} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types._ + +import java.sql.{Date, Timestamp} +import java.time.{Instant, LocalDate} +import scala.jdk.CollectionConverters.seqAsJavaListConverter + +object SparkFilterHelper { + + def convertFilters(filters: Seq[Filter]): Expression = { + filters.flatMap(convertFilter) + .reduceLeftOption(Predicates.and) + .getOrElse(Predicates.alwaysTrue()) + } + + def convertFilter(filter: Filter): Option[Expression] = filter match { + case EqualTo(attribute, value) => + Some(Predicates.eq(new NameReference(attribute), toLiteral(value))) + case EqualNullSafe(attribute, value) => + Some(Predicates.eq(new NameReference(attribute), toLiteral(value))) + case LessThan(attribute, value) => + Some(Predicates.lt(new NameReference(attribute), toLiteral(value))) + case LessThanOrEqual(attribute, value) => + Some(Predicates.lteq(new NameReference(attribute), toLiteral(value))) + case GreaterThan(attribute, value) => + Some(Predicates.gt(new NameReference(attribute), toLiteral(value))) + case 
GreaterThanOrEqual(attribute, value) => + Some(Predicates.gteq(new NameReference(attribute), toLiteral(value))) + case In(attribute, values) => + Some(Predicates.in(new NameReference(attribute), values.map(toLiteral(_).asInstanceOf[Expression]).toList.asJava)) + case And(left, right) => + for { + convertedLeft <- convertFilter(left) + convertedRight <- convertFilter(right) + } yield Predicates.and(convertedLeft, convertedRight) + case Or(left, right) => + for { + convertedLeft <- convertFilter(left) + convertedRight <- convertFilter(right) + } yield Predicates.or(convertedLeft, convertedRight) + case StringStartsWith(attribute, value) => + Some(Predicates.startsWith(new NameReference(attribute), toLiteral(value))) + case StringContains(attribute, value) => + Some(Predicates.contains(new NameReference(attribute), toLiteral(value))) + case Not(child) => + convertFilter(child).map(Predicates.not) + case IsNull(attribute) => + Some(Predicates.isNull(new NameReference(attribute))) + case IsNotNull(attribute) => + Some(Predicates.isNotNull(new NameReference(attribute))) + case _ => + None + } + + def toLiteral(value: Any): Literal[_] = { + value match { + case timestamp : Timestamp => + new Literal(DateTimeUtils.fromJavaTimestamp(timestamp), Types.TimestampType.get()) + case date: Date => + new Literal(DateTimeUtils.fromJavaDate(date), Types.DateType.get()) + case instant: Instant => + new Literal(DateTimeUtils.instantToMicros(instant), Types.TimestampType.get()) + case localDate: LocalDate => + new Literal(Math.toIntExact(localDate.toEpochDay), Types.DateType.get()) // epoch-day count is a date value, so tag it DateType like the java.sql.Date case + case _ => + Literal.from(value) + } + } + + def convertDataType(sparkType: DataType): Type = sparkType match { + case StructType(fields) => + val convertedFields = fields.zipWithIndex.map { + case (field, idx) => + Types.Field.get(idx, field.nullable, field.name, convertDataType(field.dataType), field.getComment().orNull) + }.toList.asJava + Types.RecordType.get(convertedFields) + case BooleanType => 
+ Types.BooleanType.get() + case IntegerType | ShortType | ByteType => + Types.IntType.get() + case LongType => + Types.LongType.get() + case FloatType => + Types.FloatType.get() + case DoubleType => + Types.DoubleType.get() + case StringType | CharType(_) | VarcharType(_) => + Types.StringType.get() + case DateType => + Types.DateType.get() + case TimestampType => + Types.TimestampType.get() + case _type: DecimalType => + Types.DecimalType.get(_type.precision, _type.scale) + case _ => + throw new UnsupportedOperationException(s"Cannot convert spark type $sparkType to the relate HUDI type") + } +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index c76af7b39ce..35ef3e9f066 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -31,6 +31,8 @@ import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY import org.apache.hudi.hadoop.CachingPath import org.apache.hudi.hadoop.CachingPath.createRelativePathUnsafe +import org.apache.hudi.internal.schema.Types.RecordType +import org.apache.hudi.internal.schema.utils.Conversions import org.apache.hudi.keygen.{StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} import org.apache.hudi.util.JFunction import org.apache.spark.api.java.JavaSparkContext @@ -44,9 +46,11 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import java.util.Collections import javax.annotation.concurrent.NotThreadSafe import scala.collection.JavaConverters._ import scala.language.implicitConversions +import 
scala.util.{Failure, Success, Try} /** * Implementation of the [[BaseHoodieTableFileIndex]] for Spark @@ -225,7 +229,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession, logInfo("Partition path prefix analysis is disabled; falling back to fetching all partitions") getAllQueryPartitionPaths.asScala } else { - tryListByPartitionPathPrefix(partitionColumnNames, partitionPruningPredicates) + tryPushDownPartitionPredicates(partitionColumnNames, partitionPruningPredicates) } // NOTE: In some cases, like for ex, when non-encoded slash '/' is used w/in the partition column's value, @@ -260,7 +264,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession, // NOTE: Here we try to to achieve efficiency in avoiding necessity to recursively list deep folder structures of // partitioned tables w/ multiple partition columns, by carefully analyzing provided partition predicates: // - // In cases when partition-predicates have + // 1. Firstly, when partition-predicates have // - The form of equality predicates w/ static literals (for ex, like `date = '2022-01-01'`) // - Fully specified proper prefix of the partition schema (ie fully binding first N columns // of the partition schema adhering to hereby described rules) @@ -274,11 +278,12 @@ class SparkHoodieTableFileIndex(spark: SparkSession, // // country_code: string (for ex, 'us') // date: string (for ex, '2022-01-01') + // hour: string (for ex, '08') // // Table's folder structure: // us/ - // |- 2022-01-01/ - // |- 2022-01-02/ + // |- 2022-01-01/06 + // |- 2022-01-02/07 // ... // // In case we have incoming query specifies following predicates: @@ -286,7 +291,15 @@ class SparkHoodieTableFileIndex(spark: SparkSession, // `... WHERE country_code = 'us' AND date = '2022-01-01'` // // We can deduce full partition-path w/o doing a single listing: `us/2022-01-01` - private def tryListByPartitionPathPrefix(partitionColumnNames: Seq[String], partitionColumnPredicates: Seq[Expression]) = { + // + // 2. 
Try to push down all partition predicates when listing the sub-folder. + // In case we have incoming query specifies following predicates: + // + // `... WHERE country_code = 'us' AND date = '2022-01-01' and hour = '06'` + // + // We can deduce full partition-path w/o doing a single listing: `us/2022-01-01`, and then push down + // these filters when listing `us/2022-01-01` to get the directory 'us/2022-01-01/06' + private def tryPushDownPartitionPredicates(partitionColumnNames: Seq[String], partitionColumnPredicates: Seq[Expression]): Seq[PartitionPath] = { // Static partition-path prefix is defined as a prefix of the full partition-path where only // first N partition columns (in-order) have proper (static) values bound in equality predicates, // allowing in turn to build such prefix to be used in subsequent filtering @@ -301,24 +314,59 @@ class SparkHoodieTableFileIndex(spark: SparkSession, .map(colName => (colName, (staticPartitionColumnValuesMap(colName)._1, staticPartitionColumnValuesMap(colName)._2.get))) } - if (staticPartitionColumnNameValuePairs.isEmpty) { - logDebug("Unable to compose relative partition path prefix from the predicates; falling back to fetching all partitions") - getAllQueryPartitionPaths.asScala - } else { - // Based on the static partition-column name-value pairs, we'll try to compose static partition-path - // prefix to try to reduce the scope of the required file-listing - val relativePartitionPathPrefix = composeRelativePartitionPath(staticPartitionColumnNameValuePairs) - - if (!metaClient.getFs.exists(new Path(getBasePath, relativePartitionPathPrefix))) { - Seq() - } else if (staticPartitionColumnNameValuePairs.length == partitionColumnNames.length) { - // In case composed partition path is complete, we can return it directly avoiding extra listing operation - Seq(new PartitionPath(relativePartitionPathPrefix, staticPartitionColumnNameValuePairs.map(_._2._2.asInstanceOf[AnyRef]).toArray)) - } else { - // Otherwise, compile extracted 
partition values (from query predicates) into a sub-path which is a prefix - // of the complete partition path, do listing for this prefix-path only - listPartitionPaths(Seq(relativePartitionPathPrefix).toList.asJava).asScala + val hiveStylePartitioning = metaClient.getTableConfig.getHiveStylePartitioningEnable.toBoolean + val urlEncodePartitioning = metaClient.getTableConfig.getUrlEncodePartitioning.toBoolean + + val partitionTypesOption =if (hiveStylePartitioning && urlEncodePartitioning) { + Try { + SparkFilterHelper.convertDataType(partitionSchema).asInstanceOf[RecordType] + } match { + case Success(partitionRecordType) + if partitionRecordType.fields().size() == _partitionSchemaFromProperties.size + && Conversions.isPartitionSchemaSupportedConversion(partitionRecordType) => + Some(partitionRecordType) + case _ => + None } + } else { + // Avoid convert partition schemas if hivestylePartitioning & urlEncodePartitioning is not enabled. + None + } + + (staticPartitionColumnNameValuePairs.isEmpty, partitionTypesOption) match { + case (true, Some(partitionTypes)) => + // Push down partition filters without pathPrefix + val convertedFilters = SparkFilterHelper.convertFilters( + partitionColumnPredicates.flatMap { + expr => sparkAdapter.translateFilter(expr) + }) + listPartitionPaths(Collections.singletonList(""), partitionTypes, convertedFilters).asScala + case (true, None) => + logDebug("Unable to compose relative partition path prefix from the predicates; falling back to fetching all partitions") + getAllQueryPartitionPaths.asScala + case (false, _) => + // Based on the static partition-column name-value pairs, we'll try to compose static partition-path + // prefix to try to reduce the scope of the required file-listing + val relativePartitionPathPrefix = composeRelativePartitionPath(staticPartitionColumnNameValuePairs) + + if (!metaClient.getFs.exists(new Path(getBasePath, relativePartitionPathPrefix))) { + Seq() + } else if 
(staticPartitionColumnNameValuePairs.length == partitionColumnNames.length) { + // In case composed partition path is complete, we can return it directly avoiding extra listing operation + Seq(new PartitionPath(relativePartitionPathPrefix, staticPartitionColumnNameValuePairs.map(_._2._2.asInstanceOf[AnyRef]).toArray)) + } else { + partitionTypesOption.map { partitionTypes => + // Try to composite path prefix and filters to gain better performance + val convertedFilters = SparkFilterHelper.convertFilters( + partitionColumnPredicates.flatMap { + expr => sparkAdapter.translateFilter(expr) + }) + listPartitionPaths(Seq(relativePartitionPathPrefix).toList.asJava, partitionTypes, convertedFilters).asScala + }.getOrElse { + log.warn("Met incompatible issue when converting to hudi data type, rollback to list by prefix directly") + listPartitionPaths(Seq(relativePartitionPathPrefix).toList.asJava).asScala + } + } } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index f8f082489ce..ba5c2edb2d1 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -383,6 +383,66 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS } } + /** + * This test mainly ensures all non-partition-prefix filter can be pushed successfully + */ + @ParameterizedTest + @CsvSource(value = Array("true, false", "false, false", "true, true", "false, true")) + def testPartitionPruneWithMultiplePartitionColumnsWithComplexExpression(useMetadataTable: Boolean, + complexExpressionPushDown: Boolean): Unit = { + val _spark = spark + import _spark.implicits._ + + val partitionNames = Seq("prefix", "dt", "hh", "country") + val writerOpts: Map[String, String] = commonOpts ++ Map( + 
DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + RECORDKEY_FIELD.key -> "id", + PRECOMBINE_FIELD.key -> "version", + PARTITIONPATH_FIELD.key -> partitionNames.mkString(","), + HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString + ) + + val readerOpts: Map[String, String] = queryOpts ++ Map( + HoodieMetadataConfig.ENABLE.key -> useMetadataTable.toString, + DataSourceReadOptions.FILE_INDEX_LISTING_MODE_OVERRIDE.key -> "lazy", + DataSourceReadOptions.FILE_INDEX_LISTING_PARTITION_PATH_PREFIX_ANALYSIS_ENABLED.key -> "true" + ) + + // Add a prefix "default" to ensure `PushDownByPartitionPrefix` not work + val inputDF1 = (for (i <- 0 until 10) yield (i, s"a$i", 10 + i, 10000, + "default", s"2021-03-0${i % 2 + 1}", i % 6 + 1, if (i % 2 == 0) "CN" else "SG")) + .toDF("id", "name", "price", "version", "prefix", "dt", "hh", "country") + + inputDF1.write.format("hudi") + .options(writerOpts) + .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING.key, complexExpressionPushDown.toString) + .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key, complexExpressionPushDown.toString) + .mode(SaveMode.Overwrite) + .save(basePath) + + // NOTE: We're init-ing file-index in advance to additionally test refreshing capability + metaClient = HoodieTableMetaClient.reload(metaClient) + val fileIndex = HoodieFileIndex(spark, metaClient, None, readerOpts) + + val partitionFilters = EqualTo(attribute("hh"), Literal.create(5)) + + val partitionAndFilesAfterPrune = fileIndex.listFiles(Seq(partitionFilters), Seq.empty) + assertEquals(1, partitionAndFilesAfterPrune.size) + + assertEquals(fileIndex.areAllPartitionPathsCached(), !complexExpressionPushDown) + + val PartitionDirectory(partitionActualValues, filesAfterPrune) = partitionAndFilesAfterPrune.head + val partitionExpectValues = Seq("default", "2021-03-01", "5", "CN") + assertEquals(partitionExpectValues.mkString(","), partitionActualValues.toSeq(Seq(StringType)).mkString(",")) + 
assertEquals(getFileCountInPartitionPath(makePartitionPath(partitionNames, partitionExpectValues, complexExpressionPushDown)), + filesAfterPrune.size) + + val readDF = spark.read.format("hudi").options(readerOpts).load() + + assertEquals(10, readDF.count()) + assertEquals(1, readDF.filter("hh = 5").count()) + } + @ParameterizedTest @CsvSource(value = Array("true", "false")) def testFileListingPartitionPrefixAnalysis(enablePartitionPathPrefixAnalysis: Boolean): Unit = { @@ -686,6 +746,18 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS private def getFileCountInPartitionPaths(partitionPaths: String*): Int = { partitionPaths.map(getFileCountInPartitionPath).sum } + + private def makePartitionPath(partitionNames: Seq[String], + partitionValues: Seq[String], + hiveStylePartitioning: Boolean): String = { + if (hiveStylePartitioning) { + partitionNames.zip(partitionValues).map { + case (name, value) => s"$name=$value" + }.mkString(Path.SEPARATOR) + } else { + partitionValues.mkString(Path.SEPARATOR) + } + } } object TestHoodieFileIndex { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestSparkFilterHelper.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestSparkFilterHelper.scala new file mode 100644 index 00000000000..7d53596ac10 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestSparkFilterHelper.scala @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi + +import org.apache.hudi.SparkFilterHelper.convertFilter +import org.apache.hudi.expression.{Expression, NameReference, Predicates, Literal => HLiteral} +import org.apache.hudi.testutils.HoodieClientTestHarness +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String +import org.junit.jupiter.api.{Assertions, Test} + +import scala.jdk.CollectionConverters.seqAsJavaListConverter + +class TestSparkFilterHelper extends HoodieClientTestHarness with SparkAdapterSupport { + + @Test + def testConvertInExpression(): Unit = { + val filterExpr = sparkAdapter.translateFilter( + expr("col1 IN (1, 2, 3)").expr.transformUp { + case UnresolvedAttribute(nameParts) => AttributeReference(nameParts.mkString("."), IntegerType)() + }) + + val result = SparkFilterHelper.convertFilter(filterExpr.get).get + + val expected = Predicates.in( + new NameReference("col1"), + Seq(1, 2, 3).map(v => HLiteral.from(v).asInstanceOf[Expression]).asJava) + + Assertions.assertEquals(result.toString, expected.toString) + } + + @Test + def testConvertInSetExpression(): Unit = { + val filterExpr = sparkAdapter.translateFilter( + InSet(AttributeReference("col1", StringType)(), Set("value1", "value2", "value3").map(UTF8String.fromString))) + + val result = SparkFilterHelper.convertFilter(filterExpr.get).get + + val expected = Predicates.in( + new NameReference("col1"), + 
Seq("value1", "value2", "value3").map(v => HLiteral.from(v).asInstanceOf[Expression]).asJava) + + Assertions.assertEquals(result.toString, expected.toString) + } + + @Test + def testConvertEqualToExpression(): Unit = { + val filter = sparkAdapter.translateFilter(EqualTo(AttributeReference("col1", LongType)(), Literal(1L))) + val result = convertFilter(filter.get).get + + val expected = Predicates.eq( + new NameReference("col1"), + HLiteral.from(1L).asInstanceOf[Expression]) + + Assertions.assertEquals(result.toString, expected.toString) + } + + @Test + def testConvertGreaterThanExpression(): Unit = { + val filter = sparkAdapter.translateFilter(GreaterThan(AttributeReference("col3", DoubleType)(), Literal(3.0D))) + val result = convertFilter(filter.get).get + + val expected = Predicates.gt( + new NameReference("col3"), + HLiteral.from(3.0D).asInstanceOf[Expression]) + + Assertions.assertEquals(result.toString, expected.toString) + } + + @Test + def testConvertGreaterThanOrEqualExpression(): Unit = { + val filter = sparkAdapter.translateFilter(GreaterThanOrEqual(AttributeReference("col4", FloatType)(), Literal(4.0f))) + val result = convertFilter(filter.get).get + + val expected = Predicates.gteq( + new NameReference("col4"), + HLiteral.from(4.0f).asInstanceOf[Expression]) + + Assertions.assertEquals(result.toString, expected.toString) + } + + @Test + def testConvertLessThanExpression(): Unit = { + val filter = sparkAdapter.translateFilter(LessThan(AttributeReference("col5", StringType)(), Literal("abc"))) + val result = convertFilter(filter.get).get + + val expected = Predicates.lt( + new NameReference("col5"), + HLiteral.from("abc").asInstanceOf[Expression]) + + Assertions.assertEquals(result.toString, expected.toString) + } + + @Test + def testConvertLessThanOrEqualExpression(): Unit = { + val filter = sparkAdapter.translateFilter(LessThanOrEqual(AttributeReference("col6", BooleanType)(), Literal(true))) + val result = convertFilter(filter.get).get + + val 
expected = Predicates.lteq( + new NameReference("col6"), + HLiteral.from(true).asInstanceOf[Expression]) + + Assertions.assertEquals(result.toString, expected.toString) + } + + @Test + def testConvertStartsWithExpression(): Unit = { + val filter = sparkAdapter.translateFilter(StartsWith(AttributeReference("col2", StringType)(), Literal("prefix"))) + val result = convertFilter(filter.get).get + + val expected = Predicates.startsWith( + new NameReference("col2"), + HLiteral.from("prefix").asInstanceOf[Expression]) + + Assertions.assertEquals(result.toString, expected.toString) + } + + @Test + def testConvertContainsExpression(): Unit = { + val filter = sparkAdapter.translateFilter(Contains(AttributeReference("col2", StringType)(), Literal("prefix"))) + val result = convertFilter(filter.get).get + + val expected = Predicates.contains( + new NameReference("col2"), + HLiteral.from("prefix").asInstanceOf[Expression]) + + Assertions.assertEquals(result.toString, expected.toString) + } + + @Test + def testConvertAndExpression(): Unit = { + val filter = sparkAdapter.translateFilter(And( + EqualTo(AttributeReference("col1", IntegerType)(), Literal(1)), + GreaterThan(AttributeReference("col2", FloatType)(), Literal(2.0F)))) + val result = convertFilter(filter.get).get + + val expected = Predicates.and( + Predicates.eq( + new NameReference("col1"), + HLiteral.from(1).asInstanceOf[Expression]), + Predicates.gt( + new NameReference("col2"), + HLiteral.from(2.0F).asInstanceOf[Expression])) + + Assertions.assertEquals(result.toString, expected.toString) + } + + @Test + def testConvertOrEqualExpression(): Unit = { + val filter = sparkAdapter.translateFilter(Or( + LessThan(AttributeReference("col1", ShortType)(), Literal(10.toShort)), + GreaterThanOrEqual(AttributeReference("col2", DoubleType)(), Literal(100.0D)))) + val result = convertFilter(filter.get).get + + val expected = Predicates.or( + Predicates.lt( + new NameReference("col1"), + 
HLiteral.from(10.toShort).asInstanceOf[Expression]), + Predicates.gteq( + new NameReference("col2"), + HLiteral.from(100.0D).asInstanceOf[Expression])) + + Assertions.assertEquals(result.toString, expected.toString) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala index be1dae8cc27..35afff918b9 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieInternalRowUtils.scala @@ -220,7 +220,7 @@ class TestHoodieInternalRowUtils extends FunSuite with Matchers with BeforeAndAf avroRecord.put("preferences", preferencesRecord) // fill mapType val locations = new HashMap[String, GenericData.Record] - val mapSchema = AvroInternalSchemaConverter.convert(record.field("locations").`type`.asInstanceOf[Types.MapType].valueType, "test1_locations") + val mapSchema = AvroInternalSchemaConverter.convert(record.fieldByNameCaseInsensitive("locations").`type`.asInstanceOf[Types.MapType].valueType, "test1_locations") val locationsValue: GenericData.Record = new GenericData.Record(mapSchema) locationsValue.put("lat", 1.2f) locationsValue.put("long", 1.4f) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala index 0467b6664c6..e2635c0cba8 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestLazyPartitionPathFetching.scala @@ -55,6 +55,36 @@ class TestLazyPartitionPathFetching extends HoodieSparkSqlTestBase { } } + test("Test 
querying with date column + partition pruning") { + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | grass_date date + |) using hudi + | location '${tmp.getCanonicalPath}' + | tblproperties ( + | primaryKey ='id', + | type = 'cow', + | preCombineField = 'ts' + | ) + | PARTITIONED BY (grass_date) + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, date('2023-02-27'))") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, date('2023-02-28'))") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, date('2023-03-01'))") + + checkAnswer(s"select id, name, price, ts from $tableName where grass_date = date'2023-03-01' order by id")( + Seq(3, "a3", 10.0, 1000) + ) + } + } + test("Test querying with date column + partition pruning (multi-level partitioning)") { withTempDir { tmp => val tableName = generateTableName diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartitionPushDownWhenListingPaths.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartitionPushDownWhenListingPaths.scala new file mode 100644 index 00000000000..1b5e590913f --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestPartitionPushDownWhenListingPaths.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import org.apache.hudi.common.config.HoodieMetadataConfig + +class TestPartitionPushDownWhenListingPaths extends HoodieSparkSqlTestBase { + + test("Test push down different partitions") { + Seq("true", "false").foreach { enableMetadata => + withSQLConf(HoodieMetadataConfig.ENABLE.key -> enableMetadata) { + Seq("cow", "mor").foreach { tableType => + withTempDir { tmp => + val tableName = generateTableName + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long, + | date_par date, + | country string, + | hour int, + | longValue long + |) using hudi + | location '${tmp.getCanonicalPath}' + | tblproperties ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts', + | hoodie.datasource.write.hive_style_partitioning = 'true', + | hoodie.datasource.write.partitionpath.urlencode = 'true' + | ) + | PARTITIONED BY (date_par, country, hour, longValue)""".stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, date '2023-02-27', 'ID', 1, 102345L)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000, date '2023-02-28', 'US', 4, 102346L)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000, date '2023-03-01', 'CN', 10, 102347L)") + + // Only filter one partition column + checkAnswer(s"select id, name, price, ts from $tableName where date_par = date'2023-03-01' order by id")( + Seq(3, "a3", 10.0, 1000) + ) + + // Filter with And operation + checkAnswer(s"select id, name, price, ts from $tableName where date_par 
= date'2023-02-28' and hour = 4 order by id")( + Seq(2, "a2", 10.0, 1000) + ) + + // Filter with Or operation + checkAnswer(s"select id, name, price, ts from $tableName where date_par = date'2023-02-28' or country = 'CN' order by id")( + Seq(2, "a2", 10.0, 1000), + Seq(3, "a3", 10.0, 1000) + ) + + // Filter with GT + checkAnswer(s"select id, name, price, ts from $tableName where date_par > date'2023-02-27' order by id")( + Seq(2, "a2", 10.0, 1000), + Seq(3, "a3", 10.0, 1000) + ) + + // Filter with LT + checkAnswer(s"select id, name, price, ts from $tableName where longValue < 102346L order by id")( + Seq(1, "a1", 10.0, 1000) + ) + + // Filter with EQ + checkAnswer(s"select id, name, price, ts from $tableName where longValue = 102346L order by id")( + Seq(2, "a2", 10.0, 1000) + ) + + // Filter with GT_EQ + checkAnswer(s"select id, name, price, ts from $tableName where date_par >= date'2023-02-27' order by id")( + Seq(1, "a1", 10.0, 1000), + Seq(2, "a2", 10.0, 1000), + Seq(3, "a3", 10.0, 1000) + ) + + // Filter with LT_EQ + checkAnswer(s"select id, name, price, ts from $tableName where date_par <= date'2023-02-27' order by id")( + Seq(1, "a1", 10.0, 1000) + ) + + // Filter with In operation + checkAnswer(s"select id, name, price, ts from $tableName where country in ('CN', 'US') order by id")( + Seq(2, "a2", 10.0, 1000), + Seq(3, "a3", 10.0, 1000) + ) + } + } + } + } + } +} diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala index 9eddf81d121..cc2e25f9891 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.vectorized.MutableColumnarRow import org.apache.spark.sql.hudi.SparkAdapter import 
org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser import org.apache.spark.sql.parser.HoodieExtendedParserInterface -import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder, StructType} import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel._ @@ -187,4 +187,17 @@ class Spark2Adapter extends SparkAdapter { case OFF_HEAP => "OFF_HEAP" case _ => throw new IllegalArgumentException(s"Invalid StorageLevel: $level") } + + /** + * Spark2 doesn't support nestedPredicatePushdown, + * so fail it if [[supportNestedPredicatePushdown]] is true here. + */ + override def translateFilter(predicate: Expression, + supportNestedPredicatePushdown: Boolean = false): Option[Filter] = { + if (supportNestedPredicatePushdown) { + throw new UnsupportedOperationException("Nested predicate push down is not supported") + } + + DataSourceStrategy.translateFilter(predicate) + } } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala index d5a35980526..ce9499ae7d2 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredica import org.apache.spark.sql.catalyst.util.DateFormatter import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hudi.SparkAdapter -import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SQLContext, SparkSession} import 
org.apache.spark.sql.types.StructType import org.apache.spark.storage.StorageLevel @@ -92,4 +92,9 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging { } override def convertStorageLevelToString(level: StorageLevel): String + + override def translateFilter(predicate: Expression, + supportNestedPredicatePushdown: Boolean = false): Option[Filter] = { + DataSourceStrategy.translateFilter(predicate, supportNestedPredicatePushdown) + } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/FilterGenVisitor.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/FilterGenVisitor.java index f1d4724cc1b..f42b157727c 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/FilterGenVisitor.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/FilterGenVisitor.java @@ -18,13 +18,14 @@ package org.apache.hudi.hive.util; -import org.apache.hudi.hive.expression.AttributeReferenceExpression; -import org.apache.hudi.hive.expression.BinaryOperator; -import org.apache.hudi.hive.expression.Expression; -import org.apache.hudi.hive.expression.ExpressionVisitor; -import org.apache.hudi.hive.expression.Literal; - -import java.util.Locale; +import org.apache.hudi.expression.NameReference; +import org.apache.hudi.expression.BoundReference; +import org.apache.hudi.expression.Expression; +import org.apache.hudi.expression.ExpressionVisitor; +import org.apache.hudi.expression.Literal; +import org.apache.hudi.expression.Predicate; +import org.apache.hudi.expression.Predicates; +import org.apache.hudi.internal.schema.Types; public class FilterGenVisitor implements ExpressionVisitor<String> { @@ -42,9 +43,10 @@ public class FilterGenVisitor implements ExpressionVisitor<String> { } } - private String visitAnd(Expression left, Expression right) { - String leftResult = left.accept(this); - String rightResult = right.accept(this); + @Override + public String visitAnd(Predicates.And and) { + String 
leftResult = and.getLeft().accept(this); + String rightResult = and.getRight().accept(this); if (leftResult.isEmpty()) { if (rightResult.isEmpty()) { @@ -59,9 +61,10 @@ public class FilterGenVisitor implements ExpressionVisitor<String> { return "(" + makeBinaryOperatorString(leftResult, Expression.Operator.AND, rightResult) + ")"; } - private String visitOr(Expression left, Expression right) { - String leftResult = left.accept(this); - String rightResult = right.accept(this); + @Override + public String visitOr(Predicates.Or or) { + String leftResult = or.getLeft().accept(this); + String rightResult = or.getRight().accept(this); if (!leftResult.isEmpty() && !rightResult.isEmpty()) { return "(" + makeBinaryOperatorString(leftResult, Expression.Operator.OR, rightResult) + ")"; @@ -81,39 +84,46 @@ public class FilterGenVisitor implements ExpressionVisitor<String> { } @Override - public String visitBinaryOperator(BinaryOperator expr) { - switch (expr.getOperator()) { - case AND: - return visitAnd(expr.getLeft(), expr.getRight()); - case OR: - return visitOr(expr.getLeft(), expr.getRight()); - case EQ: - case GT: - case LT: - case GT_EQ: - case LT_EQ: - return visitBinaryComparator(expr.getLeft(), expr.getOperator(), expr.getRight()); - default: - return ""; + public String visitPredicate(Predicate predicate) { + if (predicate instanceof Predicates.BinaryComparison) { + Predicates.BinaryComparison expr = (Predicates.BinaryComparison) predicate; + return visitBinaryComparator(expr.getLeft(), expr.getOperator(), expr.getRight()); } + + return ""; + } + + @Override + public String alwaysTrue() { + return ""; + } + + @Override + public String alwaysFalse() { + return ""; } @Override public String visitLiteral(Literal literalExpr) { - switch (literalExpr.getType().toLowerCase(Locale.ROOT)) { - case HiveSchemaUtil.STRING_TYPE_NAME: - return quoteStringLiteral(literalExpr.getValue()); - case HiveSchemaUtil.INT_TYPE_NAME: - case HiveSchemaUtil.BIGINT_TYPE_NAME: - case 
HiveSchemaUtil.DATE_TYPE_NAME: - return literalExpr.getValue(); - default: - return ""; + if (literalExpr.getDataType() instanceof Types.StringType) { + return quoteStringLiteral(((Literal<String>)literalExpr).getValue()); } + + if (literalExpr.getDataType() instanceof Types.IntType || literalExpr.getDataType() instanceof Types.LongType + || literalExpr.getDataType() instanceof Types.DateType) { + return literalExpr.getValue().toString(); + } + + return ""; } @Override - public String visitAttribute(AttributeReferenceExpression attribute) { + public String visitNameReference(NameReference attribute) { return attribute.getName(); } + + @Override + public String visitBoundReference(BoundReference boundReference) { + throw new UnsupportedOperationException("BoundReference cannot be used to build filter string"); + } } diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java index 8202408d8bd..9ff22d2d5dc 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/util/PartitionFilterGenerator.java @@ -20,12 +20,14 @@ package org.apache.hudi.hive.util; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.expression.Predicates; import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncException; -import org.apache.hudi.hive.expression.AttributeReferenceExpression; -import org.apache.hudi.hive.expression.BinaryOperator; -import org.apache.hudi.hive.expression.Expression; -import org.apache.hudi.hive.expression.Literal; +import org.apache.hudi.expression.NameReference; +import org.apache.hudi.expression.BinaryExpression; +import org.apache.hudi.expression.Expression; +import org.apache.hudi.expression.Literal; 
+import org.apache.hudi.internal.schema.Types; import org.apache.hudi.sync.common.model.FieldSchema; import org.apache.hudi.sync.common.model.Partition; import org.apache.hudi.sync.common.model.PartitionValueExtractor; @@ -54,6 +56,28 @@ public class PartitionFilterGenerator { } }; + private static final String UNSUPPORTED_TYPE_ERROR = "The value type: %s doesn't support to " + + "be pushed down to HMS, acceptable types: " + String.join(",", SUPPORT_TYPES); + + private static Literal buildLiteralExpression(String fieldValue, String fieldType) { + switch (fieldType.toLowerCase(Locale.ROOT)) { + case HiveSchemaUtil.INT_TYPE_NAME: + return new Literal<>(Integer.parseInt(fieldValue), Types.IntType.get()); + case HiveSchemaUtil.BIGINT_TYPE_NAME: + return new Literal<>(Long.parseLong(fieldValue), Types.LongType.get()); + // TODO Handle Date value + case HiveSchemaUtil.DATE_TYPE_NAME: + return new Literal<>(fieldValue, Types.DateType.get()); + case HiveSchemaUtil.STRING_TYPE_NAME: + return new Literal<>(fieldValue, Types.StringType.get()); + case HiveSchemaUtil.BOOLEAN_TYPE_NAME: + return new Literal<>(Boolean.parseBoolean(fieldValue), Types.BooleanType.get()); + default: + throw new IllegalArgumentException(String.format(UNSUPPORTED_TYPE_ERROR, fieldType)); + } + } + + /** * Build expression from the Partition list. Here we're trying to match all partitions. 
* @@ -68,11 +92,10 @@ public class PartitionFilterGenerator { for (int i = 0; i < partitionFields.size(); i++) { FieldSchema field = partitionFields.get(i); - String value = partitionValues.get(i); - BinaryOperator exp = BinaryOperator.eq(new AttributeReferenceExpression(field.getName()), - new Literal(value, field.getType())); + BinaryExpression exp = Predicates.eq(new NameReference(field.getName()), + buildLiteralExpression(partitionValues.get(i), field.getType())); if (root != null) { - root = BinaryOperator.and(root, exp); + root = Predicates.and(root, exp); } else { root = exp; } @@ -82,7 +105,7 @@ public class PartitionFilterGenerator { if (result == null) { return expr; } else { - return BinaryOperator.or(result, expr); + return Predicates.or(result, expr); } }); } @@ -129,8 +152,7 @@ public class PartitionFilterGenerator { case HiveSchemaUtil.STRING_TYPE_NAME: return s1.compareTo(s2); default: - throw new IllegalArgumentException("The value type: " + valueType + " doesn't support to " - + "be pushed down to HMS, acceptable types: " + String.join(",", SUPPORT_TYPES)); + throw new IllegalArgumentException(String.format(UNSUPPORTED_TYPE_ERROR, valueType)); } } } @@ -152,26 +174,26 @@ public class PartitionFilterGenerator { String[] values = fieldWithValues.getValue(); if (values.length == 1) { - return BinaryOperator.eq(new AttributeReferenceExpression(fieldSchema.getName()), - new Literal(values[0], fieldSchema.getType())); + return Predicates.eq(new NameReference(fieldSchema.getName()), + buildLiteralExpression(values[0], fieldSchema.getType())); } Arrays.sort(values, new ValueComparator(fieldSchema.getType())); - return BinaryOperator.and( - BinaryOperator.gteq( - new AttributeReferenceExpression(fieldSchema.getName()), - new Literal(values[0], fieldSchema.getType())), - BinaryOperator.lteq( - new AttributeReferenceExpression(fieldSchema.getName()), - new Literal(values[values.length - 1], fieldSchema.getType()))); + return Predicates.and( + 
Predicates.gteq( + new NameReference(fieldSchema.getName()), + buildLiteralExpression(values[0], fieldSchema.getType())), + Predicates.lteq( + new NameReference(fieldSchema.getName()), + buildLiteralExpression(values[values.length - 1], fieldSchema.getType()))); }) .filter(Objects::nonNull) .reduce(null, (result, expr) -> { if (result == null) { return expr; } else { - return BinaryOperator.and(result, expr); + return Predicates.and(result, expr); } }); } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java index 0c85d44d068..3f1a19421ac 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDataTableValidator.java @@ -297,7 +297,7 @@ public class HoodieDataTableValidator implements Serializable { HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc); try { HoodieTableMetadata tableMetadata = new FileSystemBackedTableMetadata( - engineContext, engineContext.getHadoopConf(), cfg.basePath, cfg.assumeDatePartitioning); + engineContext, metaClient.getTableConfig(), engineContext.getHadoopConf(), cfg.basePath, cfg.assumeDatePartitioning); List<Path> allDataFilePaths = HoodieDataTableUtils.getBaseAndLogFilePathsFromFileSystem(tableMetadata, cfg.basePath); // verify that no data files present with commit time < earliest commit in active timeline. 
if (metaClient.getActiveTimeline().firstInstant().isPresent()) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java index 84fa604c76d..70146ef55c8 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieRepairTool.java @@ -164,7 +164,7 @@ public class HoodieRepairTool { .build(); this.tableMetadata = new FileSystemBackedTableMetadata( - context, context.getHadoopConf(), cfg.basePath, cfg.assumeDatePartitioning); + context, metaClient.getTableConfig(), context.getHadoopConf(), cfg.basePath, cfg.assumeDatePartitioning); } public boolean run() {