jonvex commented on code in PR #8847:
URL: https://github.com/apache/hudi/pull/8847#discussion_r1218360481


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala:
##########
@@ -36,53 +37,68 @@ class HoodieBootstrapRDD(@transient spark: SparkSession,
                          @transient splits: Seq[HoodieBootstrapSplit])
   extends RDD[InternalRow](spark.sparkContext, Nil) {
 
-  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
-    val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
 
+  protected def getSkeletonIteratorSchema(dataFile: PartitionedFile, skeletonFile: PartitionedFile): (Iterator[InternalRow], StructType) = {
+    if (bootstrapDataFileReader.schema.isEmpty) {
+      // No data column to fetch, hence fetch only from skeleton file
+      (bootstrapSkeletonFileReader.read(skeletonFile), bootstrapSkeletonFileReader.schema)
+    } else if (bootstrapSkeletonFileReader.schema.isEmpty) {
+      // No metadata column to fetch, hence fetch only from data file
+      (bootstrapDataFileReader.read(dataFile), bootstrapDataFileReader.schema)
+    } else {
+      // Fetch from both data and skeleton file, and merge
+      val dataFileIterator = bootstrapDataFileReader.read(dataFile)
+      val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile)
+      val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields ++ bootstrapDataFileReader.schema.fields)
+
+      (merge(skeletonFileIterator, dataFileIterator), mergedSchema)
+    }
+  }
+
+  /**
+   *  Here we have to project the [[InternalRow]]s fetched into the expected target schema.
+   *  These could diverge, for example, when the requested schema contains partition columns
+   *  which might not be persisted within the data file, but would instead be parsed from the
+   *  partition path. In that case the output of the file reader will have a different field
+   *  ordering than the original required schema (for more details please check out the
+   *  [[ParquetFileFormat]] implementation).
+   */
+  protected def unsafeProjectIterator(iterator: Iterator[InternalRow], schema: StructType): Iterator[InternalRow] = {
+    val unsafeProjection = generateUnsafeProjection(schema, requiredSchema.structTypeSchema)
+    iterator.map(unsafeProjection)
+  }
+
+  protected def maybeLog(bootstrapPartition: HoodieBootstrapPartition): Unit = {
     if (log.isDebugEnabled) {
+      var msg = "Got Split => Index: " + bootstrapPartition.index + ", Data 
File: " +
+        bootstrapPartition.split.dataFile.filePath
       if (bootstrapPartition.split.skeletonFile.isDefined) {
-        logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data 
File: "
-          + bootstrapPartition.split.dataFile.filePath + ", Skeleton File: "
-          + bootstrapPartition.split.skeletonFile.get.filePath)
-      } else {
-        logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data 
File: "
-          + bootstrapPartition.split.dataFile.filePath)
+        msg += ", Skeleton File: " + 
bootstrapPartition.split.skeletonFile.get.filePath
+      }
+      if (bootstrapPartition.split.logFiles.nonEmpty) {
+        msg += ", Log Paths: " + bootstrapPartition.split.logFiles
       }
+      logDebug(msg)
     }
+  }
 
+  protected def getIterator(bootstrapPartition: HoodieBootstrapPartition): Iterator[InternalRow] = {
     bootstrapPartition.split.skeletonFile match {
       case Some(skeletonFile) =>
         // It is a bootstrap split. Check both skeleton and data files.
-        val (iterator, schema) = if (bootstrapDataFileReader.schema.isEmpty) {
-          // No data column to fetch, hence fetch only from skeleton file
-          (bootstrapSkeletonFileReader.read(skeletonFile), bootstrapSkeletonFileReader.schema)
-        } else if (bootstrapSkeletonFileReader.schema.isEmpty) {
-          // No metadata column to fetch, hence fetch only from data file
-          (bootstrapDataFileReader.read(bootstrapPartition.split.dataFile), bootstrapDataFileReader.schema)
-        } else {
-          // Fetch from both data and skeleton file, and merge
-          val dataFileIterator = bootstrapDataFileReader.read(bootstrapPartition.split.dataFile)
-          val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile)
-          val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields ++ bootstrapDataFileReader.schema.fields)
-
-          (merge(skeletonFileIterator, dataFileIterator), mergedSchema)
-        }
-
-        // NOTE: Here we have to project the [[InternalRow]]s fetched into the expected target schema.
-        //       These could diverge for ex, when requested schema contains partition columns which might not be
-        //       persisted w/in the data file, but instead would be parsed from the partition path. In that case
-        //       output of the file-reader will have different ordering of the fields than the original required
-        //       schema (for more details please check out [[ParquetFileFormat]] implementation).
-        val unsafeProjection = generateUnsafeProjection(schema, requiredSchema.structTypeSchema)
-
-        iterator.map(unsafeProjection)
-
+        val (iterator, schema) = getSkeletonIteratorSchema(bootstrapPartition.split.dataFile, skeletonFile)
+        unsafeProjectIterator(iterator, schema)
       case _ =>
        // NOTE: Regular file-reader is already projected into the required schema
         regularFileReader.read(bootstrapPartition.split.dataFile)
     }
   }
 
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {

Review Comment:
   Yes, the purpose is to reuse functionality.
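   
   To illustrate the reuse point: with the logging and iterator construction factored into `maybeLog` and `getIterator`, `compute` reduces to composing the two, and a subclass can override just the pieces it needs. The diff above is truncated before `compute`'s body, so the composition below is a sketch of the intent, not the PR's verbatim code:
   
   ```scala
   // Sketch only: the diff is cut off before compute's body, so this
   // composition is an assumption based on the extracted helpers.
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
     maybeLog(bootstrapPartition)
     getIterator(bootstrapPartition)
   }
   ```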
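   
   On the projection note in the new scaladoc: a minimal standalone sketch of what such a field-reordering projection does, using Spark's `UnsafeProjection` directly. The PR itself goes through Hudi's `generateUnsafeProjection` helper; the `reorderingProjection` name below is hypothetical and assumes every required field exists in the reader's output schema:
   
   ```scala
   import org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection}
   import org.apache.spark.sql.types.StructType
   
   // Hypothetical helper: build a projection that reorders fields from the
   // reader's output schema ("from") into the required schema ("to").
   def reorderingProjection(from: StructType, to: StructType): UnsafeProjection = {
     // Bind the reader's output fields as attributes in their physical order
     val inputAttrs = from.fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
     // Select those attributes in the order the required schema expects
     val exprs = to.fields.map(f => inputAttrs(from.fieldIndex(f.name)))
     UnsafeProjection.create(exprs, inputAttrs)
   }
   ```
   
   Applied per row as `iterator.map(reorderingProjection(schema, requiredSchema))`, which mirrors what `unsafeProjectIterator` does in the diff.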


