This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d196fe6175 [HUDI-3639] Add Proper Incremental Records FIltering support into Hudi's custom RDD (#8668)
5d196fe6175 is described below

commit 5d196fe61757987af29b38e1b5cf38d7ca001924
Author: cxzl25 <cxz...@users.noreply.github.com>
AuthorDate: Tue Jul 4 09:25:38 2023 +0800

    [HUDI-3639] Add Proper Incremental Records FIltering support into Hudi's custom RDD (#8668)
    
    * filter operator for incremental RDD
    * remove the hard code conf 'spark.sql.parquet.enableVectorizedReader' in relations
    
    ---------
    
    Co-authored-by: Danny Chan <yuzhao....@gmail.com>
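
    In short, the change wraps the RDD's row iterator with a commit-time range predicate on the
    `_hoodie_commit_time` metadata column, so incremental MOR reads only return rows inside the
    requested instant window even when the Parquet reader cannot apply the filter itself. A minimal
    standalone sketch of that idea (the helper name `filterByCommitTime` and its parameters are
    illustrative, not actual members of HoodieMergeOnReadRDD):

        import org.apache.spark.sql.catalyst.InternalRow

        // Sketch only: mirrors the predicate added in HoodieMergeOnReadRDD.getCommitTimeFilter.
        def filterByCommitTime(rows: Iterator[InternalRow],
                               commitTimeIdx: Int,        // position of _hoodie_commit_time in the read schema
                               includeStartTime: Boolean, // whether the start instant itself is in scope
                               startTs: String,
                               endTs: String): Iterator[InternalRow] = {
          rows.filter { row =>
            val commitTime = row.getString(commitTimeIdx)
            // Instant times are fixed-width timestamps, so lexicographic comparison matches chronological order.
            val afterStart = if (includeStartTime) commitTime >= startTs else commitTime > startTs
            afterStart && commitTime <= endTs
          }
        }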
---
 .../scala/org/apache/hudi/HoodieBaseRelation.scala |  2 -
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     | 39 +++++++++++++++++-
 .../hudi/MergeOnReadIncrementalRelation.scala      | 28 +++++++------
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |  5 ---
 .../functional/TestParquetColumnProjection.scala   | 48 +++++++++++++++++++---
 5 files changed, 95 insertions(+), 27 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index a9ddbfa4503..a67d4463bf5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -467,8 +467,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   def imbueConfigs(sqlContext: SQLContext): Unit = {
     sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
     sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
-    // TODO(HUDI-3639) vectorized reader has to be disabled to make sure MORIncrementalRelation is working properly
-    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
   }
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index d7b60db4929..db538f110c9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -23,6 +23,8 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
 import org.apache.hudi.HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK
 import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import org.apache.spark.rdd.RDD
@@ -30,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
 
 import java.io.Closeable
+import java.util.function.Predicate
 
 case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition
 
@@ -64,6 +67,9 @@ private[hudi] case class HoodieMergeOnReadBaseFileReaders(fullSchemaReader: Base
  * @param tableState table's state
  * @param mergeType type of merge performed
  * @param fileSplits target file-splits this RDD will be iterating over
+ * @param includeStartTime whether to include the commit with the commitTime
+ * @param startTimestamp start timestamp to filter records
+ * @param endTimestamp end timestamp to filter records
  */
 class HoodieMergeOnReadRDD(@transient sc: SparkContext,
                            @transient config: Configuration,
@@ -72,7 +78,10 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
                            requiredSchema: HoodieTableSchema,
                            tableState: HoodieTableState,
                            mergeType: String,
-                           @transient fileSplits: Seq[HoodieMergeOnReadFileSplit])
+                           @transient fileSplits: Seq[HoodieMergeOnReadFileSplit],
+                           includeStartTime: Boolean = false,
+                           startTimestamp: String = null,
+                           endTimestamp: String = null)
   extends RDD[InternalRow](sc, Nil) with HoodieUnsafeRDD {
 
   protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config))
@@ -116,7 +125,33 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
       Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.asInstanceOf[Closeable].close()))
     }
 
-    iter
+    val commitTimeMetadataFieldIdx = requiredSchema.structTypeSchema.fieldNames.indexOf(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+    val needsFiltering = commitTimeMetadataFieldIdx >= 0 && !StringUtils.isNullOrEmpty(startTimestamp) && !StringUtils.isNullOrEmpty(endTimestamp)
+    if (needsFiltering) {
+      val filterT: Predicate[InternalRow] = getCommitTimeFilter(includeStartTime, commitTimeMetadataFieldIdx)
+      iter.filter(filterT.test)
+    }
+    else {
+      iter
+    }
+  }
+
+  private def getCommitTimeFilter(includeStartTime: Boolean, commitTimeMetadataFieldIdx: Int): Predicate[InternalRow] = {
+    if (includeStartTime) {
+      new Predicate[InternalRow] {
+        override def test(row: InternalRow): Boolean = {
+          val commitTime = row.getString(commitTimeMetadataFieldIdx)
+          commitTime >= startTimestamp && commitTime <= endTimestamp
+        }
+      }
+    } else {
+      new Predicate[InternalRow] {
+        override def test(row: InternalRow): Boolean = {
+          val commitTime = row.getString(commitTimeMetadataFieldIdx)
+          commitTime > startTimestamp && commitTime <= endTimestamp
+        }
+      }
+    }
   }
 
   private def pickBaseFileReader(): BaseFileReader = {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 2bb67000753..a3163586ad4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -54,11 +54,6 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
   override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
     this.copy(prunedDataSchema = Some(prunedSchema))
 
-  override def imbueConfigs(sqlContext: SQLContext): Unit = {
-    super.imbueConfigs(sqlContext)
-    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
-  }
-
   override protected def timeline: HoodieTimeline = {
     if (fullTableScan) {
       handleHollowCommitIfNeeded(metaClient.getCommitsAndCompactionTimeline, metaClient, hollowCommitHandling)
@@ -81,8 +76,6 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
     val optionalFilters = filters
     val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters)
 
-    // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
-    //                 filtered, since file-reader might not be capable to perform filtering
     new HoodieMergeOnReadRDD(
       sqlContext.sparkContext,
       config = jobConf,
@@ -91,7 +84,10 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
       requiredSchema = requiredSchema,
       tableState = tableState,
       mergeType = mergeType,
-      fileSplits = fileSplits)
+      fileSplits = fileSplits,
+      includeStartTime = includeStartTime,
+      startTimestamp = startTs,
+      endTimestamp = endTs)
   }
 
   override protected def collectFileSplits(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
@@ -184,19 +180,25 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
     listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata)
   }
 
+  protected lazy val (includeStartTime, startTs) = if (startInstantArchived) {
+    (false, startTimestamp)
+  } else {
+    (true, includedCommits.head.getTimestamp)
+  }
+  protected lazy val endTs: String = if (endInstantArchived) endTimestamp else includedCommits.last.getTimestamp
+
   // Record filters making sure that only records w/in the requested bounds are being fetched as part of the
   // scan collected by this relation
   protected lazy val incrementalSpanRecordFilters: Seq[Filter] = {
     val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
 
-    val largerThanFilter = if (startInstantArchived) {
-      GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTimestamp)
+    val largerThanFilter = if (includeStartTime) {
+      GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTs)
     } else {
-      GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.head.getTimestamp)
+      GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTs)
     }
 
-    val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
-      if (endInstantArchived) endTimestamp else includedCommits.last.getTimestamp)
+    val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, endTs)
 
     Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 46ce935c260..e8468f0a7a1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -101,11 +101,6 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,
   override def canPruneRelationSchema: Boolean =
     super.canPruneRelationSchema && isProjectionCompatible(tableState)
 
-  override def imbueConfigs(sqlContext: SQLContext): Unit = {
-    super.imbueConfigs(sqlContext)
-    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
-  }
-
   protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
                                     tableSchema: HoodieTableSchema,
                                     requiredSchema: HoodieTableSchema,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index a62ef840ab9..ee1edbcccb2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -25,16 +25,15 @@ import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLates
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
 import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
-import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieMergeOnReadRDD, HoodieSparkUtils, HoodieUnsafeRDD}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD}
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.{Dataset, HoodieUnsafeUtils, Row, SaveMode}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
 import org.junit.jupiter.api.{Disabled, Tag, Test}
 
 import scala.collection.JavaConverters._
@@ -330,6 +329,40 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
     assertTrue(commitNum > 1)
   }
 
+  @Test
+  def testMergeOnReadIncrementalRelationWithFilter(): Unit = {
+    val tablePath = s"$basePath/mor-with-logs-incr-filter"
+    val targetRecordsCount = 100
+
+    bootstrapMORTableWithDeltaLog(tablePath, targetRecordsCount, defaultWriteOpts, populateMetaFields = true, inlineCompact = true)
+
+    val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build()
+    val completedCommits = hoodieMetaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
+    val startUnarchivedCommitTs = (completedCommits.nthInstant(1).get().getTimestamp.toLong - 1L).toString
+    val endUnarchivedCommitTs = completedCommits.nthInstant(3).get().getTimestamp //commit
+
+    val readOpts = defaultWriteOpts ++ Map(
+      "path" -> tablePath,
+      DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+      DataSourceReadOptions.BEGIN_INSTANTTIME.key -> startUnarchivedCommitTs,
+      DataSourceReadOptions.END_INSTANTTIME.key -> endUnarchivedCommitTs
+    )
+
+    val inputDf = spark.read.format("hudi")
+      .options(readOpts)
+      .load()
+    // Make sure the filter is not applied at the row group level
+    spark.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "false")
+    try {
+      val rows = inputDf.select("_hoodie_commit_time").distinct().sort("_hoodie_commit_time").collect()
+      assertTrue(rows.length == 2)
+      assertFalse(rows.exists(_.getString(0) < startUnarchivedCommitTs))
+      assertFalse(rows.exists(_.getString(0) > endUnarchivedCommitTs))
+    } finally {
+      spark.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
+    }
+  }
+
   // Test routine
   private def runTest(tableState: TableState,
                       queryType: String,
@@ -441,7 +474,8 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
                                 recordCount: Int,
                                 opts: Map[String, String],
                                 populateMetaFields: Boolean,
-                                dataGenOpt: Option[HoodieTestDataGenerator] = None): (List[HoodieRecord[_]], Schema) = {
+                                dataGenOpt: Option[HoodieTestDataGenerator] = None,
+                                inlineCompact: Boolean = false): (List[HoodieRecord[_]], Schema) = {
     val dataGen = dataGenOpt.getOrElse(new HoodieTestDataGenerator(0x12345))
 
     // Step 1: Bootstrap table w/ N records (t/h bulk-insert)
@@ -455,10 +489,14 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
       // Step 2: Update M records out of those (t/h update)
       val inputDF = toDataset(updatedRecords, HoodieTestDataGenerator.AVRO_SCHEMA)
 
+      val compactScheduleInline = if (inlineCompact) "false" else "true"
+      val compactInline = if (inlineCompact) "true" else "false"
+
       inputDF.write.format("org.apache.hudi")
         .options(opts)
         .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
-        .option(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key, "true")
+        .option(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key, compactScheduleInline)
+        .option(HoodieCompactionConfig.INLINE_COMPACT.key, compactInline)
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key, "false")
         .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key, "3")
         .option(HoodieTableConfig.POPULATE_META_FIELDS.key, populateMetaFields.toString)
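
For reference, a hedged usage sketch of the user-facing effect (the table path, instant times, and the `spark` session below are placeholders, not part of this patch): with the record-level filter in place, an incremental MOR query only returns rows whose `_hoodie_commit_time` lies within the requested range, regardless of Parquet filter pushdown.

    import org.apache.hudi.DataSourceReadOptions

    // Assumes an active SparkSession `spark` and an existing MOR table at the given path.
    val incrDf = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "20230701000000000")
      .option(DataSourceReadOptions.END_INSTANTTIME.key, "20230703000000000")
      .load("/tmp/hudi/mor_table")

    // Every returned commit time now falls inside the requested incremental window.
    incrDf.select("_hoodie_commit_time").distinct().show()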
