codope commented on code in PR #9761:
URL: https://github.com/apache/hudi/pull/9761#discussion_r1367633031


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -516,6 +515,99 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
    */
   def updatePrunedDataSchema(prunedSchema: StructType): Relation
 
+  protected def createBaseFileReaders(tableSchema: HoodieTableSchema,
+                                      requiredSchema: HoodieTableSchema,
+                                      requestedColumns: Array[String],
+                                      requiredFilters: Seq[Filter],
+                                      optionalFilters: Seq[Filter] = Seq.empty,
+                                      baseFileFormat: HoodieFileFormat = 
tableConfig.getBaseFileFormat): HoodieMergeOnReadBaseFileReaders = {
+    val (partitionSchema, dataSchema, requiredDataSchema) =
+      tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+    val fullSchemaReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      partitionSchema = partitionSchema,
+      dataSchema = dataSchema,
+      requiredDataSchema = dataSchema,
+      // This file-reader is used to read base file records, subsequently 
merging them with the records
+      // stored in delta-log files. As such, we have to read _all_ records 
from the base file, while avoiding
+      // applying any filtering _before_ we complete combining them w/ 
delta-log records (to make sure that
+      // we combine them correctly);
+      // As such only required filters could be pushed-down to such reader
+      filters = requiredFilters,
+      options = optParams,
+      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+      //       to configure Parquet reader appropriately
+      hadoopConf = embedInternalSchema(new Configuration(conf), 
internalSchemaOpt),
+      baseFileFormat = baseFileFormat
+    )
+
+    val requiredSchemaReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      partitionSchema = partitionSchema,
+      dataSchema = dataSchema,
+      requiredDataSchema = requiredDataSchema,
+      // This file-reader is used to read base file records, subsequently 
merging them with the records
+      // stored in delta-log files. As such, we have to read _all_ records 
from the base file, while avoiding
+      // applying any filtering _before_ we complete combining them w/ 
delta-log records (to make sure that
+      // we combine them correctly);
+      // As such only required filters could be pushed-down to such reader
+      filters = requiredFilters,
+      options = optParams,
+      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+      //       to configure Parquet reader appropriately
+      hadoopConf = embedInternalSchema(new Configuration(conf), 
requiredDataSchema.internalSchema),
+      baseFileFormat = baseFileFormat
+    )
+
+    // Check whether fields required for merging were also requested to be 
fetched
+    // by the query:
+    //    - In case they were, there's no optimization we could apply here (we 
will have
+    //    to fetch such fields)
+    //    - In case they were not, we will provide 2 separate file-readers
+    //        a) One which would be applied to file-groups w/ delta-logs 
(merging)
+    //        b) One which would be applied to file-groups w/ no delta-logs or
+    //           in case query-mode is skipping merging
+    val mandatoryColumns = 
mandatoryFields.map(HoodieAvroUtils.getRootLevelFieldName)
+    if (mandatoryColumns.forall(requestedColumns.contains)) {
+      HoodieMergeOnReadBaseFileReaders(
+        fullSchemaReader = fullSchemaReader,
+        requiredSchemaReader = requiredSchemaReader,
+        requiredSchemaReaderSkipMerging = requiredSchemaReader
+      )
+    } else {
+      val prunedRequiredSchema = {
+        val unusedMandatoryColumnNames = 
mandatoryColumns.filterNot(requestedColumns.contains)
+        val prunedStructSchema =
+          StructType(requiredDataSchema.structTypeSchema.fields
+            .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
+
+        HoodieTableSchema(prunedStructSchema, 
convertToAvroSchema(prunedStructSchema, tableName).toString)
+      }
+
+      val requiredSchemaReaderSkipMerging = createBaseFileReader(
+        spark = sqlContext.sparkSession,
+        partitionSchema = partitionSchema,
+        dataSchema = dataSchema,
+        requiredDataSchema = prunedRequiredSchema,
+        // This file-reader is only used in cases when no merging is 
performed, therefore it's safe to push
+        // down these filters to the base file readers
+        filters = requiredFilters ++ optionalFilters,
+        options = optParams,
+        // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+        //       to configure Parquet reader appropriately
+        hadoopConf = embedInternalSchema(new Configuration(conf), 
requiredDataSchema.internalSchema),
+        baseFileFormat = baseFileFormat
+      )
+
+      HoodieMergeOnReadBaseFileReaders(
+        fullSchemaReader = fullSchemaReader,
+        requiredSchemaReader = requiredSchemaReader,
+        requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging
+      )

Review Comment:
   > The reader should always have one specific reading view right?
   
   That's correct - as mentioned 
[here](https://github.com/apache/hudi/blob/c9c7c1019f5ca903ad3782f79370663f5ae26cb9/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala#L39-L57).
 But that view is not known beforehand, i.e. not until the RDDs are computed from 
splits. Instantiating a reader every time while computing RDDs would be costly. 
Instead, what we do is create the readers for all use cases here and then decide 
which one to use while computing RDDs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to