This is an automated email from the ASF dual-hosted git repository.

uditme pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 684622c [HUDI-1591] Implement Spark's FileIndex for Hudi to support queries via Hudi DataSource using non-globbed table path and partition pruning (#2651) 684622c is described below commit 684622c7c9fa6df9eb177b51cb1e7bd6dd16f78d Author: pengzhiwei <pengzhiwei2...@icloud.com> AuthorDate: Fri Apr 2 02:12:28 2021 +0800 [HUDI-1591] Implement Spark's FileIndex for Hudi to support queries via Hudi DataSource using non-globbed table path and partition pruning (#2651) --- .../apache/hudi/keygen/CustomAvroKeyGenerator.java | 6 +- .../org/apache/hudi/keygen/CustomKeyGenerator.java | 2 +- .../datasources/SparkParsePartitionUtil.scala | 34 ++ .../java/org/apache/hudi/common/fs/FSUtils.java | 10 + .../hudi/common/table/HoodieTableConfig.java | 10 + .../hudi/common/table/HoodieTableMetaClient.java | 13 + .../scala/org/apache/hudi/DataSourceOptions.scala | 3 + .../main/scala/org/apache/hudi/DefaultSource.scala | 103 +++--- .../org/apache/hudi/HoodieBootstrapRelation.scala | 18 +- .../scala/org/apache/hudi/HoodieFileIndex.scala | 362 +++++++++++++++++++++ .../org/apache/hudi/HoodieSparkSqlWriter.scala | 8 +- .../scala/org/apache/hudi/HoodieSparkUtils.scala | 12 +- .../scala/org/apache/hudi/HoodieWriterUtils.scala | 31 +- .../hudi/MergeOnReadIncrementalRelation.scala | 3 +- .../apache/hudi/MergeOnReadSnapshotRelation.scala | 70 ++-- .../org/apache/hudi/TestHoodieFileIndex.scala | 252 ++++++++++++++ .../apache/hudi/functional/TestCOWDataSource.scala | 52 ++- .../functional/TestDataSourceForBootstrap.scala | 39 ++- .../apache/hudi/functional/TestMORDataSource.scala | 52 +++ .../datasources/Spark2ParsePartitionUtil.scala | 33 ++ .../datasources/Spark3ParsePartitionUtil.scala | 39 +++ .../hudi/utilities/deltastreamer/DeltaSync.java | 6 + 22 files changed, 1075 insertions(+), 83 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java index 724cabd..3b927c9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java @@ -44,7 +44,7 @@ import java.util.stream.Collectors; public class CustomAvroKeyGenerator extends BaseKeyGenerator { private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; - private static final String SPLIT_REGEX = ":"; + public static final String SPLIT_REGEX = ":"; /** * Used as a part of config in CustomKeyGenerator.java. 
@@ -117,8 +117,4 @@ public class CustomAvroKeyGenerator extends BaseKeyGenerator { public String getDefaultPartitionPathSeparator() { return DEFAULT_PARTITION_PATH_SEPARATOR; } - - public String getSplitRegex() { - return SPLIT_REGEX; - } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java index 77896d2..a2a3012 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java @@ -90,7 +90,7 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator { return ""; } for (String field : getPartitionPathFields()) { - String[] fieldWithType = field.split(customAvroKeyGenerator.getSplitRegex()); + String[] fieldWithType = field.split(customAvroKeyGenerator.SPLIT_REGEX); if (fieldWithType.length != 2) { throw new HoodieKeyGeneratorException("Unable to find field names for partition path in proper format"); } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala new file mode 100644 index 0000000..fc2275b --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources + +import java.util.TimeZone + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.execution.datasources.PartitioningUtils.PartitionValues +import org.apache.spark.sql.types.DataType + +trait SparkParsePartitionUtil extends Serializable { + + def parsePartition( + path: Path, + typeInference: Boolean, + basePaths: Set[Path], + userSpecifiedDataTypes: Map[String, DataType], + timeZone: TimeZone): Option[PartitionValues] +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index d37c617..9b229a3 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -276,6 +276,16 @@ public class FSUtils { } } + public static FileStatus[] getFilesInPartition(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, + String basePathStr, Path partitionPath) { + try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, + metadataConfig, basePathStr, FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR)) { + return tableMetadata.getAllFilesInPartition(partitionPath); + } catch (Exception e) { + throw new HoodieException("Error get files in partition: " + partitionPath, e); + } + } + public static String getFileExtension(String fullName) { Objects.requireNonNull(fullName); String fileName = new File(fullName).getName(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index f519c91..0b36e31 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.table; +import java.util.Arrays; import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieTableType; @@ -57,6 +58,7 @@ public class HoodieTableConfig implements Serializable { public static final String HOODIE_TABLE_TYPE_PROP_NAME = "hoodie.table.type"; public static final String HOODIE_TABLE_VERSION_PROP_NAME = "hoodie.table.version"; public static final String HOODIE_TABLE_PRECOMBINE_FIELD = "hoodie.table.precombine.field"; + public static final String HOODIE_TABLE_PARTITION_COLUMNS = "hoodie.table.partition.columns"; @Deprecated public static final String HOODIE_RO_FILE_FORMAT_PROP_NAME = "hoodie.table.ro.file.format"; @@ -193,6 +195,14 @@ public class HoodieTableConfig implements Serializable { return props.getProperty(HOODIE_TABLE_PRECOMBINE_FIELD); } + public Option<String[]> getPartitionColumns() { + if (props.containsKey(HOODIE_TABLE_PARTITION_COLUMNS)) { + return Option.of(Arrays.stream(props.getProperty(HOODIE_TABLE_PARTITION_COLUMNS).split(",")) + .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new String[]{})); + } + return Option.empty(); + } + /** * Read the payload class for HoodieRecords from the table properties. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 5de3b9a..f4edeb8 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -596,6 +596,7 @@ public class HoodieTableMetaClient implements Serializable { private Integer timelineLayoutVersion; private String baseFileFormat; private String preCombineField; + private String partitionColumns; private String bootstrapIndexClass; private String bootstrapBasePath; @@ -646,6 +647,11 @@ public class HoodieTableMetaClient implements Serializable { return this; } + public PropertyBuilder setPartitionColumns(String partitionColumns) { + this.partitionColumns = partitionColumns; + return this; + } + public PropertyBuilder setBootstrapIndexClass(String bootstrapIndexClass) { this.bootstrapIndexClass = bootstrapIndexClass; return this; @@ -696,6 +702,9 @@ public class HoodieTableMetaClient implements Serializable { if (properties.containsKey(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD)) { setPreCombineField(properties.getProperty(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD)); } + if (properties.containsKey(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS)) { + setPartitionColumns(properties.getProperty(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS)); + } return this; } @@ -738,6 +747,10 @@ public class HoodieTableMetaClient implements Serializable { if (null != preCombineField) { properties.put(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD, preCombineField); } + + if (null != partitionColumns) { + properties.put(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS, partitionColumns); + } return properties; } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala index 51f32a2..4c76f5f 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala @@ -68,6 +68,9 @@ object DataSourceReadOptions { val READ_PRE_COMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD_PROP + val ENABLE_HOODIE_FILE_INDEX = "hoodie.file.index.enable" + val DEFAULT_ENABLE_HOODIE_FILE_INDEX = true + @Deprecated val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type" @Deprecated diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala index 3299b8f..0b8234d 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -19,14 +19,16 @@ package org.apache.hudi import org.apache.hadoop.fs.Path import org.apache.hudi.DataSourceReadOptions._ -import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType} +import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPERATION_OPT_KEY} import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, MERGE_ON_READ} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import 
org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.HoodieROTablePathFilter import org.apache.log4j.LogManager -import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.datasources.{DataSource, FileStatusCache, HadoopFsRelation} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.hudi.streaming.HoodieStreamSource import org.apache.spark.sql.sources._ @@ -79,39 +81,53 @@ class DefaultSource extends RelationProvider val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration) - val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs) - - val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray) + // Use the HoodieFileIndex only if the 'path' is not globbed. + // Or else we use the original way to read hoodie table. + val enableFileIndex = optParams.get(ENABLE_HOODIE_FILE_INDEX) + .map(_.toBoolean).getOrElse(DEFAULT_ENABLE_HOODIE_FILE_INDEX) + val useHoodieFileIndex = enableFileIndex && path.isDefined && !path.get.contains("*") && + !parameters.contains(DataSourceReadOptions.READ_PATHS_OPT_KEY) + val globPaths = if (useHoodieFileIndex) { + None + } else { + Some(HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)) + } + // Get the table base path + val tablePath = if (globPaths.isDefined) { + DataSourceUtils.getTablePath(fs, globPaths.get.toArray) + } else { + DataSourceUtils.getTablePath(fs, Array(new Path(path.get))) + } log.info("Obtained hudi table path: " + tablePath) val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build() val isBootstrappedTable = metaClient.getTableConfig.getBootstrapBasePath.isPresent - log.info("Is bootstrapped table => " + isBootstrappedTable) - - if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) { - if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) { - if (isBootstrappedTable) { - // Snapshot query is not supported for Bootstrapped MOR tables - log.warn("Snapshot query is not supported for Bootstrapped Merge-on-Read tables." 
+ - " Falling back to Read Optimized query.") - new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams) - } else { - new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient) - } - } else { - getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient) - } - } else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) { - getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient) - } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) { - val metaClient = HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build() - if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) { - new MergeOnReadIncrementalRelation(sqlContext, optParams, schema, metaClient) - } else { - new IncrementalRelation(sqlContext, optParams, schema, metaClient) - } - } else { - throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY)) + val tableType = metaClient.getTableType + val queryType = parameters(QUERY_TYPE_OPT_KEY) + log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: $tableType") + + (tableType, queryType, isBootstrappedTable) match { + case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) | + (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) | + (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) => + getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, schema, tablePath, + readPaths, metaClient) + + case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => + new IncrementalRelation(sqlContext, parameters, schema, metaClient) + + case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) => + new MergeOnReadSnapshotRelation(sqlContext, parameters, schema, globPaths, metaClient) + + case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) => + new MergeOnReadIncrementalRelation(sqlContext, parameters, schema, metaClient) + + case (_, _, true) => + new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, parameters) + + case (_, _, _) => + throw new HoodieException(s"Invalid query type : $queryType for tableType: $tableType," + + s"isBootstrappedTable: $isBootstrappedTable ") } } @@ -162,18 +178,28 @@ class DefaultSource extends RelationProvider override def shortName(): String = "hudi" - private def getBaseFileOnlyView(sqlContext: SQLContext, + private def getBaseFileOnlyView(useHoodieFileIndex: Boolean, + sqlContext: SQLContext, optParams: Map[String, String], schema: StructType, + tablePath: String, extraReadPaths: Seq[String], - isBootstrappedTable: Boolean, - globPaths: Seq[Path], metaClient: HoodieTableMetaClient): BaseRelation = { - log.warn("Loading Base File Only View.") + log.info("Loading Base File Only View with options :" + optParams) + + if (useHoodieFileIndex) { + + val fileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient, + if (schema == null) Option.empty[StructType] else Some(schema), + optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession)) - if (isBootstrappedTable) { - // For bootstrapped tables, use our custom Spark relation for querying - new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, optParams) + HadoopFsRelation( + fileIndex, + fileIndex.partitionSchema, + fileIndex.dataSchema, + bucketSpec = None, + fileFormat = new ParquetFileFormat, + optParams)(sqlContext.sparkSession) } else { // this is just effectively RO view only, where 
`path` can contain a mix of // non-hoodie/hoodie path files. set the path filter up @@ -182,7 +208,6 @@ class DefaultSource extends RelationProvider classOf[HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]) - log.info("Constructing hoodie (as parquet) data source with options :" + optParams) // simply return as a regular parquet relation DataSource.apply( sparkSession = sqlContext.sparkSession, diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala index f7415f9..b1ab83a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala @@ -26,7 +26,7 @@ import org.apache.hudi.exception.HoodieException import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} @@ -46,13 +46,14 @@ import scala.collection.JavaConverters._ * * @param _sqlContext Spark SQL Context * @param userSchema User specified schema in the datasource query - * @param globPaths Globbed paths obtained from the user provided path for querying + * @param globPaths The globbed paths to query. If it is not None, read from the globPaths; + * otherwise read data from the tablePath using HoodieFileIndex. * @param metaClient Hoodie table meta client * @param optParams DataSource options passed by the user */ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext, val userSchema: StructType, - val globPaths: Seq[Path], + val globPaths: Option[Seq[Path]], val metaClient: HoodieTableMetaClient, val optParams: Map[String, String]) extends BaseRelation with PrunedFilteredScan with Logging { @@ -156,9 +157,14 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext, def buildFileIndex(): HoodieBootstrapFileIndex = { logInfo("Building file index..") - val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths) - val fileStatuses = inMemoryFileIndex.allFiles() - + val fileStatuses = if (globPaths.isDefined) { + // Load files from the globbed paths if they are defined, to stay compatible with the original mode + val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths.get) + inMemoryFileIndex.allFiles() + } else { // Load files by the HoodieFileIndex.
+ HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(schema), optParams, + FileStatusCache.getOrCreate(sqlContext.sparkSession)).allFiles + } if (fileStatuses.isEmpty) { throw new HoodieException("No files found for reading in user provided path.") } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala new file mode 100644 index 0000000..61c2f3a --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import java.util.Properties + +import scala.collection.JavaConverters._ +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration} +import org.apache.hudi.common.engine.HoodieLocalEngineContext +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.mutable + +/** + * A file index which supports partition pruning for hoodie snapshot and read-optimized queries. + * + * Main steps to get the file list for a query: + * 1. Load all files and partition values from the table path. + * 2. Prune the partitions using the partition filter conditions. + * + * There are 3 cases for this: + * 1. If the number of partition columns is equal to the actual partition path level, we + * read it as a partitioned table (e.g. the partition column is "dt" and the partition path is "2021-03-10"). + * + * 2. If the number of partition columns is not equal to the partition path level, but there is + * only one partition column (e.g. the partition column is "dt", but the partition path is "2021/03/10", + * whose directory level is 3), we can still read it as a partitioned table. We will map the + * partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt"). + * + * 3. Otherwise, if the number of partition columns does not match the partition directory level and there + * is more than one partition column (e.g. the partition columns are "dt,hh" and the partition path is "2021/03/10/12"), + * we read it as a non-partitioned table because we cannot know how to map the partition + * path to the partition columns in this case. + * + */ +case class HoodieFileIndex( + spark: SparkSession, + metaClient: HoodieTableMetaClient, + schemaSpec: Option[StructType], + options: Map[String, String], + @transient fileStatusCache: FileStatusCache = NoopCache) + extends FileIndex with Logging { + + private val basePath = metaClient.getBasePath + + @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required")) + /** + * Get the schema of the table. + */ + lazy val schema: StructType = schemaSpec.getOrElse({ + val schemaUtil = new TableSchemaResolver(metaClient) + SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema) + .dataType.asInstanceOf[StructType] + }) + + /** + * Get the partition schema from the hoodie.properties. + */ + private lazy val _partitionSchemaFromProperties: StructType = { + val tableConfig = metaClient.getTableConfig + val partitionColumns = tableConfig.getPartitionColumns + val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap + + if (partitionColumns.isPresent) { + val partitionFields = partitionColumns.get().map(column => + nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" + + s"$column' in the schema[${schema.fields.mkString(",")}]"))) + new StructType(partitionFields) + } else { // If the partition columns have not been stored in hoodie.properties (for a table that was + // created earlier), we treat it as a non-partitioned table. + logWarning("No partition columns available from hoodie.properties." + + " Partition pruning will not work") + new StructType() + } + } + + @transient @volatile private var fileSystemView: HoodieTableFileSystemView = _ + @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _ + @transient @volatile private var cachedFileSize: Long = 0L + @transient @volatile private var cachedAllPartitionPaths: Seq[PartitionRowPath] = _ + + @volatile private var queryAsNonePartitionedTable: Boolean = _ + + refresh0() + + override def rootPaths: Seq[Path] = queryPath :: Nil + + override def listFiles(partitionFilters: Seq[Expression], + dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { + if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
+ Seq(PartitionDirectory(InternalRow.empty, allFiles)) + } else { + // Prune the partition path by the partition filters + val prunedPartitions = prunePartition(cachedAllPartitionPaths, partitionFilters) + prunedPartitions.map { partition => + val fileStatues = fileSystemView.getLatestBaseFiles(partition.partitionPath).iterator() + .asScala.toSeq + .map(_.getFileStatus) + PartitionDirectory(partition.values, fileStatues) + } + } + } + + override def inputFiles: Array[String] = { + cachedAllInputFiles.map(_.getFileStatus.getPath.toString) + } + + override def refresh(): Unit = { + fileStatusCache.invalidateAll() + refresh0() + } + + private def refresh0(): Unit = { + val startTime = System.currentTimeMillis() + val partitionFiles = loadPartitionPathFiles() + val allFiles = partitionFiles.values.reduceOption(_ ++ _) + .getOrElse(Array.empty[FileStatus]) + + metaClient.reloadActiveTimeline() + val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants + fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, allFiles) + cachedAllInputFiles = fileSystemView.getLatestBaseFiles.iterator().asScala.toArray + cachedAllPartitionPaths = partitionFiles.keys.toSeq + cachedFileSize = cachedAllInputFiles.map(_.getFileLen).sum + + // If the partition value contains InternalRow.empty, we query it as a non-partitioned table. + queryAsNonePartitionedTable = cachedAllPartitionPaths + .exists(p => p.values == InternalRow.empty) + val flushSpend = System.currentTimeMillis() - startTime + logInfo(s"Refresh for table ${metaClient.getTableConfig.getTableName}," + + s" spend: $flushSpend ms") + } + + override def sizeInBytes: Long = { + cachedFileSize + } + + override def partitionSchema: StructType = { + if (queryAsNonePartitionedTable) { + // If we read it as Non-Partitioned table, we should not + // return the partition schema. + new StructType() + } else { + _partitionSchemaFromProperties + } + } + + /** + * Get the data schema of the table. + * @return + */ + def dataSchema: StructType = { + val partitionColumns = partitionSchema.fields.map(_.name).toSet + StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name))) + } + + def allFiles: Seq[FileStatus] = cachedAllInputFiles.map(_.getFileStatus) + + /** + * Prune the partition by the filter.This implementation is fork from + * org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions. + * @param partitionPaths All the partition paths. + * @param predicates The filter condition. + * @return The Pruned partition paths. 
+ */ + private def prunePartition(partitionPaths: Seq[PartitionRowPath], + predicates: Seq[Expression]): Seq[PartitionRowPath] = { + + val partitionColumnNames = partitionSchema.fields.map(_.name).toSet + val partitionPruningPredicates = predicates.filter { + _.references.map(_.name).toSet.subsetOf(partitionColumnNames) + } + if (partitionPruningPredicates.nonEmpty) { + val predicate = partitionPruningPredicates.reduce(expressions.And) + + val boundPredicate = InterpretedPredicate(predicate.transform { + case a: AttributeReference => + val index = partitionSchema.indexWhere(a.name == _.name) + BoundReference(index, partitionSchema(index).dataType, nullable = true) + }) + + val prunedPartitionPaths = partitionPaths.filter { + case PartitionRowPath(values, _) => boundPredicate.eval(values) + } + logInfo(s"Total partition size is: ${partitionPaths.size}," + + s" after partition prune size is: ${prunedPartitionPaths.size}") + prunedPartitionPaths + } else { + partitionPaths + } + } + + /** + * Load all partition paths and it's files under the query table path. + */ + private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = { + val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) + val properties = new Properties() + properties.putAll(options.asJava) + val metadataConfig = HoodieMetadataConfig.newBuilder.fromProperties(properties).build() + + val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath) + // Load all the partition path from the basePath, and filter by the query partition path. + // TODO load files from the queryPartitionPath directly. + val partitionPaths = FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, basePath).asScala + .filter(_.startsWith(queryPartitionPath)) + + val writeConfig = HoodieWriteConfig.newBuilder() + .withPath(basePath).withProperties(properties).build() + val maxListParallelism = writeConfig.getFileListingParallelism + + val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf()) + val partitionSchema = _partitionSchemaFromProperties + val timeZoneId = CaseInsensitiveMap(options) + .get(DateTimeUtils.TIMEZONE_OPTION) + .getOrElse(SQLConf.get.sessionLocalTimeZone) + + val sparkParsePartitionUtil = HoodieSparkUtils.createSparkParsePartitionUtil(spark + .sessionState.conf) + // Convert partition path to PartitionRowPath + val partitionRowPaths = partitionPaths.map { partitionPath => + val partitionRow = if (partitionSchema.fields.length == 0) { + // This is a non-partitioned table + InternalRow.empty + } else { + val partitionFragments = partitionPath.split("/") + + if (partitionFragments.length != partitionSchema.fields.length && + partitionSchema.fields.length == 1) { + // If the partition column size is not equal to the partition fragment size + // and the partition column size is 1, we map the whole partition path + // to the partition column which can benefit from the partition prune. + InternalRow.fromSeq(Seq(UTF8String.fromString(partitionPath))) + } else if (partitionFragments.length != partitionSchema.fields.length && + partitionSchema.fields.length > 1) { + // If the partition column size is not equal to the partition fragments size + // and the partition column size > 1, we do not know how to map the partition + // fragments to the partition columns. So we trait it as a Non-Partitioned Table + // for the query which do not benefit from the partition prune. + logWarning( s"Cannot do the partition prune for table $basePath." 
+ + s"The partitionFragments size (${partitionFragments.mkString(",")})" + + s" is not equal to the partition columns size(${partitionSchema.fields.mkString(",")})") + InternalRow.empty + } else { // If partitionSeqs.length == partitionSchema.fields.length + + // Append partition name to the partition value if the + // HIVE_STYLE_PARTITIONING_OPT_KEY is disable. + // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02" + val partitionWithName = + partitionFragments.zip(partitionSchema).map { + case (partition, field) => + if (partition.indexOf("=") == -1) { + s"${field.name}=$partition" + } else { + partition + } + }.mkString("/") + val pathWithPartitionName = new Path(basePath, partitionWithName) + val partitionDataTypes = partitionSchema.fields.map(f => f.name -> f.dataType).toMap + val partitionValues = sparkParsePartitionUtil.parsePartition(pathWithPartitionName, + typeInference = false, Set(new Path(basePath)), partitionDataTypes, + DateTimeUtils.getTimeZone(timeZoneId)) + + // Convert partitionValues to InternalRow + partitionValues.map(_.literals.map(_.value)) + .map(InternalRow.fromSeq) + .getOrElse(InternalRow.empty) + } + } + PartitionRowPath(partitionRow, partitionPath) + } + + // List files in all of the partition path. + val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]() + val cachePartitionToFiles = mutable.Map[PartitionRowPath, Array[FileStatus]]() + // Fetch from the FileStatusCache + partitionRowPaths.foreach { partitionRowPath => + fileStatusCache.getLeafFiles(partitionRowPath.fullPartitionPath(basePath)) match { + case Some(filesInPartition) => + cachePartitionToFiles.put(partitionRowPath, filesInPartition) + + case None => pathToFetch.append(partitionRowPath) + } + } + // Fetch the rest from the file system. + val fetchedPartition2Files = + spark.sparkContext.parallelize(pathToFetch, Math.min(pathToFetch.size, maxListParallelism)) + .map { partitionRowPath => + // Here we use a LocalEngineContext to get the files in the partition. + // We can do this because the TableMetadata.getAllFilesInPartition only rely on the + // hadoopConf of the EngineContext. + val engineContext = new HoodieLocalEngineContext(serializableConf.get()) + val filesInPartition = FSUtils.getFilesInPartition(engineContext, metadataConfig, + basePath, partitionRowPath.fullPartitionPath(basePath)) + (partitionRowPath, filesInPartition) + }.collect().map(f => f._1 -> f._2).toMap + + // Update the fileStatusCache + fetchedPartition2Files.foreach { + case (partitionRowPath, filesInPartition) => + fileStatusCache.putLeafFiles(partitionRowPath.fullPartitionPath(basePath), filesInPartition) + } + cachePartitionToFiles.toMap ++ fetchedPartition2Files + } + + /** + * Represent a partition path. + * e.g. PartitionPath(InternalRow("2021","02","01"), "2021/02/01")) + * @param values The partition values of this partition path. + * @param partitionPath The partition path string. 
+ */ + case class PartitionRowPath(values: InternalRow, partitionPath: String) { + override def equals(other: Any): Boolean = other match { + case PartitionRowPath(_, otherPath) => partitionPath == otherPath + case _ => false + } + + override def hashCode(): Int = { + partitionPath.hashCode + } + + def fullPartitionPath(basePath: String): Path = { + if (partitionPath.isEmpty) { + new Path(basePath) // This is a non-partition path + } else { + new Path(basePath, partitionPath) + } + } + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 94d07b9..5b87278 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -100,6 +100,7 @@ private[hudi] object HoodieSparkSqlWriter { val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) var tableConfig = getHoodieTableConfig(sparkContext, path.get, hoodieTableConfigOpt) + val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters)) if (mode == SaveMode.Ignore && tableExists) { log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.") @@ -112,12 +113,15 @@ private[hudi] object HoodieSparkSqlWriter { val archiveLogFolder = parameters.getOrElse( HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived") + val partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator) + val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder() .setTableType(tableType) .setTableName(tblName) .setArchiveLogFolder(archiveLogFolder) .setPayloadClassName(parameters(PAYLOAD_CLASS_OPT_KEY)) .setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null)) + .setPartitionColumns(partitionColumns) .initTable(sparkContext.hadoopConfiguration, path.get) tableConfig = tableMetaClient.getTableConfig } @@ -146,7 +150,6 @@ private[hudi] object HoodieSparkSqlWriter { log.info(s"Registered avro schema : ${schema.toString(true)}") // Convert to RDD[HoodieRecord] - val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters)) val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace) val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean || operation.equals(WriteOperationType.UPSERT); val hoodieAllIncomingRecords = genericRecords.map(gr => { @@ -193,7 +196,6 @@ private[hudi] object HoodieSparkSqlWriter { classOf[org.apache.avro.Schema])) // Convert to RDD[HoodieKey] - val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters)) val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace) val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD() @@ -283,6 +285,7 @@ private[hudi] object HoodieSparkSqlWriter { if (!tableExists) { val archiveLogFolder = parameters.getOrElse( HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived") + val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters) HoodieTableMetaClient.withPropertyBuilder() .setTableType(HoodieTableType.valueOf(tableType)) .setTableName(tableName) @@ -291,6 +294,7 @@ private[hudi] object HoodieSparkSqlWriter { .setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, 
null)) .setBootstrapIndexClass(bootstrapIndexClass) .setBootstrapBasePath(bootstrapBasePath) + .setPartitionColumns(partitionColumns) .initTable(sparkContext.hadoopConfiguration, path) } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala index bd55930..72b26be 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala @@ -28,7 +28,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.avro.SchemaConverters import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} -import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex} +import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex, Spark2ParsePartitionUtil, Spark3ParsePartitionUtil, SparkParsePartitionUtil} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, StructField, StructType} import scala.collection.JavaConverters._ @@ -118,4 +119,13 @@ object HoodieSparkUtils { new Spark3RowSerDe(encoder) } } + + def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = { + // TODO remove Spark2RowSerDe if Spark 2.x support is dropped + if (SPARK_VERSION.startsWith("2.")) { + new Spark2ParsePartitionUtil + } else { + new Spark3ParsePartitionUtil(conf) + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala index 02b5abd..fd3e078 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala @@ -17,16 +17,17 @@ package org.apache.hudi +import scala.collection.JavaConverters._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.common.config.TypedProperties import scala.collection.JavaConversions.mapAsJavaMap import scala.collection.JavaConverters.mapAsScalaMapConverter - import org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE import org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP +import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, KeyGenerator} /** * WriterUtils to assist in write path in Datasource and tests. @@ -81,4 +82,32 @@ object HoodieWriterUtils { params.foreach(kv => props.setProperty(kv._1, kv._2)) props } + + /** + * Get the partition columns to stored to hoodie.properties. + * @param parameters + * @return + */ + def getPartitionColumns(parameters: Map[String, String]): String = { + val props = new TypedProperties() + props.putAll(parameters.asJava) + val keyGen = DataSourceUtils.createKeyGenerator(props) + getPartitionColumns(keyGen) + } + + def getPartitionColumns(keyGen: KeyGenerator): String = { + keyGen match { + // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path filed format + // is: "field_name: field_type", we extract the field_name from the partition path field. 
+ case c: BaseKeyGenerator + if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] => + c.getPartitionPathFields.asScala.map(pathField => + pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX) + .headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}")) + .mkString(",") + + case b: BaseKeyGenerator => b.getPartitionPathFields.asScala.mkString(",") + case _=> null + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala index 13766da..4c2d332 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -201,7 +201,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, val baseFiles = f.getAllFileSlices.iterator().filter(slice => slice.getBaseFile.isPresent).toList val partitionedFile = if (baseFiles.nonEmpty) { val baseFile = baseFiles.head.getBaseFile - Option(PartitionedFile(InternalRow.empty, baseFile.get.getPath, 0, baseFile.get.getFileLen)) + val filePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath) + Option(PartitionedFile(InternalRow.empty, filePath, 0, baseFile.get.getFileLen)) } else { Option.empty diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 50e2ec5..c9d413b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -21,7 +21,6 @@ package org.apache.hudi import org.apache.hudi.common.model.HoodieBaseFile import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.view.HoodieTableFileSystemView -import org.apache.hudi.exception.HoodieException import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes import org.apache.hadoop.fs.Path @@ -29,7 +28,7 @@ import org.apache.hadoop.mapred.JobConf import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} @@ -54,7 +53,7 @@ case class HoodieMergeOnReadTableState(tableStructSchema: StructType, class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, val optParams: Map[String, String], val userSchema: StructType, - val globPaths: Seq[Path], + val globPaths: Option[Seq[Path]], val metaClient: HoodieTableMetaClient) extends BaseRelation with PrunedFilteredScan with Logging { @@ -133,25 +132,54 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, } def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = { - val inMemoryFileIndex = 
HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths) - val fileStatuses = inMemoryFileIndex.allFiles() - if (fileStatuses.isEmpty) { - throw new HoodieException("No files found for reading in user provided path.") + val fileStatuses = if (globPaths.isDefined) { + // Load files from the globbed paths if they are defined, to stay compatible with the original mode + val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get) + inMemoryFileIndex.allFiles() + } else { // Load files by the HoodieFileIndex. + val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient, + Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession)) + hoodieFileIndex.allFiles } - val fsView = new HoodieTableFileSystemView(metaClient, - metaClient.getActiveTimeline.getCommitsTimeline - .filterCompletedInstants, fileStatuses.toArray) - val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList - val latestCommit = fsView.getLastInstant.get().getTimestamp - val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala - val fileSplits = fileGroup.map(kv => { - val baseFile = kv._1 - val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList) - val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen) - HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit, - metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) - }).toList - fileSplits + if (fileStatuses.isEmpty) { // If this is an empty table, return an empty split list. + List.empty[HoodieMergeOnReadFileSplit] + } else { + val fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline.getCommitsTimeline + .filterCompletedInstants, fileStatuses.toArray) + val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList + val latestCommit = fsView.getLastInstant.get().getTimestamp + val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala + val fileSplits = fileGroup.map(kv => { + val baseFile = kv._1 + val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList) + + val filePath = MergeOnReadSnapshotRelation.getFilePath(baseFile.getFileStatus.getPath) + val partitionedFile = PartitionedFile(InternalRow.empty, filePath, 0, baseFile.getFileLen) + HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit, + metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) + }).toList + fileSplits + } + } +} + +object MergeOnReadSnapshotRelation { + + def getFilePath(path: Path): String = { + // Here we use Path#toUri to encode the path string, as there is a decode in + // ParquetFileFormat#buildReaderWithPartitionValues in the spark project when reading the table. + // So we should encode the file path here; otherwise a FileNotFoundException is thrown. + // For example, if "pt" is the partition path field and "pt" = "2021/02/02", and we enable + // URL_ENCODE_PARTITIONING_OPT_KEY when writing data to the hudi table, the data + // path in the table will look like "/basePath/2021%2F02%2F02/xxxx.parquet". When we read + // data from the table, if the file path is not encoded, + // ParquetFileFormat#buildReaderWithPartitionValues will decode it to + // "/basePath/2021/02/02/xxxx.parquet", which will result in a FileNotFoundException.
+ // See FileSourceScanExec#createBucketedReadRDD in spark project which do the same thing + // when create PartitionedFile. + path.toUri.toString } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala new file mode 100644 index 0000000..08cc50d --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import java.net.URLEncoder + +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.common.testutils.HoodieTestDataGenerator +import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.keygen.ComplexKeyGenerator +import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.{Config, TimestampType} +import org.apache.hudi.testutils.HoodieClientTestBase +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.{SaveMode, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal} +import org.apache.spark.sql.execution.datasources.PartitionDirectory +import org.apache.spark.sql.types.StringType +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +class TestHoodieFileIndex extends HoodieClientTestBase { + + var spark: SparkSession = _ + val commonOpts = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key", + DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition", + DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp", + HoodieWriteConfig.TABLE_NAME -> "hoodie_test" + ) + + @BeforeEach override def setUp() { + initPath() + initSparkContexts() + spark = sqlContext.sparkSession + initTestDataGenerator() + initFileSystem() + initMetaClient() + } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testPartitionSchema(partitionEncode: Boolean): Unit = { + val records1 = dataGen.generateInsertsContainsAllPartitions("000", 100) + val inputDF1 = 
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) + inputDF1.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, partitionEncode) + .mode(SaveMode.Overwrite) + .save(basePath) + metaClient = HoodieTableMetaClient.reload(metaClient) + val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath)) + assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(",")) + } + + @ParameterizedTest + @ValueSource(strings = Array( + "org.apache.hudi.keygen.ComplexKeyGenerator", + "org.apache.hudi.keygen.SimpleKeyGenerator", + "org.apache.hudi.keygen.TimestampBasedKeyGenerator")) + def testPartitionSchemaForBuildInKeyGenerator(keyGenerator: String): Unit = { + val records1 = dataGen.generateInsertsContainsAllPartitions("000", 100) + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) + inputDF1.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenerator) + .option(Config.TIMESTAMP_TYPE_FIELD_PROP, TimestampType.DATE_STRING.name()) + .option(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "yyyy/MM/dd") + .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyy-MM-dd") + .mode(SaveMode.Overwrite) + .save(basePath) + metaClient = HoodieTableMetaClient.reload(metaClient) + val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath)) + assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(",")) + } + + @ParameterizedTest + @ValueSource(strings = Array( + "org.apache.hudi.keygen.CustomKeyGenerator", + "org.apache.hudi.keygen.CustomAvroKeyGenerator")) + def testPartitionSchemaForCustomKeyGenerator(keyGenerator: String): Unit = { + val records1 = dataGen.generateInsertsContainsAllPartitions("000", 100) + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) + inputDF1.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenerator) + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition:simple") + .mode(SaveMode.Overwrite) + .save(basePath) + metaClient = HoodieTableMetaClient.reload(metaClient) + val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath)) + assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(",")) + } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testPartitionPruneWithPartitionEncode(partitionEncode: Boolean): Unit = { + val partitions = Array("2021/03/08", "2021/03/09", "2021/03/10", "2021/03/11", "2021/03/12") + val newDataGen = new HoodieTestDataGenerator(partitions) + val records1 = newDataGen.generateInsertsContainsAllPartitions("000", 100) + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) + inputDF1.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, partitionEncode) + .mode(SaveMode.Overwrite) + .save(basePath) + metaClient = 
HoodieTableMetaClient.reload(metaClient) + val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath)) + + val partitionFilter1 = EqualTo(attribute("partition"), literal("2021/03/08")) + val partitionName = if (partitionEncode) URLEncoder.encode("2021/03/08") else "2021/03/08" + val partitionAndFilesAfterPrune = fileIndex.listFiles(Seq(partitionFilter1), Seq.empty) + assertEquals(1, partitionAndFilesAfterPrune.size) + + val PartitionDirectory(partitionValues, filesInPartition) = partitionAndFilesAfterPrune(0) + assertEquals(partitionValues.toSeq(Seq(StringType)).mkString(","), "2021/03/08") + assertEquals(getFileCountInPartitionPath(partitionName), filesInPartition.size) + + val partitionFilter2 = And( + GreaterThanOrEqual(attribute("partition"), literal("2021/03/08")), + LessThan(attribute("partition"), literal("2021/03/10")) + ) + val prunedPartitions = fileIndex.listFiles(Seq(partitionFilter2), + Seq.empty).map(_.values.toSeq(Seq(StringType)).mkString(",")).toList + + assertEquals(List("2021/03/08", "2021/03/09"), prunedPartitions) + } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testPartitionPruneWithMultiPartitionColumns(useMetaFileList: Boolean): Unit = { + val _spark = spark + import _spark.implicits._ + // Test the case where the number of partition columns equals the partition directory level. + val inputDF1 = (for (i <- 0 until 10) yield (i, s"a$i", 10 + i, 10000, + s"2021-03-0${i % 2 + 1}", "10")).toDF("id", "name", "price", "version", "dt", "hh") + + inputDF1.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(RECORDKEY_FIELD_OPT_KEY, "id") + .option(PRECOMBINE_FIELD_OPT_KEY, "version") + .option(PARTITIONPATH_FIELD_OPT_KEY, "dt,hh") + .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[ComplexKeyGenerator].getName) + .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, "false") + .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, useMetaFileList) + .mode(SaveMode.Overwrite) + .save(basePath) + metaClient = HoodieTableMetaClient.reload(metaClient) + val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath)) + + val partitionFilter1 = And( + EqualTo(attribute("dt"), literal("2021-03-01")), + EqualTo(attribute("hh"), literal("10")) + ) + val partitionAndFilesAfterPrune = fileIndex.listFiles(Seq(partitionFilter1), Seq.empty) + assertEquals(1, partitionAndFilesAfterPrune.size) + + val PartitionDirectory(partitionValues, filesAfterPrune) = partitionAndFilesAfterPrune(0) + // Partition pruning works in this case. + assertEquals(partitionValues.toSeq(Seq(StringType)).mkString(","), "2021-03-01,10") + assertEquals(getFileCountInPartitionPath("2021-03-01/10"), filesAfterPrune.size) + + val readDF1 = spark.read.format("hudi").load(basePath) + assertEquals(10, readDF1.count()) + assertEquals(5, readDF1.filter("dt = '2021-03-01' and hh = '10'").count()) + + // Test the case where the number of partition columns does not match the partition directory level and + // there is more than one partition column. The table will not be treated as a partitioned table when reading.
+ val inputDF2 = (for (i <- 0 until 10) yield (i, s"a$i", 10 + i, 100 * i + 10000, + s"2021/03/0${i % 2 + 1}", "10")).toDF("id", "name", "price", "version", "dt", "hh") + inputDF2.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(RECORDKEY_FIELD_OPT_KEY, "id") + .option(PRECOMBINE_FIELD_OPT_KEY, "version") + .option(PARTITIONPATH_FIELD_OPT_KEY, "dt,hh") + .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[ComplexKeyGenerator].getName) + .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, "false") + .mode(SaveMode.Overwrite) + .save(basePath) + + fileIndex.refresh() + val partitionFilter2 = And( + EqualTo(attribute("dt"), literal("2021/03/01")), + EqualTo(attribute("hh"), literal("10")) + ) + val partitionAndFilesAfterPrune2 = fileIndex.listFiles(Seq(partitionFilter2), Seq.empty) + + assertEquals(1, partitionAndFilesAfterPrune2.size) + val PartitionDirectory(partitionValues2, filesAfterPrune2) = partitionAndFilesAfterPrune2(0) + // Partition pruning does not work in this case, so the partition value it + // returns is InternalRow.empty. + assertEquals(partitionValues2, InternalRow.empty) + // The returned file count should equal the total number of files across all the partition paths. + assertEquals(getFileCountInPartitionPaths("2021/03/01/10", "2021/03/02/10"), + filesAfterPrune2.length) + val readDF2 = spark.read.format("hudi").load(basePath) + + assertEquals(10, readDF2.count()) + // There are 5 rows in the partition dt = 2021/03/01, hh = 10 + assertEquals(5, readDF2.filter("dt = '2021/03/01' and hh ='10'").count()) + } + + private def attribute(partition: String): AttributeReference = { + AttributeReference(partition, StringType, true)() + } + + private def literal(value: String): Literal = { + Literal.create(value) + } + + private def getFileCountInPartitionPath(partitionPath: String): Int = { + metaClient.reloadActiveTimeline() + val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants + val fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants) + fileSystemView.getAllBaseFiles(partitionPath).iterator().asScala.toSeq.length + } + + private def getFileCountInPartitionPaths(partitionPaths: String*): Int = { + partitionPaths.map(getFileCountInPartitionPath).sum + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index b671bc6..88ed65f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -18,6 +18,10 @@ package org.apache.hudi.functional import java.sql.{Date, Timestamp} + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.timeline.HoodieInstant @@ -38,7 +42,6 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource -import scala.collection.JavaConversions._ /** * Basic tests on the spark datasource for COW table.
@@ -619,4 +622,51 @@ class TestCOWDataSource extends HoodieClientTestBase { .load(basePath + "/*") assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= lit("")).count() == 0) } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testQueryCOWWithBasePathAndFileIndex(partitionEncode: Boolean): Unit = { + val N = 20 + // Test query with partition pruning when URL_ENCODE_PARTITIONING_OPT_KEY is enabled + val records1 = dataGen.generateInsertsContainsAllPartitions("000", N) + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) + inputDF1.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, partitionEncode) + .mode(SaveMode.Overwrite) + .save(basePath) + val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath) + + val countIn20160315 = records1.asScala.count(record => record.getPartitionPath == "2016/03/15") + // query the partition by filter + val count1 = spark.read.format("hudi") + .load(basePath) + .filter("partition = '2016/03/15'") + .count() + assertEquals(countIn20160315, count1) + + // query the partition by path + val partitionPath = if (partitionEncode) "2016%2F03%2F15" else "2016/03/15" + val count2 = spark.read.format("hudi") + .load(basePath + s"/$partitionPath") + .count() + assertEquals(countIn20160315, count2) + + // Second write with Append mode + val records2 = dataGen.generateInsertsContainsAllPartitions("000", N + 1) + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2)) + inputDF2.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, partitionEncode) + .mode(SaveMode.Append) + .save(basePath) + // Incremental query without "*" in path + val hoodieIncViewDF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) + .load(basePath) + assertEquals(N + 1, hoodieIncViewDF1.count()) + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala index 2a6a0a7..0746f6d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala @@ -130,6 +130,10 @@ class TestDataSourceForBootstrap { hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*") assertEquals(numRecords, hoodieROViewDF1.count()) assertEquals(numRecordsUpdate, hoodieROViewDF1.filter(s"timestamp == $updateTimestamp").count()) + // Read without * + val hoodieROViewDF1WithBasePath = spark.read.format("hudi").load(basePath) + assertEquals(numRecords, hoodieROViewDF1WithBasePath.count()) + assertEquals(numRecordsUpdate, hoodieROViewDF1WithBasePath.filter(s"timestamp == $updateTimestamp").count()) verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = false) } @@ -149,7 +153,8 @@ class TestDataSourceForBootstrap {
.save(srcPath) // Perform bootstrap - val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) + val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit( + DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr")) // Read bootstrapped table and verify count val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*") @@ -201,11 +206,15 @@ class TestDataSourceForBootstrap { }) // Perform bootstrap - val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) + val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit( + DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr")) // Read bootstrapped table and verify count val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*") assertEquals(numRecords, hoodieROViewDF1.count()) + // Read without * + val hoodieROViewWithBasePathDF1 = spark.read.format("hudi").load(basePath) + assertEquals(numRecords, hoodieROViewWithBasePathDF1.count()) // Perform upsert based on the written bootstrap table val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal)) @@ -268,7 +277,8 @@ class TestDataSourceForBootstrap { }) // Perform bootstrap - val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit( + DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr")) // Read bootstrapped table and verify count val hoodieROViewDF1 = spark.read.format("hudi") @@ -304,6 +314,13 @@ class TestDataSourceForBootstrap { .load(basePath + "/*") assertEquals(numRecords, hoodieROViewDF2.count()) assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count()) + // Test query without "*" for MOR READ_OPTIMIZED + val hoodieROViewDFWithBasePath = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, + DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) + .load(basePath) + assertEquals(numRecords, hoodieROViewDFWithBasePath.count()) + assertEquals(numRecordsUpdate, hoodieROViewDFWithBasePath.filter(s"timestamp == $updateTimestamp").count()) } @Test def testMetadataBootstrapMORPartitioned(): Unit = { @@ -325,7 +342,8 @@ class TestDataSourceForBootstrap { }) // Perform bootstrap - val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit( + DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr")) // Read bootstrapped table and verify count val hoodieROViewDF1 = spark.read.format("hudi") @@ -333,6 +351,12 @@ class TestDataSourceForBootstrap { DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) .load(basePath + "/*") assertEquals(numRecords, hoodieROViewDF1.count()) + // Read bootstrapped table without "*" + val hoodieROViewDFWithBasePath = spark.read.format("hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, + DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) + .load(basePath) + assertEquals(numRecords, hoodieROViewDFWithBasePath.count()) // Perform upsert based on the written bootstrap table val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal)) @@ -420,6 +444,9 @@ class TestDataSourceForBootstrap { val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*") 
assertEquals(numRecords, hoodieROViewDF1.count()) + val hoodieROViewDFWithBasePath = spark.read.format("hudi").load(basePath) + assertEquals(numRecords, hoodieROViewDFWithBasePath.count()) + + // Perform upsert val updateTimestamp = Instant.now.toEpochMilli val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava, @@ -445,13 +472,15 @@ class TestDataSourceForBootstrap { verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = false) } - def runMetadataBootstrapAndVerifyCommit(tableType: String): String = { + def runMetadataBootstrapAndVerifyCommit(tableType: String, + partitionColumns: Option[String] = None): String = { val bootstrapDF = spark.emptyDataFrame bootstrapDF.write .format("hudi") .options(commonOpts) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType) + .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, partitionColumns.getOrElse("")) .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath) .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, classOf[SimpleKeyGenerator].getName) .mode(SaveMode.Overwrite) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 92024a3..00c40ab 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -17,6 +17,7 @@ package org.apache.hudi.functional +import scala.collection.JavaConverters._ import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, PAYLOAD_CLASS_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.DefaultHoodieRecordPayload @@ -31,6 +32,8 @@ import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource import scala.collection.JavaConversions._ @@ -562,4 +565,53 @@ class TestMORDataSource extends HoodieClientTestBase { df.show(1) df.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history").show(1) } + + @ParameterizedTest + @ValueSource(booleans = Array(true, false)) + def testQueryMORWithBasePathAndFileIndex(partitionEncode: Boolean): Unit = { + val N = 20 + // Test query with partition pruning when URL_ENCODE_PARTITIONING_OPT_KEY is enabled + val records1 = dataGen.generateInsertsContainsAllPartitions("000", N) + val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2)) + inputDF1.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, partitionEncode) + .mode(SaveMode.Overwrite) + .save(basePath) + val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath) + + val countIn20160315 =
records1.asScala.count(record => record.getPartitionPath == "2016/03/15") + // query the partition by filter + val count1 = spark.read.format("hudi") + .load(basePath) + .filter("partition = '2016/03/15'") + .count() + assertEquals(countIn20160315, count1) + + // query the partition by path + val partitionPath = if (partitionEncode) "2016%2F03%2F15" else "2016/03/15" + val count2 = spark.read.format("hudi") + .load(basePath + s"/$partitionPath") + .count() + assertEquals(countIn20160315, count2) + + // Second write with Append mode + val records2 = dataGen.generateInsertsContainsAllPartitions("000", N + 1) + val inputDF2 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2)) + inputDF2.write.format("hudi") + .options(commonOpts) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) + .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, partitionEncode) + .mode(SaveMode.Append) + .save(basePath) + // Incremental query without "*" in path + val hoodieIncViewDF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1) + .load(basePath) + assertEquals(N + 1, hoodieIncViewDF1.count()) + } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala new file mode 100644 index 0000000..5bf0284 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources +import java.util.TimeZone + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.execution.datasources.PartitioningUtils.PartitionValues +import org.apache.spark.sql.types.DataType + +class Spark2ParsePartitionUtil extends SparkParsePartitionUtil { + override def parsePartition(path: Path, typeInference: Boolean, + basePaths: Set[Path], + userSpecifiedDataTypes: Map[String, DataType], + timeZone: TimeZone): Option[PartitionValues] = { + PartitioningUtils.parsePartition(path, typeInference, + basePaths, userSpecifiedDataTypes, timeZone)._1 + } +} diff --git a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala new file mode 100644 index 0000000..ea9cc78 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources +import java.util.TimeZone + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} +import org.apache.spark.sql.execution.datasources.PartitioningUtils.{PartitionValues, timestampPartitionPattern} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.DataType + +class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil { + + override def parsePartition(path: Path, typeInference: Boolean, + basePaths: Set[Path], userSpecifiedDataTypes: Map[String, DataType], + timeZone: TimeZone): Option[PartitionValues] = { + val dateFormatter = DateFormatter(timeZone.toZoneId) + val timestampFormatter = TimestampFormatter(timestampPartitionPattern, + timeZone.toZoneId, isParsing = true) + + PartitioningUtils.parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes, + conf.validatePartitionColumns, timeZone.toZoneId, dateFormatter, timestampFormatter)._1 + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index 336639c..01a374d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -20,6 +20,7 @@ package org.apache.hudi.utilities.deltastreamer; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.HoodieSparkUtils; +import org.apache.hudi.HoodieWriterUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; @@ -235,12 +236,15 @@ public class DeltaSync implements Serializable { } } else { this.commitTimelineOpt = Option.empty(); + String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator); + HoodieTableMetaClient.withPropertyBuilder() .setTableType(cfg.tableType) .setTableName(cfg.targetTableName) .setArchiveLogFolder("archived") .setPayloadClassName(cfg.payloadClassName) .setBaseFileFormat(cfg.baseFileFormat) + .setPartitionColumns(partitionColumns) .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath); } @@ -326,12 +330,14 @@ public class DeltaSync implements Serializable { } } } else { + String partitionColumns = HoodieWriterUtils.getPartitionColumns(keyGenerator); HoodieTableMetaClient.withPropertyBuilder() .setTableType(cfg.tableType) .setTableName(cfg.targetTableName) .setArchiveLogFolder("archived") .setPayloadClassName(cfg.payloadClassName) .setBaseFileFormat(cfg.baseFileFormat) + .setPartitionColumns(partitionColumns) .initTable(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath); }
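A minimal usage sketch of the read path this change enables, not part of the commit itself: queries can now load a Hudi table through its plain, non-globbed base path, and filters on partition columns are pushed into HoodieFileIndex so only matching partition directories are listed. The base path and the "partition" column name below are illustrative and mirror the tests above; the option keys are the ones used in those tests.

import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

object HudiBasePathQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-base-path-query").getOrCreate()
    // Illustrative path; point this at the base path of an existing Hudi table.
    val basePath = "/tmp/hoodie_test"

    // Snapshot query against the base path, no "/*" glob required any more.
    // The partition filter is pruned by HoodieFileIndex before files are listed.
    val snapshotDF = spark.read.format("hudi").load(basePath)
    snapshotDF.filter("partition = '2021/03/08'").show()

    // Incremental query against the same non-globbed base path, as exercised in the tests above.
    val incrementalDF = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") // replace with a real commit instant time
      .load(basePath)
    incrementalDF.show()

    spark.stop()
  }
}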