boneanxs commented on code in PR #8669:
URL: https://github.com/apache/hudi/pull/8669#discussion_r1188092438


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala:
##########
@@ -17,281 +17,82 @@
 
 package org.apache.hudi
 
-import org.apache.avro.Schema
-import org.apache.hadoop.fs.{GlobPattern, Path}
-import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead
-import org.apache.hudi.client.common.HoodieSparkEngineContext
-import org.apache.hudi.client.utils.SparkInternalSchemaConverter
-import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieFileFormat, HoodieRecord, HoodieReplaceCommitMetadata}
-import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
-import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-import org.apache.hudi.common.util.{HoodieTimer, InternalSchemaCache}
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.internal.schema.InternalSchema
-import org.apache.hudi.internal.schema.utils.SerDeHelper
-import org.apache.hudi.table.HoodieSparkTable
-import org.apache.spark.api.java.JavaSparkContext
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getWritePartitionPaths
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
-import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-import org.slf4j.LoggerFactory
+import org.apache.spark.sql.SQLContext
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable
+import scala.collection.JavaConverters._
 
 /**
  * Relation, that implements the Hoodie incremental view.
  *
  * Implemented for Copy_on_write storage.
- * TODO: rebase w/ HoodieBaseRelation HUDI-5362
  *
  */
-class IncrementalRelation(val sqlContext: SQLContext,
-                          val optParams: Map[String, String],
-                          val userSchema: Option[StructType],
-                          val metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan {
-
-  private val log = LoggerFactory.getLogger(classOf[IncrementalRelation])
-
-  val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
-  private val basePath = metaClient.getBasePathV2
-  // TODO : Figure out a valid HoodieWriteConfig
-  private val hoodieTable = HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath.toString).build(),
-    new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)),
-    metaClient)
-  private val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants()
-
-  private val useStateTransitionTime = optParams.get(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key)
-    .map(_.toBoolean)
-    .getOrElse(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.defaultValue)
-
-  if (commitTimeline.empty()) {
-    throw new HoodieException("No instants to incrementally pull")
-  }
-  if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME.key)) {
-    throw new HoodieException(s"Specify the begin instant time to pull from 
using " +
-      s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME.key}")
+case class IncrementalRelation(override val sqlContext: SQLContext,
+                               override val optParams: Map[String, String],
+                               private val userSchema: Option[StructType],
+                               override val metaClient: HoodieTableMetaClient,
+                               private val prunedDataSchema: Option[StructType] = None)
+  extends AbstractBaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, Seq(), prunedDataSchema)
+    with HoodieIncrementalRelationTrait {
+
+  override type Relation = IncrementalRelation
+
+  override def imbueConfigs(sqlContext: SQLContext): Unit = {
+    super.imbueConfigs(sqlContext)
+    // TODO(HUDI-3639) vectorized reader has to be disabled to make sure IncrementalRelation is working properly
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
   }
-
-  if (!metaClient.getTableConfig.populateMetaFields()) {
-    throw new HoodieException("Incremental queries are not supported when meta 
fields are disabled")
-  }
-
-  val useEndInstantSchema = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME.key,
-    DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME.defaultValue).toBoolean
-
-  private val lastInstant = commitTimeline.lastInstant().get()
-
-  private val commitsTimelineToReturn = {
-    if (useStateTransitionTime) {
-      commitTimeline.findInstantsInRangeByStateTransitionTs(
-        optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
-        optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getStateTransitionTime))
+  override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
+    this.copy(prunedDataSchema = Some(prunedSchema))
+
+  protected override def timeline: HoodieTimeline = {
+    if (fullTableScan) {
+      metaClient.getCommitsAndCompactionTimeline
+    } else if (useStateTransitionTime) {
+      metaClient.getCommitsAndCompactionTimeline.findInstantsInRangeByStateTransitionTs(startTimestamp, endTimestamp)
     } else {
-      commitTimeline.findInstantsInRange(
-        optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key),
-        optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getTimestamp))
+      metaClient.getCommitsAndCompactionTimeline.findInstantsInRange(startTimestamp, endTimestamp)
     }
   }
-  private val commitsToReturn = commitsTimelineToReturn.getInstantsAsStream.iterator().toList
-
-  // use schema from a file produced in the end/latest instant
 
-  val (usedSchema, internalSchema) = {
-    log.info("Inferring schema..")
-    val schemaResolver = new TableSchemaResolver(metaClient)
-    val iSchema : InternalSchema = if (!isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) {
-      InternalSchema.getEmptyInternalSchema
-    } else if (useEndInstantSchema && !commitsToReturn.isEmpty) {
-      InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable)
-    } else {
-      schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null)
-    }
-
-    val tableSchema = if (useEndInstantSchema && iSchema.isEmptySchema) {
-      if (commitsToReturn.isEmpty) schemaResolver.getTableAvroSchema(false) else
-        schemaResolver.getTableAvroSchema(commitsToReturn.last, false)
-    } else {
-      schemaResolver.getTableAvroSchema(false)
-    }
-    if (tableSchema.getType == Schema.Type.NULL) {
-      // if there is only one commit in the table and is an empty commit without schema, return empty RDD here
-      (StructType(Nil), InternalSchema.getEmptyInternalSchema)
-    } else {
-      val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
-      if (iSchema != null && !iSchema.isEmptySchema) {
-        // if internalSchema is ready, dataSchema will contains skeletonSchema
-        (dataSchema, iSchema)
-      } else {
-        (StructType(skeletonSchema.fields ++ dataSchema.fields), InternalSchema.getEmptyInternalSchema)
-      }
-    }
+  protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit],
+                                    tableSchema: HoodieTableSchema,
+                                    requiredSchema: HoodieTableSchema,
+                                    requestedColumns: Array[String],
+                                    filters: Array[Filter]): RDD[InternalRow] = {
+    super.composeRDD(fileSplits, tableSchema, requiredSchema, requestedColumns,
+      filters ++ incrementalSpanRecordFilters)
   }
 
-  private val filters = optParams.getOrElse(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS.key,

Review Comment:
   Hey @bvaradar, it appears this was introduced in https://github.com/apache/hudi/pull/485, but I can't find any corresponding implementation in `MergeOnReadIncrementalRelation`. Did we overlook it, or is this config a legacy one that is no longer required?
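
   For context, a minimal sketch of how this option would be exercised from the Spark datasource on a COW table. The table path, instant times, and filter expression are placeholders, and the raw key strings are my reading of `DataSourceReadOptions`, so treat them as assumptions rather than the authoritative API:

   ```scala
   // Hypothetical incremental read over a COW table; all values are illustrative only.
   val incrementalDF = spark.read
     .format("hudi")
     // QUERY_TYPE: run an incremental query instead of a snapshot scan
     .option("hoodie.datasource.query.type", "incremental")
     // BEGIN_INSTANTTIME / END_INSTANTTIME: commit range to pull
     .option("hoodie.datasource.read.begin.instanttime", "20230501000000")
     .option("hoodie.datasource.read.end.instanttime", "20230510000000")
     // PUSH_DOWN_INCR_FILTERS: the option in question, applied while scanning the
     // incremental window; no counterpart found in MergeOnReadIncrementalRelation
     .option("hoodie.datasource.read.incr.filters", "rider = 'rider-213'")
     .load("/tmp/hudi_trips_cow")
   ```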


