This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 79ec7b4  [HUDI-920] Support Incremental query for MOR table (#1938)
79ec7b4 is described below

commit 79ec7b4894b997183a6e10fdc19d34f5ab4ea437
Author: Gary Li <yanjia.gary...@gmail.com>
AuthorDate: Sun Jan 10 00:02:08 2021 +0800

    [HUDI-920] Support Incremental query for MOR table (#1938)
---
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |  41 ++++
 .../main/scala/org/apache/hudi/DefaultSource.scala |   7 +-
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     |  60 +++++-
 .../org/apache/hudi/IncrementalRelation.scala      |  11 +-
 .../hudi/MergeOnReadIncrementalRelation.scala      | 218 +++++++++++++++++++++
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |   6 +-
 .../apache/hudi/functional/TestMORDataSource.scala | 165 +++++++++++++---
 7 files changed, 463 insertions(+), 45 deletions(-)
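
As context for reviewers, below is a minimal sketch (not part of the commit) of how a
merge-on-read table can now be queried incrementally through the Spark datasource. It
mirrors the options exercised in the TestMORDataSource changes further down; the table
path and instant-time values are placeholders.

    // Sketch only: incremental read of a MOR table via the Hudi Spark datasource.
    // "/tmp/hudi_mor_table" and the instant times are placeholder values.
    import org.apache.hudi.DataSourceReadOptions
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("hudi-mor-incremental").getOrCreate()

    val incDF = spark.read.format("org.apache.hudi")
      // incremental query type; for MOR tables this now resolves to MergeOnReadIncrementalRelation
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      // required: pull commits after this instant
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
      // optional upper bound; defaults to the latest completed instant
      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, "20210110000208")
      .load("/tmp/hudi_mor_table")

    incDF.select("_hoodie_commit_time", "_hoodie_record_key").show(10)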

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index cf7da54..019b558 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -470,4 +471,44 @@ public class HoodieInputFormatUtils {
     }
   }
 
+  /**
+   * Iterate through a list of commits in ascending order, and extract the file status of
+   * all affected files from the commits' metadata, grouped by partition path. If a file has
+   * been touched multiple times in the given commits, the return value keeps the entry
+   * from the latest commit.
+   * @param basePath
+   * @param commitsToCheck
+   * @param timeline
+   * @return HashMap<partitionPath, HashMap<fileName, FileStatus>>
+   * @throws IOException
+   */
+  public static HashMap<String, HashMap<String, FileStatus>> listAffectedFilesForCommits(
+      Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline timeline) throws IOException {
+    // TODO: Use HoodieMetaTable to extract affected file directly.
+    HashMap<String, HashMap<String, FileStatus>> partitionToFileStatusesMap = new HashMap<>();
+    List<HoodieInstant> sortedCommitsToCheck = new ArrayList<>(commitsToCheck);
+    sortedCommitsToCheck.sort(HoodieInstant::compareTo);
+    // Iterate through the given commits.
+    for (HoodieInstant commit: sortedCommitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      // Iterate through all the affected partitions of a commit.
+      for (Map.Entry<String, List<HoodieWriteStat>> entry: commitMetadata.getPartitionToWriteStats().entrySet()) {
+        if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
+          partitionToFileStatusesMap.put(entry.getKey(), new HashMap<>());
+        }
+        // Iterate through all the written files of this partition.
+        for (HoodieWriteStat stat : entry.getValue()) {
+          String relativeFilePath = stat.getPath();
+          Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+          if (fullPath != null) {
+            FileStatus fs = new FileStatus(stat.getFileSizeInBytes(), false, 0, 0,
+                0, fullPath);
+            partitionToFileStatusesMap.get(entry.getKey()).put(fullPath.getName(), fs);
+          }
+        }
+      }
+    }
+    return partitionToFileStatusesMap;
+  }
 }
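
As a side note, here is a hedged sketch (not part of the commit) of how the new
listAffectedFilesForCommits helper is consumed; it mirrors the call made in
MergeOnReadIncrementalRelation.buildFileIndex later in this patch. The metaClient,
commitsToReturn and commitsTimelineToReturn values are assumed to already be in scope,
as they are in that relation.

    // Sketch only: collect the FileStatus of every file touched by the given commits,
    // grouped by partition path (the entry from the latest commit wins per file).
    import org.apache.hadoop.fs.{FileStatus, Path}
    import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits
    import scala.collection.JavaConversions._
    import scala.collection.mutable.ListBuffer

    val partitionsWithFileStatus = listAffectedFilesForCommits(
      new Path(metaClient.getBasePath), commitsToReturn, commitsTimelineToReturn)

    // Flatten to one list of FileStatus, as buildFileIndex does before constructing the file system view.
    val affectedFileStatus = new ListBuffer[FileStatus]
    partitionsWithFileStatus.iterator.foreach(p =>
      p._2.iterator.foreach(status => affectedFileStatus += status._2))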
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 0e322e2..d26390d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -94,7 +94,12 @@ class DefaultSource extends RelationProvider
    } else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
      getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
    } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
-      new IncrementalRelation(sqlContext, tablePath, optParams, schema)
+      val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
+      if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
+        new MergeOnReadIncrementalRelation(sqlContext, optParams, schema, metaClient)
+      } else {
+        new IncrementalRelation(sqlContext, optParams, schema, metaClient)
+      }
     } else {
      throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index e8caa63..e20c33c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -50,30 +50,32 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
   private val confBroadcast = sc.broadcast(new SerializableWritable(config))
 
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
-    val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
-    mergeParquetPartition.split match {
+    val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
+    mergeOnReadPartition.split match {
       case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
-        read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader)
+        read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
+      case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
+        logFileIterator(logFileOnlySplit, getConfig)
       case skipMergeSplit if skipMergeSplit.mergeType
         .equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
         skipMergeFileIterator(
           skipMergeSplit,
-          read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader),
+          read(skipMergeSplit.dataFile.get, requiredSchemaFileReader),
           getConfig
         )
       case payloadCombineSplit if payloadCombineSplit.mergeType
         .equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
         payloadCombineFileIterator(
           payloadCombineSplit,
-          read(mergeParquetPartition.split.dataFile, fullSchemaFileReader),
+          read(payloadCombineSplit.dataFile.get, fullSchemaFileReader),
           getConfig
         )
      case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " +
-        s"file path: ${mergeParquetPartition.split.dataFile.filePath}" +
-        s"log paths: ${mergeParquetPartition.split.logPaths.toString}" +
-        s"hoodie table path: ${mergeParquetPartition.split.tablePath}" +
-        s"spark partition Index: ${mergeParquetPartition.index}" +
-        s"merge type: ${mergeParquetPartition.split.mergeType}")
+        s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" +
+        s"log paths: ${mergeOnReadPartition.split.logPaths.toString}" +
+        s"hoodie table path: ${mergeOnReadPartition.split.tablePath}" +
+        s"spark partition Index: ${mergeOnReadPartition.index}" +
+        s"merge type: ${mergeOnReadPartition.split.mergeType}")
     }
   }
 
@@ -101,6 +103,44 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
     rows
   }
 
+  private def logFileIterator(split: HoodieMergeOnReadFileSplit,
+                             config: Configuration): Iterator[InternalRow] =
+    new Iterator[InternalRow] {
+      private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
+      private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
+      private val requiredFieldPosition =
+        tableState.requiredStructSchema
+          .map(f => tableAvroSchema.getField(f.name).pos()).toList
+      private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
+      private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
+      private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
+      private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
+      private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
+
+      private var recordToLoad: InternalRow = _
+      override def hasNext: Boolean = {
+        if (logRecordsKeyIterator.hasNext) {
+          val curAvrokey = logRecordsKeyIterator.next()
+          val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema)
+          if (!curAvroRecord.isPresent) {
+            // delete record found, skipping
+            this.hasNext
+          } else {
+            val requiredAvroRecord = AvroConversionUtils
+              .buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder)
+            recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
+            true
+          }
+        } else {
+          false
+        }
+      }
+
+      override def next(): InternalRow = {
+        recordToLoad
+      }
+    }
+
   private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
                                   baseFileIterator: Iterator[InternalRow],
                                    config: Configuration): Iterator[InternalRow] =
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 9cd562c..5c20656 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -42,19 +42,14 @@ import scala.collection.mutable
   *
   */
 class IncrementalRelation(val sqlContext: SQLContext,
-                          val basePath: String,
                           val optParams: Map[String, String],
-                          val userSchema: StructType) extends BaseRelation with TableScan {
+                          val userSchema: StructType,
+                          val metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan {
 
   private val log = LogManager.getLogger(classOf[IncrementalRelation])
 
   val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema
-  private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
-
-  // MOR tables not supported yet
-  if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
-    throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")
-  }
+  private val basePath = metaClient.getBasePath
   // TODO : Figure out a valid HoodieWriteConfig
  private val hoodieTable = HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath).build(),
    new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)),
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
new file mode 100644
index 0000000..d7b8cff
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobPattern, Path}
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+import org.apache.log4j.LogManager
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SQLContext}
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Experimental.
+  * Relation that implements the Hoodie incremental view for Merge On Read tables.
+  *
+  */
+class MergeOnReadIncrementalRelation(val sqlContext: SQLContext,
+                                     val optParams: Map[String, String],
+                                     val userSchema: StructType,
+                                     val metaClient: HoodieTableMetaClient)
+  extends BaseRelation with PrunedFilteredScan {
+
+  private val log = LogManager.getLogger(classOf[MergeOnReadIncrementalRelation])
+  private val conf = sqlContext.sparkContext.hadoopConfiguration
+  private val jobConf = new JobConf(conf)
+  private val fs = FSUtils.getFs(metaClient.getBasePath, conf)
+  private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
+  if (commitTimeline.empty()) {
+    throw new HoodieException("No instants to incrementally pull")
+  }
+  if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY)) {
+    throw new HoodieException(s"Specify the begin instant time to pull from using " +
+      s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}")
+  }
+
+  private val lastInstant = commitTimeline.lastInstant().get()
+  private val mergeType = optParams.getOrElse(
+    DataSourceReadOptions.REALTIME_MERGE_OPT_KEY,
+    DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL)
+
+  private val commitsTimelineToReturn = commitTimeline.findInstantsInRange(
+    optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY),
+    optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp))
+  log.debug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}")
+  private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList
+  private val schemaUtil = new TableSchemaResolver(metaClient)
+  private val tableAvroSchema = schemaUtil.getTableAvroSchema
+  private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+  private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
+  private val fileIndex = buildFileIndex()
+
+  override def schema: StructType = tableStructSchema
+
+  override def needConversion: Boolean = false
+
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
+    val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+    val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
+    val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
+    filters :+isNotNullFilter :+ largerThanFilter :+ lessThanFilter
+  }
+
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+    log.debug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}")
+    log.debug(s"buildScan filters = ${filters.mkString(",")}")
+    // config to ensure the push down filter for parquet will be applied.
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
+    sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
+    val pushDownFilter = {
+      val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+      val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp)
+      val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp)
+      filters :+isNotNullFilter :+ largerThanFilter :+ lessThanFilter
+    }
+    var requiredStructSchema = StructType(Seq())
+    requiredColumns.foreach(col => {
+      val field = tableStructSchema.find(_.name == col)
+      if (field.isDefined) {
+        requiredStructSchema = requiredStructSchema.add(field.get)
+      }
+    })
+    val requiredAvroSchema = AvroConversionUtils
+      .convertStructTypeToAvroSchema(requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace)
+    val hoodieTableState = HoodieMergeOnReadTableState(
+      tableStructSchema,
+      requiredStructSchema,
+      tableAvroSchema.toString,
+      requiredAvroSchema.toString,
+      fileIndex
+    )
+    val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
+      sparkSession = sqlContext.sparkSession,
+      dataSchema = tableStructSchema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = tableStructSchema,
+      filters = pushDownFilter,
+      options = optParams,
+      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+    )
+    val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
+      sparkSession = sqlContext.sparkSession,
+      dataSchema = tableStructSchema,
+      partitionSchema = StructType(Nil),
+      requiredSchema = requiredStructSchema,
+      filters = pushDownFilter,
+      options = optParams,
+      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+    )
+
+    // Follow the implementation of Spark internal HadoopRDD to handle the broadcast configuration.
+    FileSystem.getLocal(jobConf)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val rdd = new HoodieMergeOnReadRDD(
+      sqlContext.sparkContext,
+      jobConf,
+      fullSchemaParquetReader,
+      requiredSchemaParquetReader,
+      hoodieTableState
+    )
+    rdd.asInstanceOf[RDD[Row]]
+  }
+
+  def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
+    val partitionsWithFileStatus = listAffectedFilesForCommits(new Path(metaClient.getBasePath),
+      commitsToReturn, commitsTimelineToReturn)
+    val affectedFileStatus = new ListBuffer[FileStatus]
+    partitionsWithFileStatus.iterator.foreach(p =>
+      p._2.iterator.foreach(status => affectedFileStatus += status._2))
+    val fsView = new HoodieTableFileSystemView(metaClient,
+      commitsTimelineToReturn, affectedFileStatus.toArray)
+
+    // Iterate partitions to create splits
+    val fileGroup = partitionsWithFileStatus.keySet().flatMap(partitionPath =>
+      fsView.getAllFileGroups(partitionPath).iterator()
+    ).toList
+    val latestCommit = fsView.getLastInstant.get().getTimestamp
+    if (log.isDebugEnabled) {
+      fileGroup.foreach(f => log.debug(s"current file group id: " +
+        s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get().toString}"))
+    }
+
+    // Filter files based on user defined glob pattern
+    val pathGlobPattern = optParams.getOrElse(
+      DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY,
+      DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)
+    val filteredFileGroup = if(!pathGlobPattern
+      .equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) {
+      val globMatcher = new GlobPattern("*" + pathGlobPattern)
+      fileGroup.filter(f => {
+        if (f.getLatestFileSlice.get().getBaseFile.isPresent) {
+          globMatcher.matches(f.getLatestFileSlice.get().getBaseFile.get.getPath)
+        } else {
+          globMatcher.matches(f.getLatestFileSlice.get().getLatestLogFile.get().getPath.toString)
+        }
+      })
+    } else {
+      fileGroup
+    }
+
+    // Build HoodieMergeOnReadFileSplit.
+    filteredFileGroup.map(f => {
+      // Ensure we get the base file when there is a pending compaction, which means the base file
+      // won't be in the latest file slice.
+      val baseFiles = f.getAllFileSlices.iterator().filter(slice => slice.getBaseFile.isPresent).toList
+      val partitionedFile = if (baseFiles.nonEmpty) {
+        val baseFile = baseFiles.head.getBaseFile
+        Option(PartitionedFile(InternalRow.empty, baseFile.get.getPath, 0, baseFile.get.getFileLen))
+      }
+      else {
+        Option.empty
+      }
+
+      val logPath = if (f.getLatestFileSlice.isPresent) {
+        // If the log path doesn't exist, we still include an empty path to avoid using
+        // the default parquet reader and to ensure the push down filter will be applied.
+        Option(f.getLatestFileSlice.get().getLogFiles.iterator().toList
+          .map(logfile => logfile.getPath.toString))
+      }
+      else {
+        Option.empty
+      }
+
+      HoodieMergeOnReadFileSplit(partitionedFile, logPath,
+        latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
+    })
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 0b81fa7..328e3c3 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.types.StructType
 
 import scala.collection.JavaConverters._
 
-case class HoodieMergeOnReadFileSplit(dataFile: PartitionedFile,
+case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile],
                                       logPaths: Option[List[String]],
                                       latestCommit: String,
                                       tablePath: String,
@@ -99,7 +99,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
       dataSchema = tableStructSchema,
       partitionSchema = StructType(Nil),
       requiredSchema = tableStructSchema,
-      filters = Seq(),
+      filters = filters,
       options = optParams,
       hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
     )
@@ -140,7 +140,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
       val baseFile = kv._1
      val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList)
      val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen)
-      HoodieMergeOnReadFileSplit(partitionedFile, logPaths, latestCommit,
+      HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit,
         metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
     }).toList
     fileSplits
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 7c73669..121957e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -19,7 +19,7 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
-import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.testutils.HoodieClientTestBase
@@ -29,6 +29,7 @@ import org.apache.spark.sql.functions._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 
+
 import scala.collection.JavaConversions._
 
 /**
@@ -157,6 +158,39 @@ class TestMORDataSource extends HoodieClientTestBase {
     assertTrue(commit2Time > commit1Time)
     assertEquals(100, hudiSnapshotDF2.join(hudiSnapshotDF1, Seq("_hoodie_record_key"), "left").count())
 
+    // incremental view
+    // base file only
+    val hudiIncDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commit1Time)
+      .load(basePath)
+    assertEquals(100, hudiIncDF1.count())
+    assertEquals(1, hudiIncDF1.select("_hoodie_commit_time").distinct().count())
+    assertEquals(commit1Time, hudiIncDF1.select("_hoodie_commit_time").head().get(0).toString)
+    hudiIncDF1.show(1)
+    // log file only
+    val hudiIncDF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit1Time)
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commit2Time)
+      .load(basePath)
+    assertEquals(100, hudiIncDF2.count())
+    assertEquals(1, hudiIncDF2.select("_hoodie_commit_time").distinct().count())
+    assertEquals(commit2Time, hudiIncDF2.select("_hoodie_commit_time").head().get(0).toString)
+    hudiIncDF2.show(1)
+
+    // base file + log file
+    val hudiIncDF3 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commit2Time)
+      .load(basePath)
+    assertEquals(100, hudiIncDF3.count())
+    // log file is being loaded
+    assertEquals(1, hudiIncDF3.select("_hoodie_commit_time").distinct().count())
+    assertEquals(commit2Time, hudiIncDF3.select("_hoodie_commit_time").head().get(0).toString)
+
     // Unmerge
     val hudiSnapshotSkipMergeDF2 = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -193,6 +227,22 @@ class TestMORDataSource extends HoodieClientTestBase {
     assertEquals(50,
       hudiSnapshotDF3.join(hudiSnapshotDF2, Seq("_hoodie_record_key", "_hoodie_commit_time"), "inner").count())
 
+    // incremental query from commit2Time
+    val hudiIncDF4 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit2Time)
+      .load(basePath)
+    assertEquals(50, hudiIncDF4.count())
+
+    // skip merge incremental view
+    // including commit 2 and commit 3
+    val hudiIncDF4SkipMerge = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+      .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
+      .load(basePath)
+    assertEquals(200, hudiIncDF4SkipMerge.count())
+
     // Fourth Operation:
     // Insert records to a new partition. Produced a new parquet file.
     // SNAPSHOT view should read the latest log files from the default partition and parquet from the new partition.
@@ -213,21 +263,51 @@ class TestMORDataSource extends HoodieClientTestBase {
     assertEquals(100,
       hudiSnapshotDF1.join(hudiSnapshotDF4, Seq("_hoodie_record_key"), "inner").count())
 
+    // Incremental query, 50 from log file, 100 from base file of the new partition.
+    val hudiIncDF5 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit2Time)
+      .load(basePath)
+    assertEquals(150, hudiIncDF5.count())
+
     // Fifth Operation:
     // Upsert records to the new partition. Produced a newer version of parquet file.
     // SNAPSHOT view should read the latest log files from the default partition
     // and the latest parquet from the new partition.
-    val records5 = recordsToStrings(newDataGen.generateUpdates("005", 100)).toList
+    val records5 = recordsToStrings(newDataGen.generateUniqueUpdates("005", 50)).toList
     val inputDF5: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records5, 2))
     inputDF5.write.format("org.apache.hudi")
       .options(commonOpts)
-      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option("hoodie.compact.inline", "true")
       .mode(SaveMode.Append)
       .save(basePath)
+    val commit5Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)
     val hudiSnapshotDF5 = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
       .load(basePath + "/*/*/*/*")
     assertEquals(200, hudiSnapshotDF5.count())
+
+    // Sixth Operation:
+    // Insert 2 records and trigger compaction.
+    val records6 = recordsToStrings(newDataGen.generateInserts("006", 2)).toList
+    val inputDF6: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records6, 2))
+    inputDF6.write.format("org.apache.hudi")
+      .options(commonOpts)
+      .option("hoodie.compact.inline", "true")
+      .mode(SaveMode.Append)
+      .save(basePath)
+    val commit6Time = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+    val hudiSnapshotDF6 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+      .load(basePath + "/2020/01/10/*")
+    assertEquals(102, hudiSnapshotDF6.count())
+    val hudiIncDF6 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit5Time)
+      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commit6Time)
+      .load(basePath)
+    // compaction updated 150 rows + inserted 2 new rows
+    assertEquals(152, hudiIncDF6.count())
   }
 
   @Test
@@ -276,6 +356,13 @@ class TestMORDataSource extends HoodieClientTestBase {
       .load(basePath + "/*/*/*/*")
     assertEquals(100, hudiSnapshotDF2Unmerge.count())
 
+    // incremental query, read 50 delete records from log file and get 0 count.
+    val hudiIncDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit2Time)
+      .load(basePath)
+    assertEquals(0, hudiIncDF1.count())
+
     // Third Operation:
     // Upsert 50 delete records to delete the rest
     // Snapshot view should read 0 records
@@ -308,6 +395,8 @@ class TestMORDataSource extends HoodieClientTestBase {
     val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
       .load(basePath + "/*/*/*/*")
+    val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString
+
     assertEquals(100, hudiSnapshotDF1.count())
     // select nested columns with order different from the actual schema
     assertEquals("amount,currency,tip_history,_hoodie_commit_seqno",
@@ -329,34 +418,43 @@ class TestMORDataSource extends HoodieClientTestBase {
     val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
       .load(basePath + "/*/*/*/*")
-
-    val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString
+    val hudiIncDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+      .load(basePath)
+    val hudiIncDF1Skipmerge = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+      .load(basePath)
+    val hudiIncDF2 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit1Time)
+      .load(basePath)
 
     // filter first commit and only read log records
     assertEquals(50,  hudiSnapshotDF2.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history")
       .filter(col("_hoodie_commit_time") > commit1Time).count())
+    assertEquals(50,  hudiIncDF1.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history")
+      .filter(col("_hoodie_commit_time") > commit1Time).count())
+    assertEquals(50,  hudiIncDF2
+      .select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history").count())
+    assertEquals(150,  hudiIncDF1Skipmerge
+      .select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history").count())
 
     // select nested columns with order different from the actual schema
-    assertEquals("amount,currency,tip_history,_hoodie_commit_seqno",
-      hudiSnapshotDF2
-      .select("fare.amount", "fare.currency", "tip_history", "_hoodie_commit_seqno")
-      .orderBy(desc("_hoodie_commit_seqno"))
-      .columns.mkString(","))
-
-    // Correctly loading type
-    val sampleRow = hudiSnapshotDF2
-      .select("begin_lat", "current_date", "fare.currency", "tip_history", "nation")
-      .orderBy(desc("_hoodie_commit_time"))
-      .head()
-    assertEquals(sampleRow.getDouble(0), sampleRow.get(0))
-    assertEquals(sampleRow.getLong(1), sampleRow.get(1))
-    assertEquals(sampleRow.getString(2), sampleRow.get(2))
-    assertEquals(sampleRow.getSeq(3), sampleRow.get(3))
-    assertEquals(sampleRow.getStruct(4), sampleRow.get(4))
+    verifySchemaAndTypes(hudiSnapshotDF1)
+    verifySchemaAndTypes(hudiSnapshotDF2)
+    verifySchemaAndTypes(hudiIncDF1)
+    verifySchemaAndTypes(hudiIncDF2)
+    verifySchemaAndTypes(hudiIncDF1Skipmerge)
 
     // make sure show() work
-    hudiSnapshotDF1.show(1)
-    hudiSnapshotDF2.show(1)
+    verifyShow(hudiSnapshotDF1)
+    verifyShow(hudiSnapshotDF2)
+    verifyShow(hudiIncDF1)
+    verifyShow(hudiIncDF2)
+    verifyShow(hudiIncDF1Skipmerge)
   }
 
   @Test
@@ -404,4 +502,25 @@ class TestMORDataSource extends HoodieClientTestBase {
     hudiSnapshotDF1.show(1)
     hudiSnapshotDF2.show(1)
   }
+
+  def verifySchemaAndTypes(df: DataFrame): Unit = {
+    assertEquals("amount,currency,tip_history,_hoodie_commit_seqno",
+      df.select("fare.amount", "fare.currency", "tip_history", "_hoodie_commit_seqno")
+        .orderBy(desc("_hoodie_commit_seqno"))
+        .columns.mkString(","))
+    val sampleRow = df
+      .select("begin_lat", "current_date", "fare.currency", "tip_history", "nation")
+      .orderBy(desc("_hoodie_commit_time"))
+      .head()
+    assertEquals(sampleRow.getDouble(0), sampleRow.get(0))
+    assertEquals(sampleRow.getLong(1), sampleRow.get(1))
+    assertEquals(sampleRow.getString(2), sampleRow.get(2))
+    assertEquals(sampleRow.getSeq(3), sampleRow.get(3))
+    assertEquals(sampleRow.getStruct(4), sampleRow.get(4))
+  }
+
+  def verifyShow(df: DataFrame): Unit = {
+    df.show(1)
+    df.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history").show(1)
+  }
 }
