This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 5d196fe6175 [HUDI-3639] Add Proper Incremental Records Filtering support into Hudi's custom RDD (#8668)
5d196fe6175 is described below

commit 5d196fe61757987af29b38e1b5cf38d7ca001924
Author: cxzl25 <cxz...@users.noreply.github.com>
AuthorDate: Tue Jul 4 09:25:38 2023 +0800

    [HUDI-3639] Add Proper Incremental Records Filtering support into Hudi's custom RDD (#8668)

    * filter operator for incremental RDD

    * remove the hard-coded conf 'spark.sql.parquet.enableVectorizedReader' in relations

    ---------

    Co-authored-by: Danny Chan <yuzhao....@gmail.com>
---
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  2 -
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     | 39 +++++++++++++++++-
 .../hudi/MergeOnReadIncrementalRelation.scala      | 28 +++++++------
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |  5 ---
 .../functional/TestParquetColumnProjection.scala   | 48 +++++++++++++++++++---
 5 files changed, 95 insertions(+), 27 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index a9ddbfa4503..a67d4463bf5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -467,8 +467,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   def imbueConfigs(sqlContext: SQLContext): Unit = {
     sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
     sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
-    // TODO(HUDI-3639) vectorized reader has to be disabled to make sure MORIncrementalRelation is working properly
-    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
   }

   /**
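The removal above means Hudi no longer force-disables Spark's vectorized Parquet reader for every relation: with this commit the incremental bounds are enforced inside HoodieMergeOnReadRDD (next file), so the reader-level workaround is unnecessary. A minimal sketch (not part of the commit; ParquetConfSketch is an illustrative name, and a local SparkSession is assumed) showing that imbueConfigs now leaves the vectorized-reader setting at Spark's default:

    import org.apache.spark.sql.SparkSession

    object ParquetConfSketch extends App {
      val spark = SparkSession.builder().master("local[1]").appName("conf-sketch").getOrCreate()

      // What imbueConfigs still sets: predicate pushdown and record-level filtering.
      spark.conf.set("spark.sql.parquet.filterPushdown", "true")
      spark.conf.set("spark.sql.parquet.recordLevelFilter.enabled", "true")

      // The vectorized reader is no longer overridden, so Spark's default ("true") stays in effect.
      println(spark.conf.get("spark.sql.parquet.enableVectorizedReader"))

      spark.stop()
    }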
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index d7b60db4929..db538f110c9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -23,6 +23,8 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
 import org.apache.hudi.HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK
 import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import org.apache.spark.rdd.RDD
@@ -30,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}

 import java.io.Closeable
+import java.util.function.Predicate

 case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition

@@ -64,6 +67,9 @@ private[hudi] case class HoodieMergeOnReadBaseFileReaders(fullSchemaReader: Base
  * @param tableState       table's state
  * @param mergeType        type of merge performed
  * @param fileSplits       target file-splits this RDD will be iterating over
+ * @param includeStartTime whether to include the commit with the commitTime
+ * @param startTimestamp   start timestamp to filter records
+ * @param endTimestamp     end timestamp to filter records
  */
 class HoodieMergeOnReadRDD(@transient sc: SparkContext,
                            @transient config: Configuration,
@@ -72,7 +78,10 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
                            requiredSchema: HoodieTableSchema,
                            tableState: HoodieTableState,
                            mergeType: String,
-                           @transient fileSplits: Seq[HoodieMergeOnReadFileSplit])
+                           @transient fileSplits: Seq[HoodieMergeOnReadFileSplit],
+                           includeStartTime: Boolean = false,
+                           startTimestamp: String = null,
+                           endTimestamp: String = null)
   extends RDD[InternalRow](sc, Nil) with HoodieUnsafeRDD {

   protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config))
@@ -116,7 +125,33 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
       Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.asInstanceOf[Closeable].close()))
     }

-    iter
+    val commitTimeMetadataFieldIdx = requiredSchema.structTypeSchema.fieldNames.indexOf(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+    val needsFiltering = commitTimeMetadataFieldIdx >= 0 && !StringUtils.isNullOrEmpty(startTimestamp) && !StringUtils.isNullOrEmpty(endTimestamp)
+    if (needsFiltering) {
+      val filterT: Predicate[InternalRow] = getCommitTimeFilter(includeStartTime, commitTimeMetadataFieldIdx)
+      iter.filter(filterT.test)
+    }
+    else {
+      iter
+    }
+  }
+
+  private def getCommitTimeFilter(includeStartTime: Boolean, commitTimeMetadataFieldIdx: Int): Predicate[InternalRow] = {
+    if (includeStartTime) {
+      new Predicate[InternalRow] {
+        override def test(row: InternalRow): Boolean = {
+          val commitTime = row.getString(commitTimeMetadataFieldIdx)
+          commitTime >= startTimestamp && commitTime <= endTimestamp
+        }
+      }
+    } else {
+      new Predicate[InternalRow] {
+        override def test(row: InternalRow): Boolean = {
+          val commitTime = row.getString(commitTimeMetadataFieldIdx)
+          commitTime > startTimestamp && commitTime <= endTimestamp
+        }
+      }
+    }
   }

   private def pickBaseFileReader(): BaseFileReader = {
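The new compute-side filter above bounds every row by its commit-time metadata field: a closed range [startTimestamp, endTimestamp] when includeStartTime is set, otherwise the half-open range (startTimestamp, endTimestamp]. Hudi instant times are fixed-width digit strings, so plain string comparison orders them chronologically. A self-contained sketch of the same bounds check (plain strings instead of InternalRow; commitTimeInRange is an illustrative name):

    // Mirrors the interval semantics of getCommitTimeFilter above.
    def commitTimeInRange(commitTime: String,
                          startTimestamp: String,
                          endTimestamp: String,
                          includeStartTime: Boolean): Boolean = {
      val aboveStart =
        if (includeStartTime) commitTime >= startTimestamp // [start, end]
        else commitTime > startTimestamp                   // (start, end]
      aboveStart && commitTime <= endTimestamp
    }

    assert(commitTimeInRange("20230704092538", "20230704092538", "20230705000000", includeStartTime = true))
    assert(!commitTimeInRange("20230704092538", "20230704092538", "20230705000000", includeStartTime = false))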
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 2bb67000753..a3163586ad4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -54,11 +54,6 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
   override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
     this.copy(prunedDataSchema = Some(prunedSchema))

-  override def imbueConfigs(sqlContext: SQLContext): Unit = {
-    super.imbueConfigs(sqlContext)
-    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
-  }
-
   override protected def timeline: HoodieTimeline = {
     if (fullTableScan) {
       handleHollowCommitIfNeeded(metaClient.getCommitsAndCompactionTimeline, metaClient, hollowCommitHandling)
@@ -81,8 +76,6 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
     val optionalFilters = filters
     val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters)

-    // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
-    //                 filtered, since file-reader might not be capable to perform filtering
     new HoodieMergeOnReadRDD(
       sqlContext.sparkContext,
       config = jobConf,
@@ -91,7 +84,10 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
       requiredSchema = requiredSchema,
       tableState = tableState,
       mergeType = mergeType,
-      fileSplits = fileSplits)
+      fileSplits = fileSplits,
+      includeStartTime = includeStartTime,
+      startTimestamp = startTs,
+      endTimestamp = endTs)
   }

   override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
@@ -184,19 +180,25 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
     listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata)
   }

+  protected lazy val (includeStartTime, startTs) = if (startInstantArchived) {
+    (false, startTimestamp)
+  } else {
+    (true, includedCommits.head.getTimestamp)
+  }
+  protected lazy val endTs: String = if (endInstantArchived) endTimestamp else includedCommits.last.getTimestamp
+
   // Record filters making sure that only records w/in the requested bounds are being fetched as part of the
   // scan collected by this relation
   protected lazy val incrementalSpanRecordFilters: Seq[Filter] = {
     val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)

-    val largerThanFilter = if (startInstantArchived) {
-      GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTimestamp)
+    val largerThanFilter = if (includeStartTime) {
+      GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTs)
     } else {
-      GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.head.getTimestamp)
+      GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTs)
     }

-    val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-      if (endInstantArchived) endTimestamp else includedCommits.last.getTimestamp)
+    val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, endTs)

     Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 46ce935c260..e8468f0a7a1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -101,11 +101,6 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,
   override def canPruneRelationSchema: Boolean =
     super.canPruneRelationSchema && isProjectionCompatible(tableState)

-  override def imbueConfigs(sqlContext: SQLContext): Unit = {
-    super.imbueConfigs(sqlContext)
-    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
-  }
-
   protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
                                     tableSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
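The incremental relation now derives the bounds once (includeStartTime, startTs, endTs) and expresses them twice: as Spark SQL Filters pushed to the base-file reader, and as constructor arguments to HoodieMergeOnReadRDD, so the returned iterator is bounded even when the file reader cannot apply the filters. A sketch of the equivalent Filter construction (standalone; "_hoodie_commit_time" is the value behind HoodieRecord.COMMIT_TIME_METADATA_FIELD, and incrementalSpanFilters is an illustrative name):

    import org.apache.spark.sql.sources.{Filter, GreaterThan, GreaterThanOrEqual, IsNotNull, LessThanOrEqual}

    def incrementalSpanFilters(startTs: String, endTs: String, includeStartTime: Boolean): Seq[Filter] = {
      val field = "_hoodie_commit_time"
      val lowerBound: Filter =
        if (includeStartTime) GreaterThanOrEqual(field, startTs) // start instant still on the active timeline
        else GreaterThan(field, startTs)                         // start instant archived: exclusive bound
      Seq(IsNotNull(field), lowerBound, LessThanOrEqual(field, endTs))
    }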
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index a62ef840ab9..ee1edbcccb2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -25,16 +25,15 @@ import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLates
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
-import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieMergeOnReadRDD, HoodieSparkUtils, HoodieUnsafeRDD}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD}
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.{Dataset, HoodieUnsafeUtils, Row, SaveMode}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.jupiter.api.{Disabled, Tag, Test}

 import scala.collection.JavaConverters._
@@ -330,6 +329,40 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
     assertTrue(commitNum > 1)
   }

+  @Test
+  def testMergeOnReadIncrementalRelationWithFilter(): Unit = {
+    val tablePath = s"$basePath/mor-with-logs-incr-filter"
+    val targetRecordsCount = 100
+
+    bootstrapMORTableWithDeltaLog(tablePath, targetRecordsCount, defaultWriteOpts, populateMetaFields = true, inlineCompact = true)
+
+    val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build()
+    val completedCommits = hoodieMetaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
+    val startUnarchivedCommitTs = (completedCommits.nthInstant(1).get().getTimestamp.toLong - 1L).toString
+    val endUnarchivedCommitTs = completedCommits.nthInstant(3).get().getTimestamp //commit
+
+    val readOpts = defaultWriteOpts ++ Map(
+      "path" -> tablePath,
+      DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+      DataSourceReadOptions.BEGIN_INSTANTTIME.key -> startUnarchivedCommitTs,
+      DataSourceReadOptions.END_INSTANTTIME.key -> endUnarchivedCommitTs
+    )
+
+    val inputDf = spark.read.format("hudi")
+      .options(readOpts)
+      .load()
+    // Make sure the filter is not applied at the row group level
+    spark.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "false")
+    try {
+      val rows = inputDf.select("_hoodie_commit_time").distinct().sort("_hoodie_commit_time").collect()
+      assertTrue(rows.length == 2)
+      assertFalse(rows.exists(_.getString(0) < startUnarchivedCommitTs))
+      assertFalse(rows.exists(_.getString(0) > endUnarchivedCommitTs))
+    } finally {
+      spark.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
+    }
+  }
+
   // Test routine
   private def runTest(tableState: TableState,
                       queryType: String,
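The test above deliberately disables spark.sql.parquet.filterPushdown before collecting, so the Parquet reader cannot bound the rows at the row-group level and the assertions only pass if HoodieMergeOnReadRDD itself applies the commit-time filter. A generic sketch of that toggle-and-restore pattern (not from this commit; withParquetPushdownDisabled is an illustrative name, using Spark's public spark.conf API):

    import org.apache.spark.sql.SparkSession

    // Run `body` with Parquet filter pushdown disabled, restoring the previous
    // value even if an assertion inside `body` throws.
    def withParquetPushdownDisabled[T](spark: SparkSession)(body: => T): T = {
      val key = "spark.sql.parquet.filterPushdown"
      val previous = spark.conf.get(key)
      spark.conf.set(key, "false")
      try body
      finally spark.conf.set(key, previous)
    }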
@@ -441,7 +474,8 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
                                                recordCount: Int,
                                                opts: Map[String, String],
                                                populateMetaFields: Boolean,
-                                               dataGenOpt: Option[HoodieTestDataGenerator] = None): (List[HoodieRecord[_]], Schema) = {
+                                               dataGenOpt: Option[HoodieTestDataGenerator] = None,
+                                               inlineCompact: Boolean = false): (List[HoodieRecord[_]], Schema) = {
     val dataGen = dataGenOpt.getOrElse(new HoodieTestDataGenerator(0x12345))

     // Step 1: Bootstrap table w/ N records (t/h bulk-insert)
@@ -455,10 +489,14 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
     // Step 2: Update M records out of those (t/h update)
     val inputDF = toDataset(updatedRecords, HoodieTestDataGenerator.AVRO_SCHEMA)

+    val compactScheduleInline = if (inlineCompact) "false" else "true"
+    val compactInline = if (inlineCompact) "true" else "false"
+
     inputDF.write.format("org.apache.hudi")
       .options(opts)
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
-      .option(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key, "true")
+      .option(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key, compactScheduleInline)
+      .option(HoodieCompactionConfig.INLINE_COMPACT.key, compactInline)
       .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key, "false")
       .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key, "3")
       .option(HoodieTableConfig.POPULATE_META_FIELDS.key, populateMetaFields.toString)
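For reference, the code path this commit fixes is exercised by any plain incremental query over a merge-on-read table. A minimal usage sketch (table path and instant times are illustrative; the option keys are the values of DataSourceReadOptions.QUERY_TYPE, BEGIN_INSTANTTIME and END_INSTANTTIME):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[2]").appName("incr-read").getOrCreate()

    val incrDf = spark.read.format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", "20230704092538000")
      .option("hoodie.datasource.read.end.instanttime", "20230705000000000")
      .load("/tmp/hudi/mor_table")

    // With this commit, records outside the requested instant range are filtered
    // inside HoodieMergeOnReadRDD even when the Parquet reader cannot apply the
    // pushed-down filters (e.g. with the vectorized reader enabled).
    incrDf.select("_hoodie_commit_time").distinct().show()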