This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2770ff50714 [HUDI-915][HUDI-5656] Rebased `HoodieBootstrapRelation` 
onto `HoodieBaseRelation` (#7804)
2770ff50714 is described below

commit 2770ff507141f013f7500354595137b52a543e8b
Author: Alexey Kudinkin <alexey.kudin...@gmail.com>
AuthorDate: Fri Feb 24 08:43:49 2023 -0800

    [HUDI-915][HUDI-5656] Rebased `HoodieBootstrapRelation` onto 
`HoodieBaseRelation` (#7804)
    
    Currently `HoodieBootstrapRelation` treats partitioned tables improperly, 
resulting in an NPE while trying to read a bootstrapped table.
    
    To address that, `HoodieBootstrapRelation` has been rebased onto 
`HoodieBaseRelation`, sharing the core of the reading semantics with Hudi's 
other file-based Relation implementations for COW and MOR (such as schema 
handling, file-listing, etc.).
---
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  47 ++--
 .../scala/org/apache/hudi/HoodieBootstrapRDD.scala | 103 ++++----
 .../org/apache/hudi/HoodieBootstrapRelation.scala  | 259 +++++++++++----------
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |   2 +-
 .../spark/sql/hudi/HoodieSqlCommonUtils.scala      |  30 ++-
 .../functional/TestDataSourceForBootstrap.scala    | 166 +++++++------
 .../deltastreamer/TestHoodieDeltaStreamer.java     |   4 +
 7 files changed, 344 insertions(+), 267 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 99b5b5c87ba..cb02c59a690 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -48,6 +48,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.{convertToCatalystExpression,
 generateUnsafeProjection}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.{Expression, 
SubqueryExpression}
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources._
@@ -66,7 +67,12 @@ import scala.util.{Failure, Success, Try}
 
 trait HoodieFileSplit {}
 
-case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: 
String, internalSchema: Option[InternalSchema] = None)
+case class HoodieTableSchema(structTypeSchema: StructType, avroSchemaStr: 
String, internalSchema: Option[InternalSchema] = None) {
+
+  def this(structTypeSchema: StructType) =
+    this(structTypeSchema, convertToAvroSchema(structTypeSchema).toString)
+
+}
 
 case class HoodieTableState(tablePath: String,
                             latestCommitTimestamp: Option[String],
@@ -98,6 +104,8 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
 
   protected val sparkSession: SparkSession = sqlContext.sparkSession
 
+  protected lazy val resolver: Resolver = 
sparkSession.sessionState.analyzer.resolver
+
   protected lazy val conf: Configuration = new 
Configuration(sqlContext.sparkContext.hadoopConfiguration)
   protected lazy val jobConf = new JobConf(conf)
 
@@ -174,8 +182,6 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
   protected lazy val tableStructSchema: StructType = {
     val converted = 
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
-
-    val resolver = sparkSession.sessionState.analyzer.resolver
     val metaFieldMetadata = sparkAdapter.createCatalystMetadataForMetaField
 
     // NOTE: Here we annotate meta-fields with corresponding metadata such 
that Spark (>= 3.2)
@@ -466,10 +472,15 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
    * For enable hoodie.datasource.write.drop.partition.columns, need to create 
an InternalRow on partition values
    * and pass this reader on parquet file. So that, we can query the partition 
columns.
    */
-  protected def getPartitionColumnsAsInternalRow(file: FileStatus): 
InternalRow = {
+
+  protected def getPartitionColumnsAsInternalRow(file: FileStatus): 
InternalRow =
+    getPartitionColumnsAsInternalRowInternal(file, 
shouldExtractPartitionValuesFromPartitionPath)
+
+  protected def getPartitionColumnsAsInternalRowInternal(file: FileStatus,
+                                                         
extractPartitionValuesFromPartitionPath: Boolean): InternalRow = {
     try {
       val tableConfig = metaClient.getTableConfig
-      if (shouldExtractPartitionValuesFromPartitionPath) {
+      if (extractPartitionValuesFromPartitionPath) {
         val relativePath = new URI(metaClient.getBasePath).relativize(new 
URI(file.getPath.getParent.toString)).toString
         val hiveStylePartitioningEnabled = 
tableConfig.getHiveStylePartitioningEnable.toBoolean
         if (hiveStylePartitioningEnabled) {
@@ -514,7 +525,8 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
                                      requiredDataSchema: HoodieTableSchema,
                                      filters: Seq[Filter],
                                      options: Map[String, String],
-                                     hadoopConf: Configuration): 
BaseFileReader = {
+                                     hadoopConf: Configuration,
+                                     shouldAppendPartitionValuesOverride: 
Option[Boolean] = None): BaseFileReader = {
     val tableBaseFileFormat = tableConfig.getBaseFileFormat
 
     // NOTE: PLEASE READ CAREFULLY
@@ -535,7 +547,7 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
             hadoopConf = hadoopConf,
             // We're delegating to Spark to append partition values to every 
row only in cases
             // when these corresponding partition-values are not persisted 
w/in the data file itself
-            appendPartitionValues = 
shouldExtractPartitionValuesFromPartitionPath
+            appendPartitionValues = 
shouldAppendPartitionValuesOverride.getOrElse(shouldExtractPartitionValuesFromPartitionPath)
           )
           // Since partition values by default are omitted, and not persisted 
w/in data-files by Spark,
           // data-file readers (such as [[ParquetFileFormat]]) have to inject 
partition values while reading
@@ -589,6 +601,12 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
   protected def tryPrunePartitionColumns(tableSchema: HoodieTableSchema,
                                          requiredSchema: HoodieTableSchema): 
(StructType, HoodieTableSchema, HoodieTableSchema) = {
+    tryPrunePartitionColumnsInternal(tableSchema, requiredSchema, 
shouldExtractPartitionValuesFromPartitionPath)
+  }
+
+  protected def tryPrunePartitionColumnsInternal(tableSchema: 
HoodieTableSchema,
+                                                 requiredSchema: 
HoodieTableSchema,
+                                                 
extractPartitionValuesFromPartitionPath: Boolean): (StructType, 
HoodieTableSchema, HoodieTableSchema) = {
     // Since schema requested by the caller might contain partition columns, 
we might need to
     // prune it, removing all partition columns from it in case these columns 
are not persisted
     // in the data files
@@ -598,21 +616,24 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
     //       the partition path, and omitted from the data file, back into 
fetched rows;
     //       Note that, by default, partition columns are not omitted 
therefore specifying
     //       partition schema for reader is not required
-    if (shouldExtractPartitionValuesFromPartitionPath) {
-      val partitionSchema = StructType(partitionColumns.map(StructField(_, 
StringType)))
+    if (extractPartitionValuesFromPartitionPath) {
+      val partitionSchema = 
filterInPartitionColumns(tableSchema.structTypeSchema)
       val prunedDataStructSchema = 
prunePartitionColumns(tableSchema.structTypeSchema)
       val prunedRequiredSchema = 
prunePartitionColumns(requiredSchema.structTypeSchema)
 
       (partitionSchema,
-        HoodieTableSchema(prunedDataStructSchema, 
convertToAvroSchema(prunedDataStructSchema).toString),
-        HoodieTableSchema(prunedRequiredSchema, 
convertToAvroSchema(prunedRequiredSchema).toString))
+        new HoodieTableSchema(prunedDataStructSchema),
+        new HoodieTableSchema(prunedRequiredSchema))
     } else {
       (StructType(Nil), tableSchema, requiredSchema)
     }
   }
 
-  private def prunePartitionColumns(dataStructSchema: StructType): StructType =
-    StructType(dataStructSchema.filterNot(f => 
partitionColumns.contains(f.name)))
+  private def filterInPartitionColumns(structType: StructType): StructType =
+    StructType(structType.filter(f => partitionColumns.exists(col => 
resolver(f.name, col))))
+
+  private def prunePartitionColumns(structType: StructType): StructType =
+    StructType(structType.filterNot(f => partitionColumns.exists(pc => 
resolver(f.name, pc))))
 
   private def getConfigValue(config: ConfigProperty[String],
                              defaultValueOption: Option[String]=Option.empty): 
String = {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
index ea997c86acb..b72c41bbd66 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala
@@ -18,23 +18,22 @@
 
 package org.apache.hudi
 
-import org.apache.spark.{Partition, TaskContext}
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
+import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.spark.rdd.RDD
+import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.types.StructType
-
-import org.apache.hudi.HoodieDataSourceHelper._
+import org.apache.spark.{Partition, TaskContext}
 
 class HoodieBootstrapRDD(@transient spark: SparkSession,
-                        dataReadFunction: PartitionedFile => 
Iterator[InternalRow],
-                        skeletonReadFunction: PartitionedFile => 
Iterator[InternalRow],
-                        regularReadFunction: PartitionedFile => 
Iterator[InternalRow],
-                        dataSchema: StructType,
-                        skeletonSchema: StructType,
-                        requiredColumns: Array[String],
-                        tableState: HoodieBootstrapTableState)
+                         bootstrapDataFileReader: BaseFileReader,
+                         bootstrapSkeletonFileReader: BaseFileReader,
+                         regularFileReader: BaseFileReader,
+                         requiredSchema: HoodieTableSchema,
+                         @transient splits: Seq[HoodieBootstrapSplit])
   extends RDD[InternalRow](spark.sparkContext, Nil) {
 
   override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
@@ -51,59 +50,57 @@ class HoodieBootstrapRDD(@transient spark: SparkSession,
       }
     }
 
-    var partitionedFileIterator: Iterator[InternalRow] = null
+    bootstrapPartition.split.skeletonFile match {
+      case Some(skeletonFile) =>
+        // It is a bootstrap split. Check both skeleton and data files.
+        val (iterator, schema) = if (bootstrapDataFileReader.schema.isEmpty) {
+          // No data column to fetch, hence fetch only from skeleton file
+          (bootstrapSkeletonFileReader.read(skeletonFile), 
bootstrapSkeletonFileReader.schema)
+        } else if (bootstrapSkeletonFileReader.schema.isEmpty) {
+          // No metadata column to fetch, hence fetch only from data file
+          (bootstrapDataFileReader.read(bootstrapPartition.split.dataFile), 
bootstrapDataFileReader.schema)
+        } else {
+          // Fetch from both data and skeleton file, and merge
+          val dataFileIterator = 
bootstrapDataFileReader.read(bootstrapPartition.split.dataFile)
+          val skeletonFileIterator = 
bootstrapSkeletonFileReader.read(skeletonFile)
+          val mergedSchema = 
StructType(bootstrapSkeletonFileReader.schema.fields ++ 
bootstrapDataFileReader.schema.fields)
 
-    if (bootstrapPartition.split.skeletonFile.isDefined) {
-      // It is a bootstrap split. Check both skeleton and data files.
-      if (dataSchema.isEmpty) {
-        // No data column to fetch, hence fetch only from skeleton file
-        partitionedFileIterator = 
skeletonReadFunction(bootstrapPartition.split.skeletonFile.get)
-      } else if (skeletonSchema.isEmpty) {
-        // No metadata column to fetch, hence fetch only from data file
-        partitionedFileIterator = 
dataReadFunction(bootstrapPartition.split.dataFile)
-      } else {
-        // Fetch from both data and skeleton file, and merge
-        val dataFileIterator = 
dataReadFunction(bootstrapPartition.split.dataFile)
-        val skeletonFileIterator = 
skeletonReadFunction(bootstrapPartition.split.skeletonFile.get)
-        partitionedFileIterator = merge(skeletonFileIterator, dataFileIterator)
-      }
-    } else {
-      partitionedFileIterator = 
regularReadFunction(bootstrapPartition.split.dataFile)
+          (merge(skeletonFileIterator, dataFileIterator), mergedSchema)
+        }
+
+        // NOTE: Here we have to project the [[InternalRow]]s fetched into the 
expected target schema.
+        //       These could diverge for ex, when requested schema contains 
partition columns which might not be
+        //       persisted w/in the data file, but instead would be parsed 
from the partition path. In that case
+        //       output of the file-reader will have different ordering of the 
fields than the original required
+        //       schema (for more details please check out 
[[ParquetFileFormat]] implementation).
+        val unsafeProjection = generateUnsafeProjection(schema, 
requiredSchema.structTypeSchema)
+
+        iterator.map(unsafeProjection)
+
+      case _ =>
+        // NOTE: Regular file-reader is already projected into the required 
schema
+        regularFileReader.read(bootstrapPartition.split.dataFile)
     }
-    partitionedFileIterator
   }
 
-  def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: 
Iterator[InternalRow])
-  : Iterator[InternalRow] = {
+  def merge(skeletonFileIterator: Iterator[InternalRow], dataFileIterator: 
Iterator[InternalRow]): Iterator[InternalRow] = {
     new Iterator[InternalRow] {
-      override def hasNext: Boolean = dataFileIterator.hasNext && 
skeletonFileIterator.hasNext
-      override def next(): InternalRow = {
-        mergeInternalRow(skeletonFileIterator.next(), dataFileIterator.next())
-      }
-    }
-  }
+      private val combinedRow = new JoinedRow()
 
-  def mergeInternalRow(skeletonRow: InternalRow, dataRow: InternalRow): 
InternalRow = {
-    val skeletonArr  = skeletonRow.copy().toSeq(skeletonSchema)
-    val dataArr = dataRow.copy().toSeq(dataSchema)
-    // We need to return it in the order requested
-    val mergedArr = requiredColumns.map(col => {
-      if (skeletonSchema.fieldNames.contains(col)) {
-        val idx = skeletonSchema.fieldIndex(col)
-        skeletonArr(idx)
-      } else {
-        val idx = dataSchema.fieldIndex(col)
-        dataArr(idx)
+      override def hasNext: Boolean = {
+        checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
+          "Bootstrap data-file iterator and skeleton-file iterator have to be 
in-sync!")
+        dataFileIterator.hasNext && skeletonFileIterator.hasNext
       }
-    })
 
-    logDebug("Merged data and skeleton values => " + mergedArr.mkString(","))
-    val mergedRow = InternalRow.fromSeq(mergedArr)
-    mergedRow
+      override def next(): InternalRow = {
+        combinedRow(skeletonFileIterator.next(), dataFileIterator.next())
+      }
+    }
   }
 
   override protected def getPartitions: Array[Partition] = {
-    tableState.files.zipWithIndex.map(file => {
+    splits.zipWithIndex.map(file => {
       if (file._1.skeletonFile.isDefined) {
         logDebug("Forming partition with => Index: " + file._2 + ", Files: " + 
file._1.dataFile.filePath
           + "," + file._1.skeletonFile.get.filePath)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index 0dd54237ef5..5c58c10493d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -19,20 +19,20 @@
 package org.apache.hudi
 
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.common.model.HoodieBaseFile
-import org.apache.hudi.common.table.view.HoodieTableFileSystemView
-import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
-import org.apache.hudi.exception.HoodieException
-import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
-import org.apache.spark.internal.Logging
+import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
+import org.apache.hudi.HoodieBootstrapRelation.validate
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{FileStatusCache, 
PartitionedFile}
-import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isMetaField, 
removeMetaFields}
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SQLContext}
 
-import scala.collection.JavaConverters._
+case class HoodieBootstrapSplit(dataFile: PartitionedFile, skeletonFile: 
Option[PartitionedFile] = None) extends HoodieFileSplit
 
 /**
   * This is Spark relation that can be used for querying metadata/fully 
bootstrapped query hoodie tables, as well as
@@ -44,150 +44,161 @@ import scala.collection.JavaConverters._
   * bootstrapped files, because then the metadata file and data file can 
return different number of rows causing errors
   * merging.
   *
-  * @param _sqlContext Spark SQL Context
+  * @param sqlContext Spark SQL Context
   * @param userSchema User specified schema in the datasource query
   * @param globPaths  The global paths to query. If it not none, read from the 
globPaths,
   *                   else read data from tablePath using HoodiFileIndex.
   * @param metaClient Hoodie table meta client
   * @param optParams DataSource options passed by the user
   */
-class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
-                              val userSchema: Option[StructType],
-                              val globPaths: Seq[Path],
-                              val metaClient: HoodieTableMetaClient,
-                              val optParams: Map[String, String]) extends 
BaseRelation
-  with PrunedFilteredScan with Logging {
+case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
+                                   private val userSchema: Option[StructType],
+                                   private val globPaths: Seq[Path],
+                                   override val metaClient: 
HoodieTableMetaClient,
+                                   override val optParams: Map[String, String],
+                                   private val prunedDataSchema: 
Option[StructType] = None)
+  extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, 
prunedDataSchema) {
 
-  val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
-  var dataSchema: StructType = _
-  var fullSchema: StructType = _
+  override type FileSplit = HoodieBootstrapSplit
+  override type Relation = HoodieBootstrapRelation
 
-  val fileIndex: HoodieBootstrapFileIndex = buildFileIndex()
+  private lazy val skeletonSchema = HoodieSparkUtils.getMetaSchema
 
-  override def sqlContext: SQLContext = _sqlContext
+  override val mandatoryFields: Seq[String] = Seq.empty
 
-  override val needConversion: Boolean = false
+  protected override def collectFileSplits(partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression]): Seq[FileSplit] = {
+    val fileSlices = listLatestFileSlices(globPaths, partitionFilters, 
dataFilters)
+    fileSlices.map { fileSlice =>
+      val baseFile = fileSlice.getBaseFile.get()
 
-  override def schema: StructType = inferFullSchema()
+      if (baseFile.getBootstrapBaseFile.isPresent) {
+        val partitionValues =
+          getPartitionColumnsAsInternalRowInternal(baseFile.getFileStatus, 
extractPartitionValuesFromPartitionPath = true)
+        val dataFile = PartitionedFile(partitionValues, 
baseFile.getBootstrapBaseFile.get().getPath, 0, 
baseFile.getBootstrapBaseFile.get().getFileLen)
+        val skeletonFile = Option(PartitionedFile(InternalRow.empty, 
baseFile.getPath, 0, baseFile.getFileLen))
 
-  override def buildScan(requiredColumns: Array[String], filters: 
Array[Filter]): RDD[Row] = {
-    logInfo("Starting scan..")
-
-    // Compute splits
-    val bootstrapSplits = fileIndex.files.map(hoodieBaseFile => {
-      var skeletonFile: Option[PartitionedFile] = Option.empty
-      var dataFile: PartitionedFile = null
-
-      if (hoodieBaseFile.getBootstrapBaseFile.isPresent) {
-        skeletonFile = Option(PartitionedFile(InternalRow.empty, 
hoodieBaseFile.getPath, 0, hoodieBaseFile.getFileLen))
-        dataFile = PartitionedFile(InternalRow.empty, 
hoodieBaseFile.getBootstrapBaseFile.get().getPath, 0,
-          hoodieBaseFile.getBootstrapBaseFile.get().getFileLen)
-      } else {
-        dataFile = PartitionedFile(InternalRow.empty, hoodieBaseFile.getPath, 
0, hoodieBaseFile.getFileLen)
-      }
-      HoodieBootstrapSplit(dataFile, skeletonFile)
-    })
-    val tableState = HoodieBootstrapTableState(bootstrapSplits)
-
-    // Get required schemas for column pruning
-    var requiredDataSchema = StructType(Seq())
-    var requiredSkeletonSchema = StructType(Seq())
-    // requiredColsSchema is the schema of requiredColumns, note that 
requiredColumns is in a random order
-    // so requiredColsSchema is not always equal to 
(requiredSkeletonSchema.fields ++ requiredDataSchema.fields)
-    var requiredColsSchema = StructType(Seq())
-    requiredColumns.foreach(col => {
-      var field = dataSchema.find(_.name == col)
-      if (field.isDefined) {
-        requiredDataSchema = requiredDataSchema.add(field.get)
+        HoodieBootstrapSplit(dataFile, skeletonFile)
       } else {
-        field = skeletonSchema.find(_.name == col)
-        requiredSkeletonSchema = requiredSkeletonSchema.add(field.get)
+        val dataFile = 
PartitionedFile(getPartitionColumnsAsInternalRow(baseFile.getFileStatus), 
baseFile.getPath, 0, baseFile.getFileLen)
+        HoodieBootstrapSplit(dataFile)
       }
-      requiredColsSchema = requiredColsSchema.add(field.get)
-    })
+    }
+  }
 
-    // Prepare readers for reading data file and skeleton files
-    val dataReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = _sqlContext.sparkSession,
-      dataSchema = dataSchema,
-      partitionSchema = StructType(Seq.empty),
-      requiredSchema = requiredDataSchema,
-      filters = if (requiredSkeletonSchema.isEmpty) filters else Seq() ,
+  protected override def composeRDD(fileSplits: Seq[FileSplit],
+                                    tableSchema: HoodieTableSchema,
+                                    requiredSchema: HoodieTableSchema,
+                                    requestedColumns: Array[String],
+                                    filters: Array[Filter]): RDD[InternalRow] 
= {
+    val requiredSkeletonFileSchema =
+      StructType(skeletonSchema.filter(f => requestedColumns.exists(col => 
resolver(f.name, col))))
+
+    val (bootstrapDataFileReader, bootstrapSkeletonFileReader) =
+      createBootstrapFileReaders(tableSchema, requiredSchema, 
requiredSkeletonFileSchema, filters)
+
+    val regularFileReader = createRegularFileReader(tableSchema, 
requiredSchema, filters)
+
+    new HoodieBootstrapRDD(sqlContext.sparkSession, bootstrapDataFileReader, 
bootstrapSkeletonFileReader, regularFileReader,
+      requiredSchema, fileSplits)
+  }
+
+  private def createBootstrapFileReaders(tableSchema: HoodieTableSchema,
+                                         requiredSchema: HoodieTableSchema,
+                                         requiredSkeletonFileSchema: 
StructType,
+                                         filters: Array[Filter]): 
(BaseFileReader, BaseFileReader) = {
+    // NOTE: "Data" schema in here refers to the whole table's schema that 
doesn't include only partition
+    //       columns, as opposed to data file schema not including any 
meta-fields columns in case of
+    //       Bootstrap relation
+    val (partitionSchema, dataSchema, requiredDataSchema) =
+      tryPrunePartitionColumnsInternal(tableSchema, requiredSchema, 
extractPartitionValuesFromPartitionPath = true)
+
+    val bootstrapDataFileSchema = 
StructType(dataSchema.structTypeSchema.filterNot(sf => isMetaField(sf.name)))
+    val requiredBootstrapDataFileSchema = 
StructType(requiredDataSchema.structTypeSchema.filterNot(sf => 
isMetaField(sf.name)))
+
+    validate(requiredDataSchema, requiredBootstrapDataFileSchema, 
requiredSkeletonFileSchema)
+
+    val bootstrapDataFileReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      dataSchema = new HoodieTableSchema(bootstrapDataFileSchema),
+      partitionSchema = partitionSchema,
+      requiredDataSchema = new 
HoodieTableSchema(requiredBootstrapDataFileSchema),
+      // NOTE: For bootstrapped files we can't apply any filtering in case 
we'd need to merge it with
+      //       a skeleton-file as we rely on matching ordering of the records 
across bootstrap- and skeleton-files
+      filters = if (requiredSkeletonFileSchema.isEmpty) filters else Seq(),
       options = optParams,
-      hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf(),
+      // NOTE: Bootstrap relation have to always extract partition values from 
the partition-path as this is a
+      //       default Spark behavior: Spark by default strips 
partition-columns from the data schema and does
+      //       NOT persist them in the data files, instead parsing them from 
partition-paths (on the fly) whenever
+      //       table is queried
+      shouldAppendPartitionValuesOverride = Some(true)
     )
 
-    val skeletonReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = _sqlContext.sparkSession,
-      dataSchema = skeletonSchema,
+    val boostrapSkeletonFileReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      dataSchema = new HoodieTableSchema(skeletonSchema),
+      // NOTE: Here we specify partition-schema as empty since we don't need 
Spark to inject partition-values
+      //       parsed from the partition-path
       partitionSchema = StructType(Seq.empty),
-      requiredSchema = requiredSkeletonSchema,
-      filters = if (requiredDataSchema.isEmpty) filters else Seq(),
+      requiredDataSchema = new HoodieTableSchema(requiredSkeletonFileSchema),
+      // NOTE: For bootstrapped files we can't apply any filtering in case 
we'd need to merge it with
+      //       a skeleton-file as we rely on matching ordering of the records 
across bootstrap- and skeleton-files
+      filters = if (requiredBootstrapDataFileSchema.isEmpty) filters else 
Seq(),
       options = optParams,
-      hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf(),
+      // NOTE: We override Spark to avoid injecting partition values into the 
records read from
+      //       skeleton-file
+      shouldAppendPartitionValuesOverride = Some(false)
     )
 
-    val regularReadFunction = HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = _sqlContext.sparkSession,
-      dataSchema = fullSchema,
-      partitionSchema = StructType(Seq.empty),
-      requiredSchema = requiredColsSchema,
+    (bootstrapDataFileReader, boostrapSkeletonFileReader)
+  }
+
+  private def createRegularFileReader(tableSchema: HoodieTableSchema,
+                                     requiredSchema: HoodieTableSchema,
+                                     filters: Array[Filter]): BaseFileReader = 
{
+    // NOTE: "Data" schema in here refers to the whole table's schema that 
doesn't include only partition
+    //       columns, as opposed to data file schema not including any 
meta-fields columns in case of
+    //       Bootstrap relation
+    val (partitionSchema, dataSchema, requiredDataSchema) =
+      tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+    // NOTE: Bootstrapped table allows Hudi created file-slices to be 
co-located w/ the "bootstrapped"
+    //       ones (ie persisted by Spark). Therefore to be able to read the 
data from Bootstrapped
+    //       table we also need to create regular file-reader to read 
file-slices created by Hudi
+    val regularFileReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      dataSchema = dataSchema,
+      partitionSchema = partitionSchema,
+      requiredDataSchema = requiredDataSchema,
       filters = filters,
       options = optParams,
-      hadoopConf = _sqlContext.sparkSession.sessionState.newHadoopConf()
+      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
     )
 
-    val rdd = new HoodieBootstrapRDD(_sqlContext.sparkSession, 
dataReadFunction, skeletonReadFunction,
-      regularReadFunction, requiredDataSchema, requiredSkeletonSchema, 
requiredColumns, tableState)
-    rdd.asInstanceOf[RDD[Row]]
+    // NOTE: In some case schema of the reader's output (reader's schema) 
might not match the schema expected by the caller.
+    //       This could occur for ex, when requested schema contains partition 
columns which might not be persisted w/in the
+    //       data file, but instead would be parsed from the partition path. 
In that case output of the file-reader will have
+    //       different ordering of the fields than the original required 
schema (for more details please check out
+    //       [[ParquetFileFormat]] impl). In that case we have to project the 
rows from the file-reader's schema
+    //       back into the one expected by the caller
+    projectReader(regularFileReader, requiredSchema.structTypeSchema)
   }
 
-  def inferFullSchema(): StructType = {
-    if (fullSchema == null) {
-      logInfo("Inferring schema..")
-      val schemaResolver = new TableSchemaResolver(metaClient)
-      val tableSchema = schemaResolver.getTableAvroSchema(false)
-      dataSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
-      fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
-    }
-    fullSchema
-  }
-
-  def buildFileIndex(): HoodieBootstrapFileIndex = {
-    logInfo("Building file index..")
-    val fileStatuses  = if (globPaths.nonEmpty) {
-      // Load files from the global paths if it has defined to be compatible 
with the original mode
-      val inMemoryFileIndex = 
HoodieInMemoryFileIndex.create(_sqlContext.sparkSession, globPaths)
-      inMemoryFileIndex.allFiles()
-    } else { // Load files by the HoodieFileIndex.
-        HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(schema), 
optParams,
-          FileStatusCache.getOrCreate(sqlContext.sparkSession)).allFiles
-    }
-    if (fileStatuses.isEmpty) {
-      throw new HoodieException("No files found for reading in user provided 
path.")
-    }
+  override def updatePrunedDataSchema(prunedSchema: StructType): 
HoodieBootstrapRelation =
+    this.copy(prunedDataSchema = Some(prunedSchema))
+}
 
-    val fsView = new HoodieTableFileSystemView(metaClient, 
metaClient.getActiveTimeline.getCommitsTimeline
-      .filterCompletedInstants, fileStatuses.toArray)
-    val latestFiles: List[HoodieBaseFile] = 
fsView.getLatestBaseFiles.iterator().asScala.toList
-
-    if (log.isDebugEnabled) {
-      latestFiles.foreach(file => {
-        logDebug("Printing indexed files:")
-        if (file.getBootstrapBaseFile.isPresent) {
-          logDebug("Skeleton File: " + file.getPath + ", Data File: " + 
file.getBootstrapBaseFile.get().getPath)
-        } else {
-          logDebug("Regular Hoodie File: " + file.getPath)
-        }
-      })
-    }
 
-    HoodieBootstrapFileIndex(latestFiles)
-  }
-}
+object HoodieBootstrapRelation {
 
-case class HoodieBootstrapFileIndex(files: List[HoodieBaseFile])
+  private def validate(requiredDataSchema: HoodieTableSchema, 
requiredDataFileSchema: StructType, requiredSkeletonFileSchema: StructType): 
Unit = {
+    val requiredDataColumns: Seq[String] = 
requiredDataSchema.structTypeSchema.fieldNames.toSeq
+    val combinedColumns = (requiredSkeletonFileSchema.fieldNames ++ 
requiredDataFileSchema.fieldNames).toSeq
 
-case class HoodieBootstrapTableState(files: List[HoodieBootstrapSplit])
+    // NOTE: Here we validate that all required data columns are covered by 
the combination of the columns
+    //       from both skeleton file and the corresponding data file
+    checkState(combinedColumns.sorted == requiredDataColumns.sorted)
+  }
 
-case class HoodieBootstrapSplit(dataFile: PartitionedFile, skeletonFile: 
Option[PartitionedFile])
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index accfc8f2470..94168755cbf 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -190,7 +190,7 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: 
SQLContext,
           StructType(requiredDataSchema.structTypeSchema.fields
             .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
 
-        HoodieTableSchema(prunedStructSchema, 
convertToAvroSchema(prunedStructSchema).toString)
+        new HoodieTableSchema(prunedStructSchema)
       }
 
       val requiredSchemaReaderSkipMerging = createBaseFileReader(
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 8e589abbc18..54c58bace7c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -23,9 +23,11 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, 
HoodieMetadataConfig}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, 
HoodieInstantTimeGenerator}
+import 
org.apache.hudi.common.table.timeline.HoodieActiveTimeline.parseDateFromInstantTime
+import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, 
HoodieInstantTimeGenerator, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.PartitionPathEncodeUtils
+import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.{AvroConversionUtils, DataSourceOptionsHelper, 
DataSourceReadOptions, SparkAdapterSupport}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -42,6 +44,7 @@ import java.text.SimpleDateFormat
 import java.util.{Locale, Properties}
 import scala.collection.JavaConverters._
 import scala.collection.immutable.Map
+import scala.util.Try
 
 object HoodieSqlCommonUtils extends SparkAdapterSupport {
   // NOTE: {@code SimpleDataFormat} is NOT thread-safe
@@ -251,11 +254,13 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
    */
   def formatQueryInstant(queryInstant: String): String = {
     val instantLength = queryInstant.length
-    if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd 
HH:mm:ss[.SSS]
+    if (instantLength == 19 || instantLength == 23) {
+      // Handle "yyyy-MM-dd HH:mm:ss[.SSS]" format
       HoodieInstantTimeGenerator.getInstantForDateString(queryInstant)
     } else if (instantLength == 
HoodieInstantTimeGenerator.SECS_INSTANT_ID_LENGTH
-      || instantLength  == 
HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for 
yyyyMMddHHmmss[SSS]
-      HoodieActiveTimeline.parseDateFromInstantTime(queryInstant) // validate 
the format
+      || instantLength  == 
HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) {
+      // Handle already serialized "yyyyMMddHHmmss[SSS]" format
+      validateInstant(queryInstant)
       queryInstant
     } else if (instantLength == 10) { // for yyyy-MM-dd
       
HoodieActiveTimeline.formatDate(defaultDateFormat.get().parse(queryInstant))
@@ -356,4 +361,21 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
     }.mkString(",")
     partitionsToDrop
   }
+
+  private def validateInstant(queryInstant: String): Unit = {
+    // Provided instant has to either
+    //  - Match one of the bootstrapping instants
+    //  - Be parse-able (as a date)
+    val valid = queryInstant match {
+      case HoodieTimeline.INIT_INSTANT_TS |
+           HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS |
+           HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS => true
+
+      case _ => Try(parseDateFromInstantTime(queryInstant)).isSuccess
+    }
+
+    if (!valid) {
+      throw new HoodieException(s"Got an invalid instant ($queryInstant)")
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 82f79eeb44e..e3d235591d4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -20,14 +20,16 @@ package org.apache.hudi.functional
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
 import 
org.apache.hudi.client.bootstrap.selector.FullRecordBootstrapModeSelector
+import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieCompactionConfig, 
HoodieWriteConfig}
 import org.apache.hudi.functional.TestDataSourceForBootstrap.{dropMetaCols, 
sort}
 import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.testutils.HoodieClientTestUtils
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieDataSourceHelpers}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieDataSourceHelpers, HoodieSparkRecordMerger}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
@@ -35,7 +37,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.io.TempDir
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource, ValueSource}
 
 import java.time.Instant
 import java.util.Collections
@@ -56,6 +58,12 @@ class TestDataSourceForBootstrap {
     DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
     HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
   )
+
+  val sparkRecordTypeOpts = Map(
+    HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> 
classOf[HoodieSparkRecordMerger].getName,
+    HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet"
+  )
+
   var basePath: String = _
   var srcPath: String = _
   var fs: FileSystem = _
@@ -153,12 +161,18 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecords, hoodieROViewDF1WithBasePath.count())
     assertEquals(numRecordsUpdate, 
hoodieROViewDF1WithBasePath.filter(s"timestamp == $updateTimestamp").count())
 
-    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, 
isPartitioned = false, isHiveStylePartitioned = false)
+    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, 
isPartitioned = false, isHiveStylePartitioned = true)
   }
 
   @ParameterizedTest
-  @ValueSource(strings = Array("METADATA_ONLY", "FULL_RECORD"))
-  def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String): 
Unit = {
+  @CsvSource(value = Array(
+    "METADATA_ONLY,AVRO",
+    // TODO(HUDI-5807) enable for spark native records
+    /* "METADATA_ONLY,SPARK", */
+    "FULL_RECORD,AVRO",
+    "FULL_RECORD,SPARK"
+  ))
+  def testMetadataBootstrapCOWHiveStylePartitioned(bootstrapMode: String, 
recordType: HoodieRecordType): Unit = {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
@@ -181,16 +195,15 @@ class TestDataSourceForBootstrap {
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-      readOpts,
+      readOpts ++ getRecordTypeOpts(recordType),
       classOf[SimpleKeyGenerator].getName)
 
     // check marked directory clean up
     assert(!fs.exists(new Path(basePath, ".hoodie/.temp/00000000000001")))
 
-    // TODO(HUDI-5602) troubleshoot
     val expectedDF = bootstrapMode match {
       case "METADATA_ONLY" =>
-        sort(sourceDF).withColumn("datestr", lit(null))
+        sort(sourceDF)
       case "FULL_RECORD" =>
         sort(sourceDF)
     }
@@ -208,9 +221,11 @@ class TestDataSourceForBootstrap {
     val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 
0, numRecordsUpdate, partitionPaths.asJava,
       jsc, spark.sqlContext)
 
+    val writeOpts = commonOpts ++ getRecordTypeOpts(recordType)
+
     updateDF.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
@@ -234,28 +249,31 @@ class TestDataSourceForBootstrap {
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, 
isPartitioned = true, isHiveStylePartitioned = true)
   }
 
-  @Test def testMetadataBootstrapCOWPartitioned(): Unit = {
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType],
+    // TODO(HUDI-5807) enable for spark native records
+    names = Array("AVRO" /*, "SPARK" */))
+  def testMetadataBootstrapCOWPartitioned(recordType: HoodieRecordType): Unit 
= {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, 
numRecords, partitionPaths.asJava, jsc,
       spark.sqlContext)
 
-    // Writing data for each partition instead of using partitionBy to avoid 
hive-style partitioning and hence
-    // have partitioned columns stored in the data file
-    partitionPaths.foreach(partitionPath => {
-      sourceDF
-        .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
-        .write
-        .format("parquet")
-        .mode(SaveMode.Overwrite)
-        .save(srcPath + "/" + partitionPath)
-    })
+    sourceDF.write.format("parquet")
+      .partitionBy("datestr")
+      .mode(SaveMode.Overwrite)
+      .save(srcPath)
+
+    val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map(
+      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
+    )
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, 
"datestr"),
+      writeOpts,
       classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count using glob path
@@ -270,10 +288,9 @@ class TestDataSourceForBootstrap {
     val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === 
verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
     updateDf1.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -290,10 +307,9 @@ class TestDataSourceForBootstrap {
 
     updateDF2.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -309,31 +325,34 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecords, hoodieROViewDF4.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF4.filter(s"timestamp == 
$updateTimestamp").count())
 
-    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, 
isPartitioned = true, isHiveStylePartitioned = false)
+    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime3, 
isPartitioned = true, isHiveStylePartitioned = true)
   }
 
-  @Test def testMetadataBootstrapMORPartitionedInlineCompactionOn(): Unit = {
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType],
+    // TODO(HUDI-5807) enable for spark native records
+    names = Array("AVRO" /*, "SPARK" */))
+  def testMetadataBootstrapMORPartitionedInlineCompactionOn(recordType: 
HoodieRecordType): Unit = {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, 
numRecords, partitionPaths.asJava, jsc,
       spark.sqlContext)
 
-    // Writing data for each partition instead of using partitionBy to avoid 
hive-style partitioning and hence
-    // have partitioned columns stored in the data file
-    partitionPaths.foreach(partitionPath => {
-      sourceDF
-        .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
-        .write
-        .format("parquet")
-        .mode(SaveMode.Overwrite)
-        .save(srcPath + "/" + partitionPath)
-    })
+    sourceDF.write.format("parquet")
+      .partitionBy("datestr")
+      .mode(SaveMode.Overwrite)
+      .save(srcPath)
+
+    val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map(
+      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
+    )
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
-      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, 
"datestr"),
+      writeOpts,
       classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count
@@ -350,10 +369,9 @@ class TestDataSourceForBootstrap {
 
     updateDF.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
       .option(HoodieCompactionConfig.INLINE_COMPACT.key, "true")
       .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key, "1")
       .mode(SaveMode.Append)
@@ -379,28 +397,29 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecordsUpdate, 
hoodieROViewDFWithBasePath.filter(s"timestamp == $updateTimestamp").count())
   }
 
-  @Test def testMetadataBootstrapMORPartitioned(): Unit = {
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
+  def testMetadataBootstrapMORPartitioned(recordType: HoodieRecordType): Unit 
= {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, 
numRecords, partitionPaths.asJava, jsc,
       spark.sqlContext)
 
-    // Writing data for each partition instead of using partitionBy to avoid 
hive-style partitioning and hence
-    // have partitioned columns stored in the data file
-    partitionPaths.foreach(partitionPath => {
-      sourceDF
-        .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
-        .write
-        .format("parquet")
-        .mode(SaveMode.Overwrite)
-        .save(srcPath + "/" + partitionPath)
-    })
+    sourceDF.write.format("parquet")
+      .partitionBy("datestr")
+      .mode(SaveMode.Overwrite)
+      .save(srcPath)
+
+    val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map(
+      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
+    )
 
     // Perform bootstrap
     val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
       DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
-      commonOpts.updated(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, 
"datestr"),
+      writeOpts,
       classOf[SimpleKeyGenerator].getName)
 
     // Read bootstrapped table and verify count
@@ -423,10 +442,9 @@ class TestDataSourceForBootstrap {
     val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === 
verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
     updateDf1.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -446,10 +464,9 @@ class TestDataSourceForBootstrap {
 
     updateDF2.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -466,31 +483,31 @@ class TestDataSourceForBootstrap {
     assertEquals(0, hoodieROViewDF3.filter(s"timestamp == 
$updateTimestamp").count())
   }
 
-  @Test def testFullBootstrapCOWPartitioned(): Unit = {
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieRecordType], names = Array("AVRO", 
"SPARK"))
+  def testFullBootstrapCOWPartitioned(recordType: HoodieRecordType): Unit = {
     val timestamp = Instant.now.toEpochMilli
     val jsc = JavaSparkContext.fromSparkContext(spark.sparkContext)
 
     val sourceDF = TestBootstrap.generateTestRawTripDataset(timestamp, 0, 
numRecords, partitionPaths.asJava, jsc,
       spark.sqlContext)
 
-    // Writing data for each partition instead of using partitionBy to avoid 
hive-style partitioning and hence
-    // have partitioned columns stored in the data file
-    partitionPaths.foreach(partitionPath => {
-      sourceDF
-        .filter(sourceDF("datestr").equalTo(lit(partitionPath)))
-        .write
-        .format("parquet")
-        .mode(SaveMode.Overwrite)
-        .save(srcPath + "/" + partitionPath)
-    })
+    sourceDF.write.format("parquet")
+      .partitionBy("datestr")
+      .mode(SaveMode.Overwrite)
+      .save(srcPath)
+
+    val writeOpts = commonOpts ++ getRecordTypeOpts(recordType) ++ Map(
+      DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key -> "true",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "datestr"
+    )
 
     // Perform bootstrap
     val bootstrapDF = spark.emptyDataFrame
     bootstrapDF.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
       .option(HoodieBootstrapConfig.BASE_PATH.key, srcPath)
       .option(HoodieBootstrapConfig.KEYGEN_CLASS_NAME.key, 
classOf[SimpleKeyGenerator].getName)
       .option(HoodieBootstrapConfig.MODE_SELECTOR_CLASS_NAME.key, 
classOf[FullRecordBootstrapModeSelector].getName)
@@ -515,10 +532,9 @@ class TestDataSourceForBootstrap {
 
     updateDF.write
       .format("hudi")
-      .options(commonOpts)
+      .options(writeOpts)
       .option(DataSourceWriteOptions.OPERATION.key, 
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE.key, 
DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
-      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "datestr")
       .mode(SaveMode.Append)
       .save(basePath)
 
@@ -530,7 +546,7 @@ class TestDataSourceForBootstrap {
     assertEquals(numRecords, hoodieROViewDF2.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == 
$updateTimestamp").count())
 
-    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, 
isPartitioned = true, isHiveStylePartitioned = false)
+    verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, 
isPartitioned = true, isHiveStylePartitioned = true)
   }
 
   def runMetadataBootstrapAndVerifyCommit(tableType: String,
@@ -596,6 +612,12 @@ class TestDataSourceForBootstrap {
         hoodieIncViewDF3.count())
     }
   }
+
+  def getRecordTypeOpts(recordType: HoodieRecordType): Map[String, String] =
+    recordType match {
+      case HoodieRecordType.SPARK => sparkRecordTypeOpts
+      case _ => Map.empty
+    }
 }
 
 object TestDataSourceForBootstrap {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index eb6ab80b5f9..f85e55dfd40 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -650,6 +650,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     cfg.configs.add(String.format("hoodie.bootstrap.base.path=%s", 
bootstrapSourcePath));
     cfg.configs.add(String.format("%s=%s", 
DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "rider"));
     cfg.configs.add(String.format("hoodie.bootstrap.keygen.class=%s", 
SimpleKeyGenerator.class.getName()));
+    cfg.configs.add("hoodie.datasource.write.hive_style_partitioning=true");
     cfg.configs.add("hoodie.bootstrap.parallelism=5");
     cfg.targetBasePath = newDatasetBasePath;
     new HoodieDeltaStreamer(cfg, jsc).sync();
@@ -660,6 +661,9 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     TestHelpers.assertRecordCount(1950, newDatasetBasePath, sqlContext);
     res.registerTempTable("bootstrapped");
     assertEquals(1950, sqlContext.sql("select distinct _hoodie_record_key from 
bootstrapped").count());
+    // NOTE: To fetch the record count, Spark will optimize the query to fetch 
the minimal possible amount
+    //       of data, which might not provide an adequate amount of test coverage
+    sqlContext.sql("select * from bootstrapped").show();
 
     StructField[] fields = res.schema().fields();
     List<String> fieldNames = Arrays.asList(res.schema().fieldNames());

Reply via email to