bvaradar commented on code in PR #8847:
URL: https://github.com/apache/hudi/pull/8847#discussion_r1222417026


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala:
##########
@@ -55,10 +55,11 @@ import scala.collection.mutable
 import scala.util.Try
 
 /**
- * Provided w/ instance of [[HoodieMergeOnReadFileSplit]], iterates over all of the records stored in
+ * Provided w/ list of log files, iterates over all of the records stored in
  * Delta Log files (represented as [[InternalRow]]s)
  */
-class LogFileIterator(split: HoodieMergeOnReadFileSplit,
+class LogFileIterator(logFiles: List[HoodieLogFile],

Review Comment:
   Can we retain HoodieMergeOnReadFileSplit in all the iterators instead of passing logFiles directly, since you can derive this list from the split?
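
   For illustration, roughly the shape being suggested, using simplified stand-in types rather than the real Hudi classes (the iterator body here is only a placeholder):

   ```scala
   // Simplified stand-ins for HoodieLogFile / HoodieMergeOnReadFileSplit,
   // shown only to make the sketch compile; not the real Hudi classes.
   case class LogFile(path: String)
   case class MergeOnReadFileSplit(dataFile: Option[String], logFiles: List[LogFile])

   // The iterator keeps taking the split and derives the log-file list itself,
   // so call sites never have to unpack the split.
   class LogFileIterator(split: MergeOnReadFileSplit) extends Iterator[String] {
     // Derived inside the iterator rather than passed in by every caller.
     private val logFiles: List[LogFile] = split.logFiles
     private val underlying = logFiles.iterator
     override def hasNext: Boolean = underlying.hasNext
     override def next(): String = underlying.next().path
   }
   ```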



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/Iterators.scala:
##########
@@ -338,12 +339,12 @@ object LogFileIterator {
     }
   }
 
-  def getPartitionPath(split: HoodieMergeOnReadFileSplit): Path = {
+  def getPartitionPath(dataFile: Option[PartitionedFile], logFiles: List[HoodieLogFile]): Path = {

Review Comment:
   Same case here.
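
   Along the same lines, a rough sketch of keeping the split-based signature and having it delegate to the file-based variant (stand-in types; the partition-path logic in the body is an assumption, not the actual implementation):

   ```scala
   object PartitionPathSketch {
     // Stand-ins for the real Hudi/Spark types, only to make the sketch compile.
     case class LogFile(path: String)
     case class MergeOnReadFileSplit(dataFile: Option[String], logFiles: List[LogFile])

     // Split-based entry point kept as-is, deriving the arguments itself ...
     def getPartitionPath(split: MergeOnReadFileSplit): String =
       getPartitionPath(split.dataFile, split.logFiles)

     // ... and delegating to the file-based variant introduced by this PR.
     // Assumes the split carries at least one file, as the real method expects.
     def getPartitionPath(dataFile: Option[String], logFiles: List[LogFile]): String = {
       val anyFilePath = dataFile.getOrElse(logFiles.head.path)
       anyFilePath.split('/').dropRight(1).mkString("/") // parent dir as the partition path
     }
   }
   ```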



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala:
##########
@@ -429,8 +428,10 @@ class HoodieCDCRDD(
               && currentCDCFileSplit.getBeforeFileSlice.isPresent)
             loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
             val absLogPath = new Path(basePath, currentCDCFileSplit.getCdcFiles.get(0))
-            val morSplit = HoodieMergeOnReadFileSplit(None, List(new HoodieLogFile(fs.getFileStatus(absLogPath))))
-            val logFileIterator = new LogFileIterator(morSplit, originTableSchema, originTableSchema, tableState, conf)
+            val logFiles = List(new HoodieLogFile(fs.getFileStatus(absLogPath)))

Review Comment:
   Is this change due to the cascading effect of changing the Iterator interface?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala:
##########
@@ -81,23 +81,29 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val partition = split.asInstanceOf[HoodieMergeOnReadPartition]
+    lazy val partitionPath = LogFileIterator.getPartitionPath(partition.split.dataFile, partition.split.logFiles)
     val iter = partition.split match {
       case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
         val projectedReader = projectReader(fileReaders.requiredSchemaReaderSkipMerging, requiredSchema.structTypeSchema)
         projectedReader(dataFileOnlySplit.dataFile.get)
 
       case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
-        new LogFileIterator(logFileOnlySplit, tableSchema, requiredSchema, tableState, getHadoopConf)
+        new LogFileIterator(logFileOnlySplit.logFiles, partitionPath, tableSchema, requiredSchema,
+          tableState, getHadoopConf)
 
       case split =>
         mergeType match {
           case DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL =>
             val reader = fileReaders.requiredSchemaReaderSkipMerging
-            new SkipMergeIterator(split, reader, tableSchema, requiredSchema, tableState, getHadoopConf)
+            val iterator = reader(split.dataFile.get)
+            new SkipMergeIterator(split.logFiles, partitionPath, iterator, reader.schema, tableSchema,

Review Comment:
   Since the split abstracts the set of files to process, it's better to pass the split directly to the different iterators.
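
   A rough, self-contained sketch of the resulting dispatch; the types are simplified stand-ins, the real iterators take additional arguments (schemas, table state, Hadoop conf), and the RecordMergingFileIterator branch is assumed rather than shown in this diff:

   ```scala
   // Simplified stand-ins; illustration only, not the real Hudi/Spark types.
   object MergeOnReadDispatchSketch {
     case class LogFile(path: String)
     case class MergeOnReadFileSplit(dataFile: Option[String], logFiles: List[LogFile])

     // Every iterator receives the whole split and derives the files it needs from it.
     class LogFileIterator(split: MergeOnReadFileSplit) extends Iterator[String] {
       private val files = split.logFiles.iterator
       override def hasNext: Boolean = files.hasNext
       override def next(): String = files.next().path
     }
     class SkipMergeIterator(split: MergeOnReadFileSplit) extends LogFileIterator(split)
     class RecordMergingFileIterator(split: MergeOnReadFileSplit) extends LogFileIterator(split)

     // Dispatch stays uniform because every branch just forwards the split.
     def compute(split: MergeOnReadFileSplit, skipMerge: Boolean): Iterator[String] =
       split match {
         case s if s.logFiles.isEmpty => s.dataFile.iterator              // base file only
         case s if s.dataFile.isEmpty => new LogFileIterator(s)           // log files only
         case s if skipMerge          => new SkipMergeIterator(s)         // read without merging
         case s                       => new RecordMergingFileIterator(s) // merge base + log files
       }
   }
   ```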



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala:
##########
@@ -79,28 +118,36 @@ case class HoodieBootstrapRelation(override val sqlContext: SQLContext,
         val dataFile = PartitionedFile(partitionValues, getFilePath(baseFile.getBootstrapBaseFile.get.getFileStatus.getPath),
           0, baseFile.getBootstrapBaseFile.get().getFileLen)
         val skeletonFile = Option(PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen))
-
-        HoodieBootstrapSplit(dataFile, skeletonFile)
+        createFileSplit(fileSlice, dataFile, skeletonFile)
       } else {
         val dataFile = PartitionedFile(getPartitionColumnsAsInternalRow(baseFile.getFileStatus), baseFile.getPath, 0, baseFile.getFileLen)
-        HoodieBootstrapSplit(dataFile)
+        createFileSplit(fileSlice, dataFile, Option.empty)
       }
     }
   }
 
-  protected override def composeRDD(fileSplits: Seq[FileSplit],
-                                    tableSchema: HoodieTableSchema,
-                                    requiredSchema: HoodieTableSchema,
-                                    requestedColumns: Array[String],
-                                    filters: Array[Filter]): RDD[InternalRow] = {
+  protected def getFileReaders(tableSchema: HoodieTableSchema,

Review Comment:
   Let's add comments describing the purpose of the different file readers.
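
   For example, something along these lines (a hypothetical sketch; apart from requiredSchemaReaderSkipMerging, which appears in HoodieMergeOnReadRDD above, the reader names and descriptions are assumptions, not the actual definitions):

   ```scala
   object FileReadersSketch {
     // Stand-in reader type: a function from a file path to an iterator of rows.
     type BaseFileReader = String => Iterator[Seq[Any]]

     /**
      * Bundle of base-file readers, with each reader's purpose documented.
      *
      * @param fullSchemaReader                reads the base file with the full table schema,
      *                                        for when log records must be merged on top of it
      * @param requiredSchemaReader            reads the base file projected to the required
      *                                        (query) schema while merging still applies
      * @param requiredSchemaReaderSkipMerging reads the required schema only and skips merging,
      *                                        e.g. for skip-merge reads or data-file-only splits
      */
     case class BaseFileReaders(fullSchemaReader: BaseFileReader,
                                requiredSchemaReader: BaseFileReader,
                                requiredSchemaReaderSkipMerging: BaseFileReader)
   }
   ```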


