jonvex commented on code in PR #8847: URL: https://github.com/apache/hudi/pull/8847#discussion_r1218360481
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala: ########## @@ -36,53 +37,68 @@ class HoodieBootstrapRDD(@transient spark: SparkSession, @transient splits: Seq[HoodieBootstrapSplit]) extends RDD[InternalRow](spark.sparkContext, Nil) { - override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { - val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition] + protected def getSkeletonIteratorSchema(dataFile: PartitionedFile, skeletonFile: PartitionedFile): (Iterator[InternalRow], StructType) = { + if (bootstrapDataFileReader.schema.isEmpty) { + // No data column to fetch, hence fetch only from skeleton file + (bootstrapSkeletonFileReader.read(skeletonFile), bootstrapSkeletonFileReader.schema) + } else if (bootstrapSkeletonFileReader.schema.isEmpty) { + // No metadata column to fetch, hence fetch only from data file + (bootstrapDataFileReader.read(dataFile), bootstrapDataFileReader.schema) + } else { + // Fetch from both data and skeleton file, and merge + val dataFileIterator = bootstrapDataFileReader.read(dataFile) + val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile) + val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields ++ bootstrapDataFileReader.schema.fields) + + (merge(skeletonFileIterator, dataFileIterator), mergedSchema) + } + } + + /** + * Here we have to project the [[InternalRow]]s fetched into the expected target schema. + * These could diverge for ex, when requested schema contains partition columns which might not be + * persisted w/in the data file, but instead would be parsed from the partition path. In that case + * output of the file-reader will have different ordering of the fields than the original required + * schema (for more details please check out [[ParquetFileFormat]] implementation). 
+ */ + protected def unsafeProjectIterator(iterator: Iterator[InternalRow], schema: StructType): Iterator[InternalRow] = { + val unsafeProjection = generateUnsafeProjection(schema, requiredSchema.structTypeSchema) + iterator.map(unsafeProjection) + } + + protected def maybeLog(bootstrapPartition: HoodieBootstrapPartition): Unit = { if (log.isDebugEnabled) { + var msg = "Got Split => Index: " + bootstrapPartition.index + ", Data File: " + + bootstrapPartition.split.dataFile.filePath if (bootstrapPartition.split.skeletonFile.isDefined) { - logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: " - + bootstrapPartition.split.dataFile.filePath + ", Skeleton File: " - + bootstrapPartition.split.skeletonFile.get.filePath) - } else { - logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: " - + bootstrapPartition.split.dataFile.filePath) + msg += ", Skeleton File: " + bootstrapPartition.split.skeletonFile.get.filePath + } + if (bootstrapPartition.split.logFiles.nonEmpty) { + msg += ", Log Paths: " + bootstrapPartition.split.logFiles } + logDebug(msg) } + } + protected def getIterator(bootstrapPartition: HoodieBootstrapPartition): Iterator[InternalRow] = { bootstrapPartition.split.skeletonFile match { case Some(skeletonFile) => // It is a bootstrap split. Check both skeleton and data files. 
- val (iterator, schema) = if (bootstrapDataFileReader.schema.isEmpty) { - // No data column to fetch, hence fetch only from skeleton file - (bootstrapSkeletonFileReader.read(skeletonFile), bootstrapSkeletonFileReader.schema) - } else if (bootstrapSkeletonFileReader.schema.isEmpty) { - // No metadata column to fetch, hence fetch only from data file - (bootstrapDataFileReader.read(bootstrapPartition.split.dataFile), bootstrapDataFileReader.schema) - } else { - // Fetch from both data and skeleton file, and merge - val dataFileIterator = bootstrapDataFileReader.read(bootstrapPartition.split.dataFile) - val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile) - val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields ++ bootstrapDataFileReader.schema.fields) - - (merge(skeletonFileIterator, dataFileIterator), mergedSchema) - } - - // NOTE: Here we have to project the [[InternalRow]]s fetched into the expected target schema. - // These could diverge for ex, when requested schema contains partition columns which might not be - // persisted w/in the data file, but instead would be parsed from the partition path. In that case - // output of the file-reader will have different ordering of the fields than the original required - // schema (for more details please check out [[ParquetFileFormat]] implementation). - val unsafeProjection = generateUnsafeProjection(schema, requiredSchema.structTypeSchema) - - iterator.map(unsafeProjection) - + val (iterator, schema) = getSkeletonIteratorSchema(bootstrapPartition.split.dataFile, skeletonFile) + unsafeProjectIterator(iterator, schema) case _ => // NOTE: Regular file-reader is already projected into the required schema regularFileReader.read(bootstrapPartition.split.dataFile) } } + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { Review Comment: Yes the purpose is to reuse functionality -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org