boneanxs commented on code in PR #8669: URL: https://github.com/apache/hudi/pull/8669#discussion_r1188092438
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala: ########## @@ -17,281 +17,82 @@ package org.apache.hudi -import org.apache.avro.Schema -import org.apache.hadoop.fs.{GlobPattern, Path} -import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead -import org.apache.hudi.client.common.HoodieSparkEngineContext -import org.apache.hudi.client.utils.SparkInternalSchemaConverter -import org.apache.hudi.common.fs.FSUtils -import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieFileFormat, HoodieRecord, HoodieReplaceCommitMetadata} -import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} -import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.common.util.{HoodieTimer, InternalSchemaCache} -import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.exception.HoodieException -import org.apache.hudi.internal.schema.InternalSchema -import org.apache.hudi.internal.schema.utils.SerDeHelper -import org.apache.hudi.table.HoodieSparkTable -import org.apache.spark.api.java.JavaSparkContext +import org.apache.hudi.common.table.timeline.HoodieTimeline +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getWritePartitionPaths import org.apache.spark.rdd.RDD -import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat -import org.apache.spark.sql.sources.{BaseRelation, TableScan} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{DataFrame, Row, SQLContext} -import org.slf4j.LoggerFactory +import org.apache.spark.sql.SQLContext -import scala.collection.JavaConversions._ -import 
scala.collection.mutable +import scala.collection.JavaConverters._ /** * Relation, that implements the Hoodie incremental view. * * Implemented for Copy_on_write storage. - * TODO: rebase w/ HoodieBaseRelation HUDI-5362 * */ -class IncrementalRelation(val sqlContext: SQLContext, - val optParams: Map[String, String], - val userSchema: Option[StructType], - val metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan { - - private val log = LoggerFactory.getLogger(classOf[IncrementalRelation]) - - val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema - private val basePath = metaClient.getBasePathV2 - // TODO : Figure out a valid HoodieWriteConfig - private val hoodieTable = HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath.toString).build(), - new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)), - metaClient) - private val commitTimeline = hoodieTable.getMetaClient.getCommitTimeline.filterCompletedInstants() - - private val useStateTransitionTime = optParams.get(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.key) - .map(_.toBoolean) - .getOrElse(DataSourceReadOptions.READ_BY_STATE_TRANSITION_TIME.defaultValue) - - if (commitTimeline.empty()) { - throw new HoodieException("No instants to incrementally pull") - } - if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME.key)) { - throw new HoodieException(s"Specify the begin instant time to pull from using " + - s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME.key}") +case class IncrementalRelation(override val sqlContext: SQLContext, + override val optParams: Map[String, String], + private val userSchema: Option[StructType], + override val metaClient: HoodieTableMetaClient, + private val prunedDataSchema: Option[StructType] = None) + extends AbstractBaseFileOnlyRelation(sqlContext, metaClient, optParams, userSchema, Seq(), prunedDataSchema) + with HoodieIncrementalRelationTrait { + + override type Relation = IncrementalRelation + + 
override def imbueConfigs(sqlContext: SQLContext): Unit = { + super.imbueConfigs(sqlContext) + // TODO(HUDI-3639) vectorized reader has to be disabled to make sure IncrementalRelation is working properly + sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false") } - - if (!metaClient.getTableConfig.populateMetaFields()) { - throw new HoodieException("Incremental queries are not supported when meta fields are disabled") - } - - val useEndInstantSchema = optParams.getOrElse(DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME.key, - DataSourceReadOptions.INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME.defaultValue).toBoolean - - private val lastInstant = commitTimeline.lastInstant().get() - - private val commitsTimelineToReturn = { - if (useStateTransitionTime) { - commitTimeline.findInstantsInRangeByStateTransitionTs( - optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key), - optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getStateTransitionTime)) + override def updatePrunedDataSchema(prunedSchema: StructType): Relation = + this.copy(prunedDataSchema = Some(prunedSchema)) + + protected override def timeline: HoodieTimeline = { + if (fullTableScan) { + metaClient.getCommitsAndCompactionTimeline + } else if (useStateTransitionTime) { + metaClient.getCommitsAndCompactionTimeline.findInstantsInRangeByStateTransitionTs(startTimestamp, endTimestamp) } else { - commitTimeline.findInstantsInRange( - optParams(DataSourceReadOptions.BEGIN_INSTANTTIME.key), - optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME.key(), lastInstant.getTimestamp)) + metaClient.getCommitsAndCompactionTimeline.findInstantsInRange(startTimestamp, endTimestamp) } } - private val commitsToReturn = commitsTimelineToReturn.getInstantsAsStream.iterator().toList - - // use schema from a file produced in the end/latest instant - val (usedSchema, internalSchema) = { - log.info("Inferring schema..") - val 
schemaResolver = new TableSchemaResolver(metaClient) - val iSchema : InternalSchema = if (!isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) { - InternalSchema.getEmptyInternalSchema - } else if (useEndInstantSchema && !commitsToReturn.isEmpty) { - InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable) - } else { - schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null) - } - - val tableSchema = if (useEndInstantSchema && iSchema.isEmptySchema) { - if (commitsToReturn.isEmpty) schemaResolver.getTableAvroSchema(false) else - schemaResolver.getTableAvroSchema(commitsToReturn.last, false) - } else { - schemaResolver.getTableAvroSchema(false) - } - if (tableSchema.getType == Schema.Type.NULL) { - // if there is only one commit in the table and is an empty commit without schema, return empty RDD here - (StructType(Nil), InternalSchema.getEmptyInternalSchema) - } else { - val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema) - if (iSchema != null && !iSchema.isEmptySchema) { - // if internalSchema is ready, dataSchema will contains skeletonSchema - (dataSchema, iSchema) - } else { - (StructType(skeletonSchema.fields ++ dataSchema.fields), InternalSchema.getEmptyInternalSchema) - } - } + protected override def composeRDD(fileSplits: Seq[HoodieBaseFileSplit], + tableSchema: HoodieTableSchema, + requiredSchema: HoodieTableSchema, + requestedColumns: Array[String], + filters: Array[Filter]): RDD[InternalRow] = { + super.composeRDD(fileSplits, tableSchema, requiredSchema, requestedColumns, + filters ++ incrementalSpanRecordFilters) } - private val filters = optParams.getOrElse(DataSourceReadOptions.PUSH_DOWN_INCR_FILTERS.key, Review Comment: Hey @bvaradar, it appears this is introduced in https://github.com/apache/hudi/pull/485, but I don't find any implementation in `MergeOnReadIncrementalRelation`, did we overlook it or this 
config is a legacy option that is no longer required? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org