[GitHub] [hudi] bvaradar commented on a diff in pull request #8847: [HUDI-2071] Support Reading Bootstrap MOR RT Table In Spark DataSource Table
bvaradar commented on code in PR #8847:
URL: https://github.com/apache/hudi/pull/8847#discussion_r1222417026

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala: ##

@@ -55,10 +55,11 @@ import scala.collection.mutable
 import scala.util.Try
 
 /**
- * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all of the records stored in
+ * Provided w/ list of log files, iterates over all of the records stored in
  * Delta Log files (represented as [[InternalRow]]s)
  */
-class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+class LogFileIterator(logFiles: List[HoodieLogFile],

Review Comment:
   Can we retain HoodieMergeOnReadFileSplit in all the iterators instead of passing logFiles directly, since you can derive this list from the split?

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala: ##

@@ -338,12 +339,12 @@ object LogFileIterator {
     }
   }
 
-  def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {
+  def getPartitionPath(dataFile: Option[PartitionedFile], logFiles: List[HoodieLogFile]): Path = {

Review Comment:
   Same case here.

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala: ##

@@ -429,8 +428,10 @@ class HoodieCDCRDD(
           && currentCDCFileSplit.getBeforeFileSlice.isPresent)
         loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
         val absLogPath = new Path(basePath, currentCDCFileSplit.getCdcFiles.get(0))
-        val morSplit = HoodieMergeOnReadFileSplit(None, List(new HoodieLogFile(fs.getFileStatus(absLogPath))))
-        val logFileIterator = new LogFileIterator(morSplit, originTableSchema, originTableSchema, tableState, conf)
+        val logFiles = List(new HoodieLogFile(fs.getFileStatus(absLogPath)))

Review Comment:
   Is this change due to the cascading effect of changing the Iterator interface?
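A minimal sketch of the shape the reviewer is suggesting, with the iteration logic stubbed out: the constructor keeps HoodieMergeOnReadFileSplit and derives the log files from it. This is illustrative only, not the PR's code.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.{HoodieMergeOnReadFileSplit, HoodieTableSchema, HoodieTableState}
import org.apache.hudi.common.model.HoodieLogFile
import org.apache.spark.sql.catalyst.InternalRow

// Sketch only: keep the split as the single constructor argument and
// derive the log-file list from it, so callers never unpack the split.
class LogFileIterator(split: HoodieMergeOnReadFileSplit,
                      tableSchema: HoodieTableSchema,
                      requiredSchema: HoodieTableSchema,
                      tableState: HoodieTableState,
                      config: Configuration) extends Iterator[InternalRow] {

  // Derived from the split, as the reviewer notes, rather than passed in.
  protected val logFiles: List[HoodieLogFile] = split.logFiles

  // The actual log-scanning logic is unchanged from the PR and stubbed here.
  override def hasNext: Boolean = ???
  override def next(): InternalRow = ???
}
```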
## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala: ##

@@ -81,23 +81,29 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val partition = split.asInstanceOf[HoodieMergeOnReadPartition]
+    lazy val partitionPath = LogFileIterator.getPartitionPath(partition.split.dataFile, partition.split.logFiles)
     val iter = partition.split match {
       case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
         val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema)
         projectedReader(dataFileOnlySplit.dataFile.get)
 
       case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
-        new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, getHadoopConf)
+        new LogFileIterator(logFileOnlySplit.logFiles, partitionPath, tableSchema, requiredSchema,
+          tableState, getHadoopConf)
 
       case split =>
         mergeType match {
           case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
             val reader = fileReaders.requiredSchemaReaderSkipMerging
-            new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, getHadoopConf)
+            val iterator = reader(split.dataFile.get)
+            new SkipMergeIterator(split.logFiles, partitionPath, iterator, reader.schema, tableSchema,

Review Comment:
   Since the split abstracts the set of files to process, it's better to pass the split directly to the different iterators.

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala: ##

@@ -79,28 +118,36 @@
         val dataFile = PartitionedFile(partitionValues, getFilePath(baseFile.getBootstrapBaseFile.get.getFileStatus.getPath), 0, baseFile.getBootstrapBaseFile.get().getFileLen)
         val skeletonFile = Option(PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen))
-
-        HoodieBootstrapSplit(dataFile, skeletonFile)
+        createFileSplit(fileSlice, dataFile, skeletonFile)
       } else {
         val dataFile = PartitionedFile(getPartitionColumnsAsInternalRow(baseFile.getFileStatus), baseFile.getPath, 0, baseFile.getFileLen)
-        HoodieBootstrapSplit(dataFile)
+        createFileSplit(fileSlice, dataFile, Option.empty)
       }
     }
   }
 
-  protected override def composeRDD(fileSplits: Seq[FileSplit],
-                                    tableSchema: HoodieTableSchema,
-                                    requiredSchema: HoodieTableSchema,
-                                    requestedColumns: Array[String],
-                                    filters: Array[Filter]): RDD[InternalRow] = {
+
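For reference, the call shape the reviewer prefers is the one visible in the removed lines of the HoodieMergeOnReadRDD hunk above. Condensed here as a fragment (it relies on the surrounding members of HoodieMergeOnReadRDD, so it is a sketch rather than standalone code):

```scala
// Pre-PR shape, condensed from the removed lines: each iterator receives
// the split itself, so partition path and log files are derived internally
// instead of being threaded through every call site.
val iter = partition.split match {
  case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
    new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, getHadoopConf)
  case split =>
    val reader = fileReaders.requiredSchemaReaderSkipMerging
    new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, getHadoopConf)
}
```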
[GitHub] [hudi] bvaradar commented on a diff in pull request #8847: [HUDI-2071] Support Reading Bootstrap MOR RT Table In Spark DataSource Table
bvaradar commented on code in PR #8847:
URL: https://github.com/apache/hudi/pull/8847#discussion_r1217216256

## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRDD.scala: ##

@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
+import org.apache.hudi.HoodieBootstrapMORRDD.CONFIG_INSTANTIATION_LOCK
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.{Partition, SerializableWritable, TaskContext}
+
+class HoodieBootstrapMORRDD(@transient spark: SparkSession,
+                            @transient config: Configuration,
+                            bootstrapDataFileReader: BaseFileReader,
+                            bootstrapSkeletonFileReader: BaseFileReader,
+                            regularFileReader: BaseFileReader,
+                            tableSchema: HoodieTableSchema,
+                            requiredSchema: HoodieTableSchema,
+                            tableState: HoodieTableState,
+                            @transient splits: Seq[HoodieBootstrapSplit])
+  extends HoodieBootstrapRDD(spark, bootstrapDataFileReader, bootstrapSkeletonFileReader,
+    regularFileReader, requiredSchema, splits) {
+
+  protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config))
+
+  private val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableWritable(config))
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
+    maybeLog(bootstrapPartition)
+
+    if (bootstrapPartition.split.logFiles.isEmpty) {
+      // no log files, treat like regular bootstrap
+      getIterator(bootstrapPartition)
+    } else {
+      bootstrapPartition.split.skeletonFile match {
+        case Some(skeletonFile) =>
+          val (iterator, schema) = getSkeletonIteratorSchema(bootstrapPartition.split.dataFile, skeletonFile)
+          new RecordMergingFileIterator(HoodieMergeOnReadFileSplit(Some(bootstrapPartition.split.dataFile), bootstrapPartition.split.logFiles),
+            iterator, schema, tableSchema, requiredSchema, tableState, getHadoopConf)
+        case _ =>
+          // NOTE: Regular file-reader is already projected into the required schema
+          new RecordMergingFileIterator(HoodieMergeOnReadFileSplit(Some(bootstrapPartition.split.dataFile), bootstrapPartition.split.logFiles),

Review Comment:
   @jonvex: It looks like RecordMergingFileIterator expects the base file to be a Hudi file; the bootstrap data file won't have any Hudi record key fields set up. How would this work?
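To illustrate the concern: merge-on-read record merging pairs base-file rows with log records by the Hudi record key, which conceptually comes from the `_hoodie_record_key` meta column of the base file. A hypothetical helper (not PR code) shows why a bootstrap data file, which carries no Hudi meta columns, breaks that lookup:

```scala
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

// Hypothetical helper: how a merging iterator conceptually resolves the
// record key of a base-file row. For a bootstrap data file the schema has
// no _hoodie_record_key field, so fieldIndex throws IllegalArgumentException.
def recordKeyOf(row: InternalRow, baseFileSchema: StructType): String = {
  val ordinal = baseFileSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
  row.getString(ordinal)
}
```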
## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala: ##

@@ -60,47 +88,62 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
   extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema) {
 
   override type FileSplit = HoodieBootstrapSplit
-  override type Relation = HoodieBootstrapRelation
+
   private lazy val skeletonSchema = HoodieSparkUtils.getMetaSchema
   private lazy val bootstrapBasePath = new Path(metaClient.getTableConfig.getBootstrapBasePath.get)
 
-  override val mandatoryFields: Seq[String] = Seq.empty
+  override lazy val mandatoryFields: Seq[String] = Seq.empty
+
+  protected def getFileSlices(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
+    listLatestFileSlices(globPaths, partitionFilters, dataFilters)
+  }
 
   protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit] = {
-    val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
+    val fileSlices = getFileSlices(partitionFilters, dataFilters)
     val
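The extraction of getFileSlices is presumably the seam that lets a MOR-aware subclass change how file slices are listed while collectFileSplits stays shared. A standalone sketch of that template-method pattern, with illustrative class names that are not the PR's actual code:

```scala
import org.apache.hudi.common.model.FileSlice
import org.apache.spark.sql.catalyst.expressions.Expression

// Illustrative template-method sketch: the base relation owns split
// construction; subclasses override only the slice-listing step.
abstract class BootstrapRelationSketch {
  // Overridable seam: how file slices are listed.
  protected def getFileSlices(partitionFilters: Seq[Expression],
                              dataFilters: Seq[Expression]): Seq[FileSlice]

  // Shared logic, simplified to the slice identity here; the real code
  // maps each slice to a HoodieBootstrapSplit as in the hunk above.
  final def collectFileSplits(partitionFilters: Seq[Expression],
                              dataFilters: Seq[Expression]): Seq[FileSlice] =
    getFileSlices(partitionFilters, dataFilters)
}

// A MOR variant would list the latest slices together with their log files.
class MorBootstrapRelationSketch extends BootstrapRelationSketch {
  protected def getFileSlices(partitionFilters: Seq[Expression],
                              dataFilters: Seq[Expression]): Seq[FileSlice] =
    Seq.empty // placeholder for a listing that retains log files
}
```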