bvaradar commented on code in PR #8847:
URL: https://github.com/apache/hudi/pull/8847#discussion_r1217216256
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRDD.scala:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
+import org.apache.hudi.HoodieBootstrapMORRDD.CONFIG_INSTANTIATION_LOCK
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.{Partition, SerializableWritable, TaskContext}
+
+class HoodieBootstrapMORRDD(@transient spark: SparkSession,
+                            @transient config: Configuration,
+                            bootstrapDataFileReader: BaseFileReader,
+                            bootstrapSkeletonFileReader: BaseFileReader,
+                            regularFileReader: BaseFileReader,
+                            tableSchema: HoodieTableSchema,
+                            requiredSchema: HoodieTableSchema,
+                            tableState: HoodieTableState,
+                            @transient splits: Seq[HoodieBootstrapSplit])
+  extends HoodieBootstrapRDD(spark, bootstrapDataFileReader, bootstrapSkeletonFileReader,
+    regularFileReader, requiredSchema, splits) {
+
+  protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config))
+
+  private val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableWritable(config))
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
+    maybeLog(bootstrapPartition)
+
+    if (bootstrapPartition.split.logFiles.isEmpty) {
+      //no log files, treat like regular bootstrap
+      getIterator(bootstrapPartition)
+    } else {
+      bootstrapPartition.split.skeletonFile match {
+        case Some(skeletonFile) =>
+          val (iterator, schema) = getSkeletonIteratorSchema(bootstrapPartition.split.dataFile, skeletonFile)
+          new RecordMergingFileIterator(HoodieMergeOnReadFileSplit(Some(bootstrapPartition.split.dataFile), bootstrapPartition.split.logFiles),
+            iterator, schema, tableSchema, requiredSchema, tableState, getHadoopConf)
+        case _ =>
+          // NOTE: Regular file-reader is already projected into the required schema
+          new RecordMergingFileIterator(HoodieMergeOnReadFileSplit(Some(bootstrapPartition.split.dataFile), bootstrapPartition.split.logFiles),

Review Comment:
   @jonvex: It looks like RecordMergingFileIterator expects the base file to be a Hudi file, but the bootstrap data file won't have any Hudi record key fields set up. How would this work?
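For context on the record-key concern above: in a bootstrapped Hudi table the meta columns (including `_hoodie_record_key`) live in the skeleton file, while the original source file holds only the data columns, so log-record merging can only key off rows that have first been joined with their skeleton counterparts. A toy sketch of that dependency, using hypothetical simplified types rather than the actual Hudi classes:

```scala
// Hypothetical simplified row types -- not the real Hudi classes.
case class SkeletonRow(hoodieRecordKey: String)   // meta columns from the skeleton file
case class DataRow(id: Int, name: String)         // original columns from the bootstrap data file

// Skeleton and data files are positionally aligned, so zipping attaches
// the record key to each data row.
def mergeBase(skeleton: Iterator[SkeletonRow], data: Iterator[DataRow]): Iterator[(String, DataRow)] =
  skeleton.zip(data).map { case (s, d) => (s.hoodieRecordKey, d) }

// Log records are keyed by record key; without the skeleton join there is
// nothing in the raw data file to match them against.
def applyLogUpdates(base: Iterator[(String, DataRow)],
                    logUpdates: Map[String, DataRow]): Iterator[DataRow] =
  base.map { case (key, row) => logUpdates.getOrElse(key, row) }
```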
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala:
##########
@@ -60,47 +88,62 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
   extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema) {

   override type FileSplit = HoodieBootstrapSplit
-  override type Relation = HoodieBootstrapRelation
+
   private lazy val skeletonSchema = HoodieSparkUtils.getMetaSchema

   private lazy val bootstrapBasePath = new Path(metaClient.getTableConfig.getBootstrapBasePath.get)

-  override val mandatoryFields: Seq[String] = Seq.empty
+  override lazy val mandatoryFields: Seq[String] = Seq.empty
+
+  protected def getFileSlices(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
+    listLatestFileSlices(globPaths, partitionFilters, dataFilters)
+  }

   protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit] = {
-    val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
+    val fileSlices = getFileSlices(partitionFilters, dataFilters)
     val isPartitioned = metaClient.getTableConfig.isTablePartitioned
     fileSlices.map { fileSlice =>
       val baseFile = fileSlice.getBaseFile.get()
+      val logFiles = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList

Review Comment:
   We should keep the MOR aspects in HoodieBootstrapMORRelation and have this class deal with the common/COW aspects, so there is a clean separation.

##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala:
##########
@@ -36,53 +37,68 @@ class HoodieBootstrapRDD(@transient spark: SparkSession,
                          @transient splits: Seq[HoodieBootstrapSplit])
   extends RDD[InternalRow](spark.sparkContext, Nil) {

-  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
-    val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
+  protected def getSkeletonIteratorSchema(dataFile: PartitionedFile, skeletonFile: PartitionedFile): (Iterator[InternalRow], StructType) = {
+    if (bootstrapDataFileReader.schema.isEmpty) {
+      // No data column to fetch, hence fetch only from skeleton file
+      (bootstrapSkeletonFileReader.read(skeletonFile), bootstrapSkeletonFileReader.schema)
+    } else if (bootstrapSkeletonFileReader.schema.isEmpty) {
+      // No metadata column to fetch, hence fetch only from data file
+      (bootstrapDataFileReader.read(dataFile), bootstrapDataFileReader.schema)
+    } else {
+      // Fetch from both data and skeleton file, and merge
+      val dataFileIterator = bootstrapDataFileReader.read(dataFile)
+      val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile)
+      val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields ++ bootstrapDataFileReader.schema.fields)
+
+      (merge(skeletonFileIterator, dataFileIterator), mergedSchema)
+    }
+  }
+
+  /**
+   * Here we have to project the [[InternalRow]]s fetched into the expected target schema.
+   * These could diverge for ex, when requested schema contains partition columns which might not be

Review Comment:
   What is the implication of this? Would a read query fail here?
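On the projection question just above: the divergence is about field ordering, not missing data, so without the projection a consumer could silently read columns at the wrong ordinals (or hit a cast failure) rather than fail cleanly. Below is a standalone sketch of the reordering that `generateUnsafeProjection` presumably performs here, written against plain Spark Catalyst APIs; the schemas and values are invented for illustration:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, UnsafeProjection}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object ProjectionSketch extends App {
  // Suppose the file reader emits fields as (part, value), but the query expects (value, part),
  // e.g. because `part` is parsed from the partition path rather than read from the file.
  val readerSchema   = StructType(Seq(StructField("part", StringType), StructField("value", IntegerType)))
  val requiredSchema = StructType(Seq(StructField("value", IntegerType), StructField("part", StringType)))

  // Bind each required field to its ordinal in the reader's output schema.
  val exprs   = requiredSchema.map(f => BoundReference(readerSchema.fieldIndex(f.name), f.dataType, f.nullable))
  val project = UnsafeProjection.create(exprs)

  val readerRow = InternalRow(UTF8String.fromString("2023-06-01"), 42)
  val projected = project(readerRow)

  // Without the projection, a consumer reading ordinal 0 as `value` would get the
  // partition string instead: wrong results or a cast failure, not a clean error.
  println(projected.getInt(0))        // 42
  println(projected.getUTF8String(1)) // 2023-06-01
}
```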
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRDD.scala:
##########
@@ -36,53 +37,68 @@ class HoodieBootstrapRDD(@transient spark: SparkSession,
                          @transient splits: Seq[HoodieBootstrapSplit])
   extends RDD[InternalRow](spark.sparkContext, Nil) {

-  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
-    val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
+  protected def getSkeletonIteratorSchema(dataFile: PartitionedFile, skeletonFile: PartitionedFile): (Iterator[InternalRow], StructType) = {
+    if (bootstrapDataFileReader.schema.isEmpty) {
+      // No data column to fetch, hence fetch only from skeleton file
+      (bootstrapSkeletonFileReader.read(skeletonFile), bootstrapSkeletonFileReader.schema)
+    } else if (bootstrapSkeletonFileReader.schema.isEmpty) {
+      // No metadata column to fetch, hence fetch only from data file
+      (bootstrapDataFileReader.read(dataFile), bootstrapDataFileReader.schema)
+    } else {
+      // Fetch from both data and skeleton file, and merge
+      val dataFileIterator = bootstrapDataFileReader.read(dataFile)
+      val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile)
+      val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields ++ bootstrapDataFileReader.schema.fields)
+
+      (merge(skeletonFileIterator, dataFileIterator), mergedSchema)
+    }
+  }
+
+  /**
+   * Here we have to project the [[InternalRow]]s fetched into the expected target schema.
+   * These could diverge for ex, when requested schema contains partition columns which might not be
+   * persisted w/in the data file, but instead would be parsed from the partition path. In that case
+   * output of the file-reader will have different ordering of the fields than the original required
+   * schema (for more details please check out [[ParquetFileFormat]] implementation).
+   */
+  protected def unsafeProjectIterator(iterator: Iterator[InternalRow], schema: StructType): Iterator[InternalRow] = {
+    val unsafeProjection = generateUnsafeProjection(schema, requiredSchema.structTypeSchema)
+    iterator.map(unsafeProjection)
+  }
+
+  protected def maybeLog(bootstrapPartition: HoodieBootstrapPartition): Unit = {
     if (log.isDebugEnabled) {
+      var msg = "Got Split => Index: " + bootstrapPartition.index + ", Data File: " +
+        bootstrapPartition.split.dataFile.filePath
       if (bootstrapPartition.split.skeletonFile.isDefined) {
-        logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
-          + bootstrapPartition.split.dataFile.filePath + ", Skeleton File: "
-          + bootstrapPartition.split.skeletonFile.get.filePath)
-      } else {
-        logDebug("Got Split => Index: " + bootstrapPartition.index + ", Data File: "
-          + bootstrapPartition.split.dataFile.filePath)
+        msg += ", Skeleton File: " + bootstrapPartition.split.skeletonFile.get.filePath
+      }
+      if (bootstrapPartition.split.logFiles.nonEmpty) {
+        msg += ", Log Paths: " + bootstrapPartition.split.logFiles
       }
+      logDebug(msg)
     }
+  }

+  protected def getIterator(bootstrapPartition: HoodieBootstrapPartition): Iterator[InternalRow] = {
     bootstrapPartition.split.skeletonFile match {
       case Some(skeletonFile) =>
         // It is a bootstrap split. Check both skeleton and data files.
-        val (iterator, schema) = if (bootstrapDataFileReader.schema.isEmpty) {
-          // No data column to fetch, hence fetch only from skeleton file
-          (bootstrapSkeletonFileReader.read(skeletonFile), bootstrapSkeletonFileReader.schema)
-        } else if (bootstrapSkeletonFileReader.schema.isEmpty) {
-          // No metadata column to fetch, hence fetch only from data file
-          (bootstrapDataFileReader.read(bootstrapPartition.split.dataFile), bootstrapDataFileReader.schema)
-        } else {
-          // Fetch from both data and skeleton file, and merge
-          val dataFileIterator = bootstrapDataFileReader.read(bootstrapPartition.split.dataFile)
-          val skeletonFileIterator = bootstrapSkeletonFileReader.read(skeletonFile)
-          val mergedSchema = StructType(bootstrapSkeletonFileReader.schema.fields ++ bootstrapDataFileReader.schema.fields)
-
-          (merge(skeletonFileIterator, dataFileIterator), mergedSchema)
-        }
-
-        // NOTE: Here we have to project the [[InternalRow]]s fetched into the expected target schema.
-        // These could diverge for ex, when requested schema contains partition columns which might not be
-        // persisted w/in the data file, but instead would be parsed from the partition path. In that case
-        // output of the file-reader will have different ordering of the fields than the original required
-        // schema (for more details please check out [[ParquetFileFormat]] implementation).
-        val unsafeProjection = generateUnsafeProjection(schema, requiredSchema.structTypeSchema)
-
-        iterator.map(unsafeProjection)
-
+        val (iterator, schema) = getSkeletonIteratorSchema(bootstrapPartition.split.dataFile, skeletonFile)
+        unsafeProjectIterator(iterator, schema)
       case _ =>
         // NOTE: Regular file-reader is already projected into the required schema
         regularFileReader.read(bootstrapPartition.split.dataFile)
     }
   }

+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {

Review Comment:
   What is the main change being made in HoodieBootstrapRDD? Is this refactoring to allow HoodieBootstrapMORRDD to reuse functionality easily?
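To the question above, the diff reads like a template-method refactor: `compute` in `HoodieBootstrapRDD` is reduced to the shared skeleton/data path, with `getSkeletonIteratorSchema`, `unsafeProjectIterator`, `maybeLog` and `getIterator` pulled out as `protected` members so `HoodieBootstrapMORRDD` can override `compute` and layer log-file merging on top. A toy sketch of that shape, with hypothetical simplified classes rather than the actual Hudi types:

```scala
// Base class keeps the shared bootstrap read path (COW and the no-log-file MOR case).
abstract class BootstrapRddBase {
  // Shared skeleton/data handling, reused by both paths.
  protected def getIterator(split: String): Iterator[String] =
    Iterator(s"base-rows($split)")

  def compute(split: String): Iterator[String] = getIterator(split)
}

// MOR subclass overrides compute() to add log-file merging on top of the shared path.
class BootstrapMorRdd(logFilesBySplit: Map[String, Seq[String]]) extends BootstrapRddBase {
  override def compute(split: String): Iterator[String] =
    logFilesBySplit.get(split) match {
      case Some(logs) if logs.nonEmpty =>
        // MOR-only concern: merge base rows with log records.
        getIterator(split).map(row => s"merged($row, logs=${logs.mkString(",")})")
      case _ =>
        // No log files: identical to the regular bootstrap path.
        super.compute(split)
    }
}
```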