bvaradar commented on code in PR #8847:
URL: https://github.com/apache/hudi/pull/8847#discussion_r1217216256


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRDD.scala:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
+import org.apache.hudi.HoodieBootstrapMORRDD.CONFIG_INSTANTIATION_LOCK
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.{Partition, SerializableWritable, TaskContext}
+
+class HoodieBootstrapMORRDD(@transient spark: SparkSession,
+                            @transient config: Configuration,
+                            bootstrapDataFileReader: BaseFileReader,
+                            bootstrapSkeletonFileReader: BaseFileReader,
+                            regularFileReader: BaseFileReader,
+                            tableSchema: HoodieTableSchema,
+                            requiredSchema: HoodieTableSchema,
+                            tableState: HoodieTableState,
+                            @transient splits: Seq[HoodieBootstrapSplit])
+  extends HoodieBootstrapRDD(spark, bootstrapDataFileReader, bootstrapSkeletonFileReader,
+    regularFileReader, requiredSchema, splits) {
+
+  protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config))
+
+  private val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableWritable(config))
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
+    maybeLog(bootstrapPartition)
+
+    if (bootstrapPartition.split.logFiles.isEmpty) {
+      //no log files, treat like regular bootstrap
+      getIterator(bootstrapPartition)
+    } else {
+      bootstrapPartition.split.skeletonFile match {
+        case Some(skeletonFile) =>
+          val (iterator, schema) = getSkeletonIteratorSchema(bootstrapPartition.split.dataFile, skeletonFile)
+          new RecordMergingFileIterator(HoodieMergeOnReadFileSplit(Some(bootstrapPartition.split.dataFile), bootstrapPartition.split.logFiles),
+            iterator, schema, tableSchema, requiredSchema, tableState, getHadoopConf)
+        case _ =>
+          // NOTE: Regular file-reader is already projected into the required schema
+          new RecordMergingFileIterator(HoodieMergeOnReadFileSplit(Some(bootstrapPartition.split.dataFile), bootstrapPartition.split.logFiles),

Review Comment:
   @jonvex : It looks like RecordMergingFileIterator expects the base file to be a Hudi file. The bootstrap data file won't have any Hudi record key fields set up. How would this work?
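   For context on the question (illustrative only, not part of the PR): a Hudi base/skeleton file carries the meta columns, including the record key, while a raw bootstrap data file does not. A minimal sketch of the fields in question:

   ```scala
   import org.apache.hudi.common.model.HoodieRecord

   object MetaColumnsExample extends App {
     // "_hoodie_record_key" -- present in the skeleton/base file, absent from the raw bootstrap data file
     println(HoodieRecord.RECORD_KEY_METADATA_FIELD)
     // Full set of meta columns a merging reader would normally see up front:
     // _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key, _hoodie_partition_path, _hoodie_file_name
     println(HoodieRecord.HOODIE_META_COLUMNS)
   }
   ```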



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala:
##########
@@ -60,47 +88,62 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
   extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema) {
 
   override type FileSplit = HoodieBootstrapSplit
-  override type Relation = HoodieBootstrapRelation
+
 
   private lazy val skeletonSchema = HoodieSparkUtils.getMetaSchema
 
   private lazy val bootstrapBasePath = new Path(metaClient.getTableConfig.getBootstrapBasePath.get)
 
-  override val mandatoryFields: Seq[String] = Seq.empty
+  override lazy val mandatoryFields: Seq[String] = Seq.empty
+
+  protected def getFileSlices(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
+    listLatestFileSlices(globPaths, partitionFilters, dataFilters)
+  }
 
   protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit] = {
-    val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
+    val fileSlices = getFileSlices(partitionFilters, dataFilters)
     val isPartitioned = metaClient.getTableConfig.isTablePartitioned
     fileSlices.map { fileSlice =>
       val baseFile = fileSlice.getBaseFile.get()
+      val logFiles = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList

Review Comment:
   We should keep the MOR aspects in HoodieBootstrapMORRelation and have this class deal with the common/COW aspects, to keep a clean separation.
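   A rough sketch of that separation (illustrative only, with simplified stand-in types rather than the PR's classes): the base relation builds splits without log files, and the MOR relation overrides a single hook to attach them.

   ```scala
   // Hypothetical, simplified types for illustration; all names here are assumptions.
   case class FileSliceInfo(dataFile: String, skeletonFile: Option[String], logFiles: List[String])
   case class BootstrapSplitInfo(dataFile: String, skeletonFile: Option[String], logFiles: List[String] = Nil)

   class BootstrapRelationSketch {
     // Common/COW behaviour: log files are never involved.
     protected def createSplit(slice: FileSliceInfo): BootstrapSplitInfo =
       BootstrapSplitInfo(slice.dataFile, slice.skeletonFile)

     def collectSplits(slices: Seq[FileSliceInfo]): Seq[BootstrapSplitInfo] = slices.map(createSplit)
   }

   class BootstrapMORRelationSketch extends BootstrapRelationSketch {
     // MOR-only concern: carry the (sorted) log files on the split.
     override protected def createSplit(slice: FileSliceInfo): BootstrapSplitInfo =
       BootstrapSplitInfo(slice.dataFile, slice.skeletonFile, slice.logFiles.sorted)
   }
   ```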



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala:
##########
@@ -36,53 +37,68 @@ class HoodieBootstrapRDD(@transient spark: SparkSession,
                          @transient splits: Seq[HoodieBootstrapSplit])
   extends RDD[InternalRow](spark.sparkContext, Nil) {
 
-  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
-    val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
 
+  protected def getSkeletonIteratorSchema(dataFile: PartitionedFile, skeletonFile: PartitionedFile): (Iterator[InternalRow], StructType) = {
+    if (bootstrapDataFileReader.schema.isEmpty) {
+      // No data column to fetch, hence fetch only from skeleton file
+      (bootstrapSkeletonFileReader.read(skeletonFile), bootstrapSkeletonFileReader.schema)
+    } else if (bootstrapSkeletonFileReader.schema.isEmpty) {
+      // No metadata column to fetch, hence fetch only from data file
+      (bootstrapDataFileReader.read(dataFile), bootstrapDataFileReader.schema)
+    } else {
+      // Fetch from both data and skeleton file, and merge
+      val dataFileIterator = bootstrapDataFileReader.read(dataFile)
+      val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile)
+      val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields ++ bootstrapDataFileReader.schema.fields)
+
+      (merge(skeletonFileIterator, dataFileIterator), mergedSchema)
+    }
+  }
+
+  /**
+   *  Here we have to project the [[InternalRow]]s fetched into the expected target schema.
+   *  These could diverge for ex, when requested schema contains partition columns which might not be

Review Comment:
   What is the implication of this? Would the read query fail here?
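   For reference, a small standalone illustration of the mechanism the code comment describes (this uses Spark's catalyst UnsafeProjection directly rather than the PR's generateUnsafeProjection helper, and is not part of the change): when the reader's output schema has a different field order than the required schema, the projection realigns the fields positionally.

   ```scala
   import org.apache.spark.sql.catalyst.InternalRow
   import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection}
   import org.apache.spark.sql.types._

   object ProjectionExample extends App {
     // Reader produced (value, key) but the required schema is (key, value).
     val readerSchema   = StructType(Seq(StructField("value", LongType), StructField("key", IntegerType)))
     val requiredSchema = StructType(Seq(StructField("key", IntegerType), StructField("value", LongType)))

     // Bind each required field to its position in the reader's output ordering.
     val exprs = requiredSchema.map { f =>
       BoundReference(readerSchema.fieldIndex(f.name), f.dataType, f.nullable)
     }
     val projection = UnsafeProjection.create(exprs)

     val readerRow = InternalRow(42L, 7)      // reader order: value = 42, key = 7
     val projected = projection(readerRow)    // required order: key = 7, value = 42
     println(s"${projected.getInt(0)}, ${projected.getLong(1)}")
   }
   ```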



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala:
##########
@@ -36,53 +37,68 @@ class HoodieBootstrapRDD(@transient spark: SparkSession,
                          @transient splits: Seq[HoodieBootstrapSplit])
   extends RDD[InternalRow](spark.sparkContext, Nil) {
 
-  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
-    val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
 
+  protected def getSkeletonIteratorSchema(dataFile: PartitionedFile, skeletonFile: PartitionedFile): (Iterator[InternalRow], StructType) = {
+    if (bootstrapDataFileReader.schema.isEmpty) {
+      // No data column to fetch, hence fetch only from skeleton file
+      (bootstrapSkeletonFileReader.read(skeletonFile), bootstrapSkeletonFileReader.schema)
+    } else if (bootstrapSkeletonFileReader.schema.isEmpty) {
+      // No metadata column to fetch, hence fetch only from data file
+      (bootstrapDataFileReader.read(dataFile), bootstrapDataFileReader.schema)
+    } else {
+      // Fetch from both data and skeleton file, and merge
+      val dataFileIterator = bootstrapDataFileReader.read(dataFile)
+      val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile)
+      val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields ++ bootstrapDataFileReader.schema.fields)
+
+      (merge(skeletonFileIterator, dataFileIterator), mergedSchema)
+    }
+  }
+
+  /**
+   *  Here we have to project the [[InternalRow]]s fetched into the expected target schema.
+   *  These could diverge for ex, when requested schema contains partition columns which might not be
+   *  persisted w/in the data file, but instead would be parsed from the partition path. In that case
+   *  output of the file-reader will have different ordering of the fields than the original required
+   *  schema (for more details please check out [[ParquetFileFormat]] implementation).
+   */
+  protected def unsafeProjectIterator(iterator: Iterator[InternalRow], schema: StructType): Iterator[InternalRow] = {
+    val unsafeProjection = generateUnsafeProjection(schema, requiredSchema.structTypeSchema)
+    iterator.map(unsafeProjection)
+  }
+
+  protected def maybeLog(bootstrapPartition: HoodieBootstrapPartition): Unit = {
     if (log.isDebugEnabled) {
+      var msg = "Got Split => Index: " + bootstrapPartition.index + ", Data 
File: " +
+        bootstrapPartition.split.dataFile.filePath
       if (bootstrapPartition.split.skeletonFile.isDefined) {
-        logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data 
File: "
-          + bootstrapPartition.split.dataFile.filePath + ", Skeleton File: "
-          + bootstrapPartition.split.skeletonFile.get.filePath)
-      } else {
-        logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data 
File: "
-          + bootstrapPartition.split.dataFile.filePath)
+        msg += ", Skeleton File: " + 
bootstrapPartition.split.skeletonFile.get.filePath
+      }
+      if (bootstrapPartition.split.logFiles.nonEmpty) {
+        msg += ", Log Paths: " + bootstrapPartition.split.logFiles
       }
+      logDebug(msg)
     }
+  }
 
+  protected def getIterator(bootstrapPartition: HoodieBootstrapPartition): Iterator[InternalRow] = {
     bootstrapPartition.split.skeletonFile match {
       case Some(skeletonFile) =>
         // It is a bootstrap split. Check both skeleton and data files.
-        val (iterator, schema) = if (bootstrapDataFileReader.schema.isEmpty) {
-          // No data column to fetch, hence fetch only from skeleton file
-          (bootstrapSkeletonFileReader.read(skeletonFile), bootstrapSkeletonFileReader.schema)
-        } else if (bootstrapSkeletonFileReader.schema.isEmpty) {
-          // No metadata column to fetch, hence fetch only from data file
-          (bootstrapDataFileReader.read(bootstrapPartition.split.dataFile), bootstrapDataFileReader.schema)
-        } else {
-          // Fetch from both data and skeleton file, and merge
-          val dataFileIterator = bootstrapDataFileReader.read(bootstrapPartition.split.dataFile)
-          val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile)
-          val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields ++ bootstrapDataFileReader.schema.fields)
-
-          (merge(skeletonFileIterator, dataFileIterator), mergedSchema)
-        }
-
-        // NOTE: Here we have to project the [[InternalRow]]s fetched into the expected target schema.
-        //       These could diverge for ex, when requested schema contains partition columns which might not be
-        //       persisted w/in the data file, but instead would be parsed from the partition path. In that case
-        //       output of the file-reader will have different ordering of the fields than the original required
-        //       schema (for more details please check out [[ParquetFileFormat]] implementation).
-        val unsafeProjection = generateUnsafeProjection(schema, requiredSchema.structTypeSchema)
-
-        iterator.map(unsafeProjection)
-
+        val (iterator, schema) = getSkeletonIteratorSchema(bootstrapPartition.split.dataFile, skeletonFile)
+        unsafeProjectIterator(iterator, schema)
       case _ =>
         // NOTE: Regular file-reader is already projected into the required schema
         regularFileReader.read(bootstrapPartition.split.dataFile)
     }
   }
 
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {

Review Comment:
   What is the main change being done in HoodieBootstrapRDD? Is this refactoring to allow HoodieBootstrapMORRDD to reuse functionality easily?
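   For what it's worth, the refactor in the hunk above appears to follow this shape (illustrative sketch with simplified stand-in types, not the PR's classes): compute() delegates to protected helpers that a subclass such as HoodieBootstrapMORRDD can reuse, overriding only the merge-specific path.

   ```scala
   // Hypothetical stand-ins for illustration only.
   class BaseBootstrapRddSketch {
     protected def getIterator(partitionIndex: Int): Iterator[String] =
       Iterator(s"base rows for partition $partitionIndex")

     def compute(partitionIndex: Int): Iterator[String] = getIterator(partitionIndex)
   }

   class MorBootstrapRddSketch(partitionsWithLogFiles: Set[Int]) extends BaseBootstrapRddSketch {
     override def compute(partitionIndex: Int): Iterator[String] =
       if (!partitionsWithLogFiles.contains(partitionIndex)) {
         getIterator(partitionIndex)                                         // no log files: reuse base behaviour
       } else {
         getIterator(partitionIndex).map(_ + " (merged with log records)")   // MOR-only merging
       }
   }
   ```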


