This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 2770ff50714  [HUDI-915][HUDI-5656] Rebased `HoodieBootstrapRelation` onto `HoodieBaseRelation` (#7804)
2770ff50714 is described below

commit 2770ff507141f013f7500354595137b52a543e8b
Author: Alexey Kudinkin <alexey.kudin...@gmail.com>
AuthorDate: Fri Feb 24 08:43:49 2023 -0800

    [HUDI-915][HUDI-5656] Rebased `HoodieBootstrapRelation` onto `HoodieBaseRelation` (#7804)

    Currently `HoodieBootstrapRelation` treats partitioned tables improperly, resulting in an NPE
    while trying to read a bootstrapped table. To address that, `HoodieBootstrapRelation` has been
    rebased onto `HoodieBaseRelation`, sharing the core reading semantics (such as schema handling,
    file listing, etc.) with Hudi's other file-based Relation implementations for COW and MOR.
---
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  47 ++--
 .../scala/org/apache/hudi/HoodieBootstrapRDD.scala | 103 ++++----
 .../org/apache/hudi/HoodieBootstrapRelation.scala  | 259 +++++++++++----------
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |   2 +-
 .../spark/sql/hudi/HoodieSqlCommonUtils.scala      |  30 ++-
 .../functional/TestDataSourceForBootstrap.scala    | 166 +++++++------
 .../deltastreamer/TestHoodieDeltaStreamer.java     |   4 +
 7 files changed, 344 insertions(+), 267 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 99b5b5c87ba..cb02c59a690 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -48,6 +48,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.HoodieCatalystExpressionUtils.{convertToCatalystExpression, generateUnsafeProjection} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression} import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.execution.datasources._ @@ -66,7 +67,12 @@ import scala.util.{Failure, Success, Try} trait HoodieFileSplit {} -case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None) +case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: String, internalSchema: Option[InternalSchema] = None) { + + def this(structTypeSchema: StructType) = + this(structTypeSchema, convertToAvroSchema(structTypeSchema).toString) + +} case class HoodieTableState(tablePath: String, latestCommitTimestamp: Option[String], @@ -98,6 +104,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected val sparkSession: SparkSession = sqlContext.sparkSession + protected lazy val resolver: Resolver = sparkSession.sessionState.analyzer.resolver + protected lazy val conf: Configuration = new Configuration(sqlContext.sparkContext.hadoopConfiguration) protected lazy val jobConf = new JobConf(conf) @@ -174,8 +182,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected lazy val tableStructSchema: StructType = { val converted = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) - - val resolver = sparkSession.sessionState.analyzer.resolver val metaFieldMetadata = sparkAdapter.createCatalystMetadataForMetaField // NOTE:
Here we annotate meta-fields with corresponding metadata such that Spark (>= 3.2) @@ -466,10 +472,15 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, * For enable hoodie.datasource.write.drop.partition.columns, need to create an InternalRow on partition values * and pass this reader on parquet file. So that, we can query the partition columns. */ - protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = { + + protected def getPartitionColumnsAsInternalRow(file: FileStatus): InternalRow = + getPartitionColumnsAsInternalRowInternal(file, shouldExtractPartitionValuesFromPartitionPath) + + protected def getPartitionColumnsAsInternalRowInternal(file: FileStatus, + extractPartitionValuesFromPartitionPath: Boolean): InternalRow = { try { val tableConfig = metaClient.getTableConfig - if (shouldExtractPartitionValuesFromPartitionPath) { + if (extractPartitionValuesFromPartitionPath) { val relativePath = new URI(metaClient.getBasePath).relativize(new URI(file.getPath.getParent.toString)).toString val hiveStylePartitioningEnabled = tableConfig.getHiveStylePartitioningEnable.toBoolean if (hiveStylePartitioningEnabled) { @@ -514,7 +525,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, requiredDataSchema: HoodieTableSchema, filters: Seq[Filter], options: Map[String, String], - hadoopConf: Configuration): BaseFileReader = { + hadoopConf: Configuration, + shouldAppendPartitionValuesOverride: Option[Boolean] = None): BaseFileReader = { val tableBaseFileFormat = tableConfig.getBaseFileFormat // NOTE: PLEASE READ CAREFULLY @@ -535,7 +547,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, hadoopConf = hadoopConf, // We're delegating to Spark to append partition values to every row only in cases // when these corresponding partition-values are not persisted w/in the data file itself - appendPartitionValues = shouldExtractPartitionValuesFromPartitionPath + appendPartitionValues = shouldAppendPartitionValuesOverride.getOrElse(shouldExtractPartitionValuesFromPartitionPath) ) // Since partition values by default are omitted, and not persisted w/in data-files by Spark, // data-file readers (such as [[ParquetFileFormat]]) have to inject partition values while reading @@ -589,6 +601,12 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema, requiredSchema: HoodieTableSchema): (StructType, HoodieTableSchema, HoodieTableSchema) = { + tryPrunePartitionColumnsInternal(tableSchema, requiredSchema, shouldExtractPartitionValuesFromPartitionPath) + } + + protected def tryPrunePartitionColumnsInternal(tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + extractPartitionValuesFromPartitionPath: Boolean): (StructType, HoodieTableSchema, HoodieTableSchema) = { // Since schema requested by the caller might contain partition columns, we might need to // prune it, removing all partition columns from it in case these columns are not persisted // in the data files @@ -598,21 +616,24 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, // the partition path, and omitted from the data file, back into fetched rows; // Note that, by default, partition columns are not omitted therefore specifying // partition schema for reader is not required - if (shouldExtractPartitionValuesFromPartitionPath) { - val partitionSchema = StructType(partitionColumns.map(StructField(_, StringType))) + if (extractPartitionValuesFromPartitionPath) { + val partitionSchema = 
filterInPartitionColumns(tableSchema.structTypeSchema) val prunedDataStructSchema = prunePartitionColumns(tableSchema.structTypeSchema) val prunedRequiredSchema = prunePartitionColumns(requiredSchema.structTypeSchema) (partitionSchema, - HoodieTableSchema(prunedDataStructSchema, convertToAvroSchema(prunedDataStructSchema).toString), - HoodieTableSchema(prunedRequiredSchema, convertToAvroSchema(prunedRequiredSchema).toString)) + new HoodieTableSchema(prunedDataStructSchema), + new HoodieTableSchema(prunedRequiredSchema)) } else { (StructType(Nil), tableSchema, requiredSchema) } } - private def prunePartitionColumns(dataStructSchema: StructType): StructType = - StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name))) + private def filterInPartitionColumns(structType: StructType): StructType = + StructType(structType.filter(f => partitionColumns.exists(col => resolver(f.name, col)))) + + private def prunePartitionColumns(structType: StructType): StructType = + StructType(structType.filterNot(f => partitionColumns.exists(pc => resolver(f.name, pc)))) private def getConfigValue(config: ConfigProperty[String], defaultValueOption: Option[String]=Option.empty): String = { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala index ea997c86acb..b72c41bbd66 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala @@ -18,23 +18,22 @@ package org.apache.hudi -import org.apache.spark.{Partition, TaskContext} +import org.apache.hudi.HoodieBaseRelation.BaseFileReader +import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.spark.rdd.RDD +import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.catalyst.expressions.JoinedRow import org.apache.spark.sql.types.StructType - -import org.apache.hudi.HoodieDataSourceHelper._ +import org.apache.spark.{Partition, TaskContext} class HoodieBootstrapRDD(@transient spark: SparkSession, - dataReadFunction: PartitionedFile => Iterator[InternalRow], - skeletonReadFunction: PartitionedFile => Iterator[InternalRow], - regularReadFunction: PartitionedFile => Iterator[InternalRow], - dataSchema: StructType, - skeletonSchema: StructType, - requiredColumns: Array[String], - tableState: HoodieBootstrapTableState) + bootstrapDataFileReader: BaseFileReader, + bootstrapSkeletonFileReader: BaseFileReader, + regularFileReader: BaseFileReader, + requiredSchema: HoodieTableSchema, + @transient splits: Seq[HoodieBootstrapSplit]) extends RDD[InternalRow](spark.sparkContext, Nil) { override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { @@ -51,59 +50,57 @@ class HoodieBootstrapRDD(@transient spark: SparkSession, } } - var partitionedFileIterator: Iterator[InternalRow] = null + bootstrapPartition.split.skeletonFile match { + case Some(skeletonFile) => + // It is a bootstrap split. Check both skeleton and data files. 
+ val (iterator, schema) = if (bootstrapDataFileReader.schema.isEmpty) { + // No data column to fetch, hence fetch only from skeleton file + (bootstrapSkeletonFileReader.read(skeletonFile), bootstrapSkeletonFileReader.schema) + } else if (bootstrapSkeletonFileReader.schema.isEmpty) { + // No metadata column to fetch, hence fetch only from data file + (bootstrapDataFileReader.read(bootstrapPartition.split.dataFile), bootstrapDataFileReader.schema) + } else { + // Fetch from both data and skeleton file, and merge + val dataFileIterator = bootstrapDataFileReader.read(bootstrapPartition.split.dataFile) + val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile) + val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields ++ bootstrapDataFileReader.schema.fields) - if (bootstrapPartition.split.skeletonFile.isDefined) { - // It is a bootstrap split. Check both skeleton and data files. - if (dataSchema.isEmpty) { - // No data column to fetch, hence fetch only from skeleton file - partitionedFileIterator = skeletonReadFunction(bootstrapPartition.split.skeletonFile.get) - } else if (skeletonSchema.isEmpty) { - // No metadata column to fetch, hence fetch only from data file - partitionedFileIterator = dataReadFunction(bootstrapPartition.split.dataFile) - } else { - // Fetch from both data and skeleton file, and merge - val dataFileIterator = dataReadFunction(bootstrapPartition.split.dataFile) - val skeletonFileIterator = skeletonReadFunction(bootstrapPartition.split.skeletonFile.get) - partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator) - } - } else { - partitionedFileIterator = regularReadFunction(bootstrapPartition.split.dataFile) + (merge(skeletonFileIterator, dataFileIterator), mergedSchema) + } + + // NOTE: Here we have to project the [[InternalRow]]s fetched into the expected target schema. + // These could diverge for ex, when requested schema contains partition columns which might not be + // persisted w/in the data file, but instead would be parsed from the partition path. In that case + // output of the file-reader will have different ordering of the fields than the original required + // schema (for more details please check out [[ParquetFileFormat]] implementation). 
+ val unsafeProjection = generateUnsafeProjection(schema, requiredSchema.structTypeSchema) + + iterator.map(unsafeProjection) + + case _ => + // NOTE: Regular file-reader is already projected into the required schema + regularFileReader.read(bootstrapPartition.split.dataFile) } - partitionedFileIterator } - def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: Iterator[InternalRow]) - : Iterator[InternalRow] = { + def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: Iterator[InternalRow]): Iterator[InternalRow] = { new Iterator[InternalRow] { - override def hasNext: Boolean = dataFileIterator.hasNext && skeletonFileIterator.hasNext - override def next(): InternalRow = { - mergeInternalRow(skeletonFileIterator.next(), dataFileIterator.next()) - } - } - } + private val combinedRow = new JoinedRow() - def mergeInternalRow(skeletonRow: InternalRow, dataRow: InternalRow): InternalRow = { - val skeletonArr = skeletonRow.copy().toSeq(skeletonSchema) - val dataArr = dataRow.copy().toSeq(dataSchema) - // We need to return it in the order requested - val mergedArr = requiredColumns.map(col => { - if (skeletonSchema.fieldNames.contains(col)) { - val idx = skeletonSchema.fieldIndex(col) - skeletonArr(idx) - } else { - val idx = dataSchema.fieldIndex(col) - dataArr(idx) + override def hasNext: Boolean = { + checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext, + "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!") + dataFileIterator.hasNext && skeletonFileIterator.hasNext } - }) - logDebug("Merged data and skeleton values => " + mergedArr.mkString(",")) - val mergedRow = InternalRow.fromSeq(mergedArr) - mergedRow + override def next(): InternalRow = { + combinedRow(skeletonFileIterator.next(), dataFileIterator.next()) + } + } } override protected def getPartitions: Array[Partition] = { - tableState.files.zipWithIndex.map(file => { + splits.zipWithIndex.map(file => { if (file._1.skeletonFile.isDefined) { logDebug("Forming partition with => Index: " + file._2 + ", Files: " + file._1.dataFile.filePath + "," + file._1.skeletonFile.get.filePath) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala index 0dd54237ef5..5c58c10493d 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala @@ -19,20 +19,20 @@ package org.apache.hudi import org.apache.hadoop.fs.Path -import org.apache.hudi.common.model.HoodieBaseFile -import org.apache.hudi.common.table.view.HoodieTableFileSystemView -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.exception.HoodieException -import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex -import org.apache.spark.internal.Logging +import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader} +import org.apache.hudi.HoodieBootstrapRelation.validate +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile} -import 
org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan} +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isMetaField, removeMetaFields} +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{Row, SQLContext} -import scala.collection.JavaConverters._ +case class HoodieBootstrapSplit(dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile] = None) extends HoodieFileSplit /** * This is Spark relation that can be used for querying metadata/fully bootstrapped query hoodie tables, as well as @@ -44,150 +44,161 @@ import scala.collection.JavaConverters._ * bootstrapped files, because then the metadata file and data file can return different number of rows causing errors * merging. * - * @param _sqlContext Spark SQL Context + * @param sqlContext Spark SQL Context * @param userSchema User specified schema in the datasource query * @param globPaths The global paths to query. If it not none, read from the globPaths, * else read data from tablePath using HoodiFileIndex. * @param metaClient Hoodie table meta client * @param optParams DataSource options passed by the user */ -class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext, - val userSchema: Option[StructType], - val globPaths: Seq[Path], - val metaClient: HoodieTableMetaClient, - val optParams: Map[String, String]) extends BaseRelation - with PrunedFilteredScan with Logging { +case class HoodieBootstrapRelation(override val sqlContext: SQLContext, + private val userSchema: Option[StructType], + private val globPaths: Seq[Path], + override val metaClient: HoodieTableMetaClient, + override val optParams: Map[String, String], + private val prunedDataSchema: Option[StructType] = None) + extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema) { - val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema - var dataSchema: StructType = _ - var fullSchema: StructType = _ + override type FileSplit = HoodieBootstrapSplit + override type Relation = HoodieBootstrapRelation - val fileIndex: HoodieBootstrapFileIndex = buildFileIndex() + private lazy val skeletonSchema = HoodieSparkUtils.getMetaSchema - override def sqlContext: SQLContext = _sqlContext + override val mandatoryFields: Seq[String] = Seq.empty - override val needConversion: Boolean = false + protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit] = { + val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters) + fileSlices.map { fileSlice => + val baseFile = fileSlice.getBaseFile.get() - override def schema: StructType = inferFullSchema() + if (baseFile.getBootstrapBaseFile.isPresent) { + val partitionValues = + getPartitionColumnsAsInternalRowInternal(baseFile.getFileStatus, extractPartitionValuesFromPartitionPath = true) + val dataFile = PartitionedFile(partitionValues, baseFile.getBootstrapBaseFile.get().getPath, 0, baseFile.getBootstrapBaseFile.get().getFileLen) + val skeletonFile = Option(PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen)) - override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { - logInfo("Starting scan..") - - // Compute splits - val bootstrapSplits = fileIndex.files.map(hoodieBaseFile => { - var skeletonFile: Option[PartitionedFile] = Option.empty - 
var dataFile: PartitionedFile = null - - if (hoodieBaseFile.getBootstrapBaseFile.isPresent) { - skeletonFile = Option(PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen)) - dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getBootstrapBaseFile.get().getPath, 0, - hoodieBaseFile.getBootstrapBaseFile.get().getFileLen) - } else { - dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen) - } - HoodieBootstrapSplit(dataFile, skeletonFile) - }) - val tableState = HoodieBootstrapTableState(bootstrapSplits) - - // Get required schemas for column pruning - var requiredDataSchema = StructType(Seq()) - var requiredSkeletonSchema = StructType(Seq()) - // requiredColsSchema is the schema of requiredColumns, note that requiredColumns is in a random order - // so requiredColsSchema is not always equal to (requiredSkeletonSchema.fields ++ requiredDataSchema.fields) - var requiredColsSchema = StructType(Seq()) - requiredColumns.foreach(col => { - var field = dataSchema.find(_.name == col) - if (field.isDefined) { - requiredDataSchema = requiredDataSchema.add(field.get) + HoodieBootstrapSplit(dataFile, skeletonFile) } else { - field = skeletonSchema.find(_.name == col) - requiredSkeletonSchema = requiredSkeletonSchema.add(field.get) + val dataFile = PartitionedFile(getPartitionColumnsAsInternalRow(baseFile.getFileStatus), baseFile.getPath, 0, baseFile.getFileLen) + HoodieBootstrapSplit(dataFile) } - requiredColsSchema = requiredColsSchema.add(field.get) - }) + } + } - // Prepare readers for reading data file and skeleton files - val dataReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader( - sparkSession = _sqlContext.sparkSession, - dataSchema = dataSchema, - partitionSchema = StructType(Seq.empty), - requiredSchema = requiredDataSchema, - filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() , + protected override def composeRDD(fileSplits: Seq[FileSplit], + tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + requestedColumns: Array[String], + filters: Array[Filter]): RDD[InternalRow] = { + val requiredSkeletonFileSchema = + StructType(skeletonSchema.filter(f => requestedColumns.exists(col => resolver(f.name, col)))) + + val (bootstrapDataFileReader, bootstrapSkeletonFileReader) = + createBootstrapFileReaders(tableSchema, requiredSchema, requiredSkeletonFileSchema, filters) + + val regularFileReader = createRegularFileReader(tableSchema, requiredSchema, filters) + + new HoodieBootstrapRDD(sqlContext.sparkSession, bootstrapDataFileReader, bootstrapSkeletonFileReader, regularFileReader, + requiredSchema, fileSplits) + } + + private def createBootstrapFileReaders(tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + requiredSkeletonFileSchema: StructType, + filters: Array[Filter]): (BaseFileReader, BaseFileReader) = { + // NOTE: "Data" schema in here refers to the whole table's schema that doesn't include only partition + // columns, as opposed to data file schema not including any meta-fields columns in case of + // Bootstrap relation + val (partitionSchema, dataSchema, requiredDataSchema) = + tryPrunePartitionColumnsInternal(tableSchema, requiredSchema, extractPartitionValuesFromPartitionPath = true) + + val bootstrapDataFileSchema = StructType(dataSchema.structTypeSchema.filterNot(sf => isMetaField(sf.name))) + val requiredBootstrapDataFileSchema = StructType(requiredDataSchema.structTypeSchema.filterNot(sf => isMetaField(sf.name))) + + 
validate(requiredDataSchema, requiredBootstrapDataFileSchema, requiredSkeletonFileSchema) + + val bootstrapDataFileReader = createBaseFileReader( + spark = sqlContext.sparkSession, + dataSchema = new HoodieTableSchema(bootstrapDataFileSchema), + partitionSchema = partitionSchema, + requiredDataSchema = new HoodieTableSchema(requiredBootstrapDataFileSchema), + // NOTE: For bootstrapped files we can't apply any filtering in case we'd need to merge it with + // a skeleton-file as we rely on matching ordering of the records across bootstrap- and skeleton-files + filters = if (requiredSkeletonFileSchema.isEmpty) filters else Seq(), options = optParams, - hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() + hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf(), + // NOTE: Bootstrap relation have to always extract partition values from the partition-path as this is a + // default Spark behavior: Spark by default strips partition-columns from the data schema and does + // NOT persist them in the data files, instead parsing them from partition-paths (on the fly) whenever + // table is queried + shouldAppendPartitionValuesOverride = Some(true) ) - val skeletonReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader( - sparkSession = _sqlContext.sparkSession, - dataSchema = skeletonSchema, + val boostrapSkeletonFileReader = createBaseFileReader( + spark = sqlContext.sparkSession, + dataSchema = new HoodieTableSchema(skeletonSchema), + // NOTE: Here we specify partition-schema as empty since we don't need Spark to inject partition-values + // parsed from the partition-path partitionSchema = StructType(Seq.empty), - requiredSchema = requiredSkeletonSchema, - filters = if (requiredDataSchema.isEmpty) filters else Seq(), + requiredDataSchema = new HoodieTableSchema(requiredSkeletonFileSchema), + // NOTE: For bootstrapped files we can't apply any filtering in case we'd need to merge it with + // a skeleton-file as we rely on matching ordering of the records across bootstrap- and skeleton-files + filters = if (requiredBootstrapDataFileSchema.isEmpty) filters else Seq(), options = optParams, - hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() + hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf(), + // NOTE: We override Spark to avoid injecting partition values into the records read from + // skeleton-file + shouldAppendPartitionValuesOverride = Some(false) ) - val regularReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader( - sparkSession = _sqlContext.sparkSession, - dataSchema = fullSchema, - partitionSchema = StructType(Seq.empty), - requiredSchema = requiredColsSchema, + (bootstrapDataFileReader, boostrapSkeletonFileReader) + } + + private def createRegularFileReader(tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + filters: Array[Filter]): BaseFileReader = { + // NOTE: "Data" schema in here refers to the whole table's schema that doesn't include only partition + // columns, as opposed to data file schema not including any meta-fields columns in case of + // Bootstrap relation + val (partitionSchema, dataSchema, requiredDataSchema) = + tryPrunePartitionColumns(tableSchema, requiredSchema) + + // NOTE: Bootstrapped table allows Hudi created file-slices to be co-located w/ the "bootstrapped" + // ones (ie persisted by Spark). 
Therefore to be able to read the data from Bootstrapped + // table we also need to create regular file-reader to read file-slices created by Hudi + val regularFileReader = createBaseFileReader( + spark = sqlContext.sparkSession, + dataSchema = dataSchema, + partitionSchema = partitionSchema, + requiredDataSchema = requiredDataSchema, filters = filters, options = optParams, - hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf() + hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() ) - val rdd = new HoodieBootstrapRDD(_sqlContext.sparkSession, dataReadFunction, skeletonReadFunction, - regularReadFunction, requiredDataSchema, requiredSkeletonSchema, requiredColumns, tableState) - rdd.asInstanceOf[RDD[Row]] + // NOTE: In some case schema of the reader's output (reader's schema) might not match the schema expected by the caller. + // This could occur for ex, when requested schema contains partition columns which might not be persisted w/in the + // data file, but instead would be parsed from the partition path. In that case output of the file-reader will have + // different ordering of the fields than the original required schema (for more details please check out + // [[ParquetFileFormat]] impl). In that case we have to project the rows from the file-reader's schema + // back into the one expected by the caller + projectReader(regularFileReader, requiredSchema.structTypeSchema) } - def inferFullSchema(): StructType = { - if (fullSchema == null) { - logInfo("Inferring schema..") - val schemaResolver = new TableSchemaResolver(metaClient) - val tableSchema = schemaResolver.getTableAvroSchema(false) - dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema) - fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields) - } - fullSchema - } - - def buildFileIndex(): HoodieBootstrapFileIndex = { - logInfo("Building file index..") - val fileStatuses = if (globPaths.nonEmpty) { - // Load files from the global paths if it has defined to be compatible with the original mode - val inMemoryFileIndex = HoodieInMemoryFileIndex.create(_sqlContext.sparkSession, globPaths) - inMemoryFileIndex.allFiles() - } else { // Load files by the HoodieFileIndex. 
- HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(schema), optParams, - FileStatusCache.getOrCreate(sqlContext.sparkSession)).allFiles - } - if (fileStatuses.isEmpty) { - throw new HoodieException("No files found for reading in user provided path.") - } + override def updatePrunedDataSchema(prunedSchema: StructType): HoodieBootstrapRelation = + this.copy(prunedDataSchema = Some(prunedSchema)) +} - val fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline.getCommitsTimeline - .filterCompletedInstants, fileStatuses.toArray) - val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList - - if (log.isDebugEnabled) { - latestFiles.foreach(file => { - logDebug("Printing indexed files:") - if (file.getBootstrapBaseFile.isPresent) { - logDebug("Skeleton File: " + file.getPath + ", Data File: " + file.getBootstrapBaseFile.get().getPath) - } else { - logDebug("Regular Hoodie File: " + file.getPath) - } - }) - } - HoodieBootstrapFileIndex(latestFiles) - } -} +object HoodieBootstrapRelation { -case class HoodieBootstrapFileIndex(files: List[HoodieBaseFile]) + private def validate(requiredDataSchema: HoodieTableSchema, requiredDataFileSchema: StructType, requiredSkeletonFileSchema: StructType): Unit = { + val requiredDataColumns: Seq[String] = requiredDataSchema.structTypeSchema.fieldNames.toSeq + val combinedColumns = (requiredSkeletonFileSchema.fieldNames ++ requiredDataFileSchema.fieldNames).toSeq -case class HoodieBootstrapTableState(files: List[HoodieBootstrapSplit]) + // NOTE: Here we validate that all required data columns are covered by the combination of the columns + // from both skeleton file and the corresponding data file + checkState(combinedColumns.sorted == requiredDataColumns.sorted) + } -case class HoodieBootstrapSplit(dataFile: PartitionedFile, skeletonFile: Option[PartitionedFile]) +} diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index accfc8f2470..94168755cbf 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -190,7 +190,7 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext, StructType(requiredDataSchema.structTypeSchema.fields .filterNot(f => unusedMandatoryColumnNames.contains(f.name))) - HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema).toString) + new HoodieTableSchema(prunedStructSchema) } val requiredSchemaReaderSkipMerging = createBaseFileReader( diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala index 8e589abbc18..54c58bace7c 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala @@ -23,9 +23,11 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig} import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieRecord -import 
org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator} +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline.parseDateFromInstantTime +import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator, HoodieTimeline} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.util.PartitionPathEncodeUtils +import org.apache.hudi.exception.HoodieException import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, DataSourceReadOptions, SparkAdapterSupport} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.catalyst.TableIdentifier @@ -42,6 +44,7 @@ import java.text.SimpleDateFormat import java.util.{Locale, Properties} import scala.collection.JavaConverters._ import scala.collection.immutable.Map +import scala.util.Try object HoodieSqlCommonUtils extends SparkAdapterSupport { // NOTE: {@code SimpleDataFormat} is NOT thread-safe @@ -251,11 +254,13 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport { */ def formatQueryInstant(queryInstant: String): String = { val instantLength = queryInstant.length - if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[.SSS] + if (instantLength == 19 || instantLength == 23) { + // Handle "yyyy-MM-dd HH:mm:ss[.SSS]" format HoodieInstantTimeGenerator.getInstantForDateString(queryInstant) } else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH - || instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for yyyyMMddHHmmss[SSS] - HoodieActiveTimeline.parseDateFromInstantTime(queryInstant) // validate the format + || instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { + // Handle already serialized "yyyyMMddHHmmss[SSS]" format + validateInstant(queryInstant) queryInstant } else if (instantLength == 10) { // for yyyy-MM-dd HoodieActiveTimeline.formatDate(defaultDateFormat.get().parse(queryInstant)) @@ -356,4 +361,21 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport { }.mkString(",") partitionsToDrop } + + private def validateInstant(queryInstant: String): Unit = { + // Provided instant has to either + // - Match one of the bootstrapping instants + // - Be parse-able (as a date) + val valid = queryInstant match { + case HoodieTimeline.INIT_INSTANT_TS | + HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS | + HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS => true + + case _ => Try(parseDateFromInstantTime(queryInstant)).isSuccess + } + + if (!valid) { + throw new HoodieException(s"Got an invalid instant ($queryInstant)") + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala index 82f79eeb44e..e3d235591d4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala @@ -20,14 +20,16 @@ package org.apache.hudi.functional import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider import org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector +import org.apache.hudi.common.config.HoodieStorageConfig import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.HoodieRecord +import 
org.apache.hudi.common.model.HoodieRecord.HoodieRecordType import org.apache.hudi.common.table.timeline.HoodieTimeline import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, HoodieWriteConfig} import org.apache.hudi.functional.TestDataSourceForBootstrap.{dropMetaCols, sort} import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator} import org.apache.hudi.testutils.HoodieClientTestUtils -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkRecordMerger} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} @@ -35,7 +37,7 @@ import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.io.TempDir import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource +import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource} import java.time.Instant import java.util.Collections @@ -56,6 +58,12 @@ class TestDataSourceForBootstrap { DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp", HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" ) + + val sparkRecordTypeOpts = Map( + HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> classOf[HoodieSparkRecordMerger].getName, + HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet" + ) + var basePath: String = _ var srcPath: String = _ var fs: FileSystem = _ @@ -153,12 +161,18 @@ class TestDataSourceForBootstrap { assertEquals(numRecords, hoodieROViewDF1WithBasePath.count()) assertEquals(numRecordsUpdate, hoodieROViewDF1WithBasePath.filter(s"timestamp == $updateTimestamp").count()) - verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = false) + verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = false, isHiveStylePartitioned = true) } @ParameterizedTest - @ValueSource(strings = Array("METADATA_ONLY", "FULL_RECORD")) - def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String): Unit = { + @CsvSource(value = Array( + "METADATA_ONLY,AVRO", + // TODO(HUDI-5807) enable for spark native records + /* "METADATA_ONLY,SPARK", */ + "FULL_RECORD,AVRO", + "FULL_RECORD,SPARK" + )) + def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String, recordType: HoodieRecordType): Unit = { val timestamp = Instant.now.toEpochMilli val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) @@ -181,16 +195,15 @@ class TestDataSourceForBootstrap { // Perform bootstrap val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit( DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, - readOpts, + readOpts ++ getRecordTypeOpts(recordType), classOf[SimpleKeyGenerator].getName) // check marked directory clean up assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001"))) - // TODO(HUDI-5602) troubleshoot val expectedDF = bootstrapMode match { case "METADATA_ONLY" => - sort(sourceDF).withColumn("datestr", lit(null)) + sort(sourceDF) case "FULL_RECORD" => sort(sourceDF) } @@ -208,9 +221,11 @@ class TestDataSourceForBootstrap { val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 0, numRecordsUpdate, partitionPaths.asJava, jsc, spark.sqlContext) + val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) + 
updateDF.write .format("hudi") - .options(commonOpts) + .options(writeOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") @@ -234,28 +249,31 @@ class TestDataSourceForBootstrap { verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = true) } - @Test def testMetadataBootstrapCOWPartitioned(): Unit = { + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], + // TODO(HUDI-5807) enable for spark native records + names = Array("AVRO" /*, "SPARK" */)) + def testMetadataBootstrapCOWPartitioned(recordType: HoodieRecordType): Unit = { val timestamp = Instant.now.toEpochMilli val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc, spark.sqlContext) - // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence - // have partitioned columns stored in the data file - partitionPaths.foreach(partitionPath => { - sourceDF - .filter(sourceDF("datestr").equalTo(lit(partitionPath))) - .write - .format("parquet") - .mode(SaveMode.Overwrite) - .save(srcPath + "/" + partitionPath) - }) + sourceDF.write.format("parquet") + .partitionBy("datestr") + .mode(SaveMode.Overwrite) + .save(srcPath) + + val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map( + DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr" + ) // Perform bootstrap val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit( DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, - commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"), + writeOpts, classOf[SimpleKeyGenerator].getName) // Read bootstrapped table and verify count using glob path @@ -270,10 +288,9 @@ class TestDataSourceForBootstrap { val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal)) updateDf1.write .format("hudi") - .options(commonOpts) + .options(writeOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) - .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") .mode(SaveMode.Append) .save(basePath) @@ -290,10 +307,9 @@ class TestDataSourceForBootstrap { updateDF2.write .format("hudi") - .options(commonOpts) + .options(writeOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) - .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") .mode(SaveMode.Append) .save(basePath) @@ -309,31 +325,34 @@ class TestDataSourceForBootstrap { assertEquals(numRecords, hoodieROViewDF4.count()) assertEquals(numRecordsUpdate, hoodieROViewDF4.filter(s"timestamp == $updateTimestamp").count()) - verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, isPartitioned = true, isHiveStylePartitioned = false) + verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, isPartitioned = true, isHiveStylePartitioned = true) } - @Test def 
testMetadataBootstrapMORPartitionedInlineCompactionOn(): Unit = { + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], + // TODO(HUDI-5807) enable for spark native records + names = Array("AVRO" /*, "SPARK" */)) + def testMetadataBootstrapMORPartitionedInlineCompactionOn(recordType: HoodieRecordType): Unit = { val timestamp = Instant.now.toEpochMilli val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc, spark.sqlContext) - // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence - // have partitioned columns stored in the data file - partitionPaths.foreach(partitionPath => { - sourceDF - .filter(sourceDF("datestr").equalTo(lit(partitionPath))) - .write - .format("parquet") - .mode(SaveMode.Overwrite) - .save(srcPath + "/" + partitionPath) - }) + sourceDF.write.format("parquet") + .partitionBy("datestr") + .mode(SaveMode.Overwrite) + .save(srcPath) + + val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map( + DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr" + ) // Perform bootstrap val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit( DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, - commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"), + writeOpts, classOf[SimpleKeyGenerator].getName) // Read bootstrapped table and verify count @@ -350,10 +369,9 @@ class TestDataSourceForBootstrap { updateDF.write .format("hudi") - .options(commonOpts) + .options(writeOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) - .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") .option(HoodieCompactionConfig.INLINE_COMPACT.key, "true") .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key, "1") .mode(SaveMode.Append) @@ -379,28 +397,29 @@ class TestDataSourceForBootstrap { assertEquals(numRecordsUpdate, hoodieROViewDFWithBasePath.filter(s"timestamp == $updateTimestamp").count()) } - @Test def testMetadataBootstrapMORPartitioned(): Unit = { + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) + def testMetadataBootstrapMORPartitioned(recordType: HoodieRecordType): Unit = { val timestamp = Instant.now.toEpochMilli val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc, spark.sqlContext) - // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence - // have partitioned columns stored in the data file - partitionPaths.foreach(partitionPath => { - sourceDF - .filter(sourceDF("datestr").equalTo(lit(partitionPath))) - .write - .format("parquet") - .mode(SaveMode.Overwrite) - .save(srcPath + "/" + partitionPath) - }) + sourceDF.write.format("parquet") + .partitionBy("datestr") + .mode(SaveMode.Overwrite) + .save(srcPath) + + val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map( + DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr" + ) // Perform bootstrap val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit( 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, - commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr"), + writeOpts, classOf[SimpleKeyGenerator].getName) // Read bootstrapped table and verify count @@ -423,10 +442,9 @@ class TestDataSourceForBootstrap { val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal)) updateDf1.write .format("hudi") - .options(commonOpts) + .options(writeOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) - .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") .mode(SaveMode.Append) .save(basePath) @@ -446,10 +464,9 @@ class TestDataSourceForBootstrap { updateDF2.write .format("hudi") - .options(commonOpts) + .options(writeOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) - .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") .mode(SaveMode.Append) .save(basePath) @@ -466,31 +483,31 @@ class TestDataSourceForBootstrap { assertEquals(0, hoodieROViewDF3.filter(s"timestamp == $updateTimestamp").count()) } - @Test def testFullBootstrapCOWPartitioned(): Unit = { + @ParameterizedTest + @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", "SPARK")) + def testFullBootstrapCOWPartitioned(recordType: HoodieRecordType): Unit = { val timestamp = Instant.now.toEpochMilli val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext) val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths.asJava, jsc, spark.sqlContext) - // Writing data for each partition instead of using partitionBy to avoid hive-style partitioning and hence - // have partitioned columns stored in the data file - partitionPaths.foreach(partitionPath => { - sourceDF - .filter(sourceDF("datestr").equalTo(lit(partitionPath))) - .write - .format("parquet") - .mode(SaveMode.Overwrite) - .save(srcPath + "/" + partitionPath) - }) + sourceDF.write.format("parquet") + .partitionBy("datestr") + .mode(SaveMode.Overwrite) + .save(srcPath) + + val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map( + DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr" + ) // Perform bootstrap val bootstrapDF = spark.emptyDataFrame bootstrapDF.write .format("hudi") - .options(commonOpts) + .options(writeOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL) - .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") .option(HoodieBootstrapConfig.BASE_PATH.key, srcPath) .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, classOf[SimpleKeyGenerator].getName) .option(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key, classOf[FullRecordBootstrapModeSelector].getName) @@ -515,10 +532,9 @@ class TestDataSourceForBootstrap { updateDF.write .format("hudi") - .options(commonOpts) + .options(writeOpts) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL) - .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr") .mode(SaveMode.Append) .save(basePath) @@ -530,7 +546,7 @@ class 
TestDataSourceForBootstrap { assertEquals(numRecords, hoodieROViewDF2.count()) assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == $updateTimestamp").count()) - verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = false) + verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, isPartitioned = true, isHiveStylePartitioned = true) } def runMetadataBootstrapAndVerifyCommit(tableType: String, @@ -596,6 +612,12 @@ class TestDataSourceForBootstrap { hoodieIncViewDF3.count()) } } + + def getRecordTypeOpts(recordType: HoodieRecordType): Map[String, String] = + recordType match { + case HoodieRecordType.SPARK => sparkRecordTypeOpts + case _ => Map.empty + } } object TestDataSourceForBootstrap { diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java index eb6ab80b5f9..f85e55dfd40 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java @@ -650,6 +650,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", bootstrapSourcePath)); cfg.configs.add(String.format("%s=%s", DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "rider")); cfg.configs.add(String.format("hoodie.bootstrap.keygen.class=%s", SimpleKeyGenerator.class.getName())); + cfg.configs.add("hoodie.datasource.write.hive_style_partitioning=true"); cfg.configs.add("hoodie.bootstrap.parallelism=5"); cfg.targetBasePath = newDatasetBasePath; new HoodieDeltaStreamer(cfg, jsc).sync(); @@ -660,6 +661,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase { TestHelpers.assertRecordCount(1950, newDatasetBasePath, sqlContext); res.registerTempTable("bootstrapped"); assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key from bootstrapped").count()); + // NOTE: To fetch record's count Spark will optimize the query fetching minimal possible amount + // of data, which might not provide adequate amount of test coverage + sqlContext.sql("select * from bootstrapped").show(); StructField[] fields = res.schema().fields(); List<String> fieldNames = Arrays.asList(res.schema().fieldNames());
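
For context, below is a minimal, hypothetical sketch (not part of this commit) of the read path the change addresses: a snapshot query against a bootstrapped, hive-style partitioned Hudi table through the Spark datasource. Names such as `basePath` are placeholders; the `datestr` partition column, the `timestamp` field and the `_hoodie_record_key` meta-field mirror the tests above. Per the commit message, such a read of a partitioned bootstrapped table could previously fail with an NPE.

import org.apache.spark.sql.SparkSession

object BootstrapReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-bootstrap-read-sketch")
      .master("local[2]")
      .getOrCreate()

    // Placeholder path to a metadata-bootstrapped, hive-style partitioned Hudi table
    val basePath = "file:///tmp/hoodie_test"

    // Snapshot read through the Hudi Spark datasource; with this change partition values for
    // bootstrapped file slices are parsed from the partition path (e.g. datestr=2023-02-24)
    val df = spark.read.format("hudi").load(basePath)

    // Select a meta-field (served by the skeleton file) together with data/partition columns
    // (served by the bootstrap source file), exercising the merged bootstrap read path
    df.select("_hoodie_record_key", "timestamp", "datestr")
      .where("datestr = '2023-02-24'")
      .show(10, truncate = false)

    spark.stop()
  }
}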