[GitHub] [hudi] bvaradar commented on a diff in pull request #8847: [HUDI-2071] Support Reading Bootstrap MOR RT Table In Spark DataSource Table

2023-06-07 Thread via GitHub


bvaradar commented on code in PR #8847:
URL: https://github.com/apache/hudi/pull/8847#discussion_r1222417026


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala:
##
@@ -55,10 +55,11 @@ import scala.collection.mutable
 import scala.util.Try
 
 /**
- * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all of the records stored in
+ * Provided w/ list of log files, iterates over all of the records stored in
  * Delta Log files (represented as [[InternalRow]]s)
  */
-class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+class LogFileIterator(logFiles: List[HoodieLogFile],

Review Comment:
   Can we retain HoodieMergeOnReadFileSplit in all the iterators instead of passing logFiles directly, since you can derive this list from the split?
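   A minimal sketch of that shape, assuming the HoodieMergeOnReadFileSplit(dataFile, logFiles) case class from this PR (iteration body elided; only the constructor shape matters):

   import org.apache.hadoop.conf.Configuration
   import org.apache.hudi.common.model.HoodieLogFile
   import org.apache.spark.sql.catalyst.InternalRow

   class LogFileIterator(split: HoodieMergeOnReadFileSplit,
                         tableSchema: HoodieTableSchema,
                         requiredSchema: HoodieTableSchema,
                         tableState: HoodieTableState,
                         config: Configuration) extends Iterator[InternalRow] {

     // Derived here rather than passed in: the list is recoverable from the split.
     protected val logFiles: List[HoodieLogFile] = split.logFiles
     // The partition path can likewise be derived inside, e.g. via the
     // split-based getPartitionPath helper this diff replaces.

     // Iteration over the delta-log records is elided in this sketch.
     override def hasNext: Boolean = false
     override def next(): InternalRow = throw new NoSuchElementException("sketch only")
   }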



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala:
##
@@ -338,12 +339,12 @@ object LogFileIterator {
 }
   }
 
-  def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {
+  def getPartitionPath(dataFile: Option[PartitionedFile], logFiles: List[HoodieLogFile]): Path = {

Review Comment:
   same case here.



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala:
##
@@ -429,8 +428,10 @@ class HoodieCDCRDD(
   && currentCDCFileSplit.getBeforeFileSlice.isPresent)
 
loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
 val absLogPath = new Path(basePath, currentCDCFileSplit.getCdcFiles.get(0))
-val morSplit = HoodieMergeOnReadFileSplit(None, List(new HoodieLogFile(fs.getFileStatus(absLogPath))))
-val logFileIterator = new LogFileIterator(morSplit, originTableSchema, originTableSchema, tableState, conf)
+val logFiles = List(new HoodieLogFile(fs.getFileStatus(absLogPath)))

Review Comment:
   Is this change due to the cascading effect of changing the Iterator interface?



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala:
##
@@ -81,23 +81,29 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
 val partition = split.asInstanceOf[HoodieMergeOnReadPartition]
+lazy val partitionPath = LogFileIterator.getPartitionPath(partition.split.dataFile, partition.split.logFiles)
 val iter = partition.split match {
   case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
 val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema)
 projectedReader(dataFileOnlySplit.dataFile.get)
 
   case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
-new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, getHadoopConf)
+new LogFileIterator(logFileOnlySplit.logFiles, partitionPath, tableSchema, requiredSchema,
+  tableState, getHadoopConf)
 
   case split =>
 mergeType match {
   case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
 val reader = fileReaders.requiredSchemaReaderSkipMerging
-new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, getHadoopConf)
+val iterator = reader(split.dataFile.get)
+new SkipMergeIterator(split.logFiles, partitionPath, iterator, reader.schema, tableSchema,

Review Comment:
   Since Split abstracts the set of files to process, it's better to pass the split directly to the different iterators.
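   A hedged sketch of that dispatch, essentially the shape of the removed lines above (the mergeType handling is elided and the split-based constructor shapes are assumed):

   val iter = partition.split match {
     case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
       // Base file only: read it directly with the projected reader.
       val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema)
       projectedReader(dataFileOnlySplit.dataFile.get)

     case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
       // Log files only: the iterator unpacks the split internally.
       new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, getHadoopConf)

     case split =>
       // Base + log files: pass the whole split and let the iterator decide what it needs.
       new SkipMergeIterator(split, fileReaders.requiredSchemaReaderSkipMerging,
         tableSchema, requiredSchema, tableState, getHadoopConf)
   }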



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala:
##
@@ -79,28 +118,36 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
 val dataFile = PartitionedFile(partitionValues, getFilePath(baseFile.getBootstrapBaseFile.get.getFileStatus.getPath),
   0, baseFile.getBootstrapBaseFile.get().getFileLen)
 val skeletonFile = Option(PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen))
-
-HoodieBootstrapSplit(dataFile, skeletonFile)
+createFileSplit(fileSlice, dataFile, skeletonFile)
   } else {
 val dataFile = PartitionedFile(getPartitionColumnsAsInternalRow(baseFile.getFileStatus), baseFile.getPath, 0, baseFile.getFileLen)
-HoodieBootstrapSplit(dataFile)
+createFileSplit(fileSlice, dataFile, Option.empty)
   }
 }
   }
 
-  protected override def composeRDD(fileSplits: Seq[FileSplit],
-tableSchema: HoodieTableSchema,
-requiredSchema: HoodieTableSchema,
-requestedColumns: Array[String],
-filters: Array[Filter]): RDD[InternalRow] = {
+  

[GitHub] [hudi] bvaradar commented on a diff in pull request #8847: [HUDI-2071] Support Reading Bootstrap MOR RT Table In Spark DataSource Table

2023-06-04 Thread via GitHub


bvaradar commented on code in PR #8847:
URL: https://github.com/apache/hudi/pull/8847#discussion_r1217216256


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapMORRDD.scala:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.HoodieBaseRelation.BaseFileReader
+import org.apache.hudi.HoodieBootstrapMORRDD.CONFIG_INSTANTIATION_LOCK
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.{Partition, SerializableWritable, TaskContext}
+
+class HoodieBootstrapMORRDD(@transient spark: SparkSession,
+@transient config: Configuration,
+bootstrapDataFileReader: BaseFileReader,
+bootstrapSkeletonFileReader: BaseFileReader,
+regularFileReader: BaseFileReader,
+tableSchema: HoodieTableSchema,
+requiredSchema: HoodieTableSchema,
+tableState: HoodieTableState,
+@transient splits: Seq[HoodieBootstrapSplit])
+  extends HoodieBootstrapRDD(spark, bootstrapDataFileReader, bootstrapSkeletonFileReader,
+regularFileReader, requiredSchema, splits) {
+
+  protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config))
+
+  private val hadoopConfBroadcast = spark.sparkContext.broadcast(new SerializableWritable(config))
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+val bootstrapPartition = split.asInstanceOf[HoodieBootstrapPartition]
+maybeLog(bootstrapPartition)
+
+if (bootstrapPartition.split.logFiles.isEmpty) {
+  //no log files, treat like regular bootstrap
+  getIterator(bootstrapPartition)
+} else {
+  bootstrapPartition.split.skeletonFile match {
+case Some(skeletonFile) =>
+  val (iterator, schema) = getSkeletonIteratorSchema(bootstrapPartition.split.dataFile, skeletonFile)
+  new RecordMergingFileIterator(HoodieMergeOnReadFileSplit(Some(bootstrapPartition.split.dataFile), bootstrapPartition.split.logFiles),
+iterator, schema, tableSchema, requiredSchema, tableState, getHadoopConf)
+case _ =>
+  // NOTE: Regular file-reader is already projected into the required schema
+  new RecordMergingFileIterator(HoodieMergeOnReadFileSplit(Some(bootstrapPartition.split.dataFile), bootstrapPartition.split.logFiles),

Review Comment:
   @jonvex : It looks like RecordMergingFileIterator expects the base file to be a hoodie file. The bootstrap data file won't have any Hudi record key fields set up. How would this work?
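   For context, a small hedged illustration (not PR code) of the record key fields in question: record-level merging keys off the Hudi meta column, which a skeleton/hoodie base file carries but a raw bootstrap source file does not.

   import org.apache.hudi.common.model.HoodieRecord
   import org.apache.spark.sql.types.StructType

   // Sketch: only schemas carrying the "_hoodie_record_key" meta column can
   // drive record-level merging against log-file records.
   def hasHoodieRecordKey(schema: StructType): Boolean =
     schema.fieldNames.contains(HoodieRecord.RECORD_KEY_METADATA_FIELD)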



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala:
##
@@ -60,47 +88,62 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
   extends HoodieBaseRelation(sqlContext, metaClient, optParams, userSchema, prunedDataSchema) {
 
   override type FileSplit = HoodieBootstrapSplit
-  override type Relation = HoodieBootstrapRelation
+
 
   private lazy val skeletonSchema = HoodieSparkUtils.getMetaSchema
 
  private lazy val bootstrapBasePath = new Path(metaClient.getTableConfig.getBootstrapBasePath.get)
 
-  override val mandatoryFields: Seq[String] = Seq.empty
+  override lazy val mandatoryFields: Seq[String] = Seq.empty
+
+  protected def getFileSlices(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSlice] = {
+listLatestFileSlices(globPaths, partitionFilters, dataFilters)
+  }
 
   protected override def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[FileSplit] = {
-val fileSlices = listLatestFileSlices(globPaths, partitionFilters, dataFilters)
+val fileSlices = getFileSlices(partitionFilters, dataFilters)
 val