This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 79ec7b4  [HUDI-920] Support Incremental query for MOR table (#1938)
79ec7b4 is described below

commit 79ec7b4894b997183a6e10fdc19d34f5ab4ea437
Author: Gary Li <yanjia.gary...@gmail.com>
AuthorDate: Sun Jan 10 00:02:08 2021 +0800

    [HUDI-920] Support Incremental query for MOR table (#1938)
---
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |  41 ++++
 .../main/scala/org/apache/hudi/DefaultSource.scala |   7 +-
 .../org/apache/hudi/HoodieMergeOnReadRDD.scala     |  60 +++++-
 .../org/apache/hudi/IncrementalRelation.scala      |  11 +-
 .../hudi/MergeOnReadIncrementalRelation.scala      | 218 +++++++++++++++++++++
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |   6 +-
 .../apache/hudi/functional/TestMORDataSource.scala | 165 +++++++++++++---
 7 files changed, 463 insertions(+), 45 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index cf7da54..019b558 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -470,4 +471,44 @@ public class HoodieInputFormatUtils {
     }
   }

+  /**
+   * Iterate through a list of commits in ascending order, and extract the file status of
+   * all affected files from the commit metadata, grouping by partition path. If a file has
+   * been touched multiple times in the given commits, the return value keeps the entry
+   * from the latest commit.
+   * @param basePath
+   * @param commitsToCheck
+   * @param timeline
+   * @return HashMap<partitionPath, HashMap<fileName, FileStatus>>
+   * @throws IOException
+   */
+  public static HashMap<String, HashMap<String, FileStatus>> listAffectedFilesForCommits(
+      Path basePath, List<HoodieInstant> commitsToCheck, HoodieTimeline timeline) throws IOException {
+    // TODO: Use HoodieMetaTable to extract affected file directly.
+    HashMap<String, HashMap<String, FileStatus>> partitionToFileStatusesMap = new HashMap<>();
+    List<HoodieInstant> sortedCommitsToCheck = new ArrayList<>(commitsToCheck);
+    sortedCommitsToCheck.sort(HoodieInstant::compareTo);
+    // Iterate through the given commits.
+    for (HoodieInstant commit: sortedCommitsToCheck) {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get(),
+          HoodieCommitMetadata.class);
+      // Iterate through all the affected partitions of a commit.
+      for (Map.Entry<String, List<HoodieWriteStat>> entry: commitMetadata.getPartitionToWriteStats().entrySet()) {
+        if (!partitionToFileStatusesMap.containsKey(entry.getKey())) {
+          partitionToFileStatusesMap.put(entry.getKey(), new HashMap<>());
+        }
+        // Iterate through all the written files of this partition.
+        for (HoodieWriteStat stat : entry.getValue()) {
+          String relativeFilePath = stat.getPath();
+          Path fullPath = relativeFilePath != null ?
+              FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+          if (fullPath != null) {
+            FileStatus fs = new FileStatus(stat.getFileSizeInBytes(), false, 0, 0,
+                0, fullPath);
+            partitionToFileStatusesMap.get(entry.getKey()).put(fullPath.getName(), fs);
+          }
+        }
+      }
+    }
+    return partitionToFileStatusesMap;
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 0e322e2..d26390d 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -94,7 +94,12 @@ class DefaultSource extends RelationProvider
     } else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
       getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, isBootstrappedTable, globPaths, metaClient)
     } else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
-      new IncrementalRelation(sqlContext, tablePath, optParams, schema)
+      val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
+      if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
+        new MergeOnReadIncrementalRelation(sqlContext, optParams, schema, metaClient)
+      } else {
+        new IncrementalRelation(sqlContext, optParams, schema, metaClient)
+      }
     } else {
       throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index e8caa63..e20c33c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -50,30 +50,32 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
   private val confBroadcast = sc.broadcast(new SerializableWritable(config))

   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
-    val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
-    mergeParquetPartition.split match {
+    val mergeOnReadPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
+    mergeOnReadPartition.split match {
       case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
-        read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader)
+        read(dataFileOnlySplit.dataFile.get, requiredSchemaFileReader)
+      case logFileOnlySplit if logFileOnlySplit.dataFile.isEmpty =>
+        logFileIterator(logFileOnlySplit, getConfig)
       case skipMergeSplit if skipMergeSplit.mergeType
         .equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
         skipMergeFileIterator(
           skipMergeSplit,
-          read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader),
+          read(skipMergeSplit.dataFile.get, requiredSchemaFileReader),
           getConfig
         )
       case payloadCombineSplit if payloadCombineSplit.mergeType
         .equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
         payloadCombineFileIterator(
           payloadCombineSplit,
-          read(mergeParquetPartition.split.dataFile, fullSchemaFileReader),
+          read(payloadCombineSplit.dataFile.get, fullSchemaFileReader),
           getConfig
         )
       case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " +
-        s"file path: ${mergeParquetPartition.split.dataFile.filePath}" +
-        s"log paths:
${mergeParquetPartition.split.logPaths.toString}" + - s"hoodie table path: ${mergeParquetPartition.split.tablePath}" + - s"spark partition Index: ${mergeParquetPartition.index}" + - s"merge type: ${mergeParquetPartition.split.mergeType}") + s"file path: ${mergeOnReadPartition.split.dataFile.get.filePath}" + + s"log paths: ${mergeOnReadPartition.split.logPaths.toString}" + + s"hoodie table path: ${mergeOnReadPartition.split.tablePath}" + + s"spark partition Index: ${mergeOnReadPartition.index}" + + s"merge type: ${mergeOnReadPartition.split.mergeType}") } } @@ -101,6 +103,44 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext, rows } + private def logFileIterator(split: HoodieMergeOnReadFileSplit, + config: Configuration): Iterator[InternalRow] = + new Iterator[InternalRow] { + private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema) + private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema) + private val requiredFieldPosition = + tableState.requiredStructSchema + .map(f => tableAvroSchema.getField(f.name).pos()).toList + private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema) + private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema) + private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema) + private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords + private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala + + private var recordToLoad: InternalRow = _ + override def hasNext: Boolean = { + if (logRecordsKeyIterator.hasNext) { + val curAvrokey = logRecordsKeyIterator.next() + val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema) + if (!curAvroRecord.isPresent) { + // delete record found, skipping + this.hasNext + } else { + val requiredAvroRecord = AvroConversionUtils + .buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder) + recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow]) + true + } + } else { + false + } + } + + override def next(): InternalRow = { + recordToLoad + } + } + private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit, baseFileIterator: Iterator[InternalRow], config: Configuration): Iterator[InternalRow] = diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 9cd562c..5c20656 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -42,19 +42,14 @@ import scala.collection.mutable * */ class IncrementalRelation(val sqlContext: SQLContext, - val basePath: String, val optParams: Map[String, String], - val userSchema: StructType) extends BaseRelation with TableScan { + val userSchema: StructType, + val metaClient: HoodieTableMetaClient) extends BaseRelation with TableScan { private val log = LogManager.getLogger(classOf[IncrementalRelation]) val skeletonSchema: StructType = HoodieSparkUtils.getMetaSchema - private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true) - - // MOR tables not supported yet - if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) { - throw new 
HoodieException("Incremental view not implemented yet, for merge-on-read tables") - } + private val basePath = metaClient.getBasePath // TODO : Figure out a valid HoodieWriteConfig private val hoodieTable = HoodieSparkTable.create(HoodieWriteConfig.newBuilder().withPath(basePath).build(), new HoodieSparkEngineContext(new JavaSparkContext(sqlContext.sparkContext)), diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala new file mode 100644 index 0000000..d7b8cff --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobPattern, Path} +import org.apache.hadoop.mapred.JobConf +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.exception.HoodieException +import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.listAffectedFilesForCommits +import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes +import org.apache.log4j.LogManager +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{Row, SQLContext} + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ListBuffer + +/** + * Experimental. + * Relation, that implements the Hoodie incremental view for Merge On Read table. 
+ * + */ +class MergeOnReadIncrementalRelation(val sqlContext: SQLContext, + val optParams: Map[String, String], + val userSchema: StructType, + val metaClient: HoodieTableMetaClient) + extends BaseRelation with PrunedFilteredScan { + + private val log = LogManager.getLogger(classOf[MergeOnReadIncrementalRelation]) + private val conf = sqlContext.sparkContext.hadoopConfiguration + private val jobConf = new JobConf(conf) + private val fs = FSUtils.getFs(metaClient.getBasePath, conf) + private val commitTimeline = metaClient.getCommitsAndCompactionTimeline.filterCompletedInstants() + if (commitTimeline.empty()) { + throw new HoodieException("No instants to incrementally pull") + } + if (!optParams.contains(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY)) { + throw new HoodieException(s"Specify the begin instant time to pull from using " + + s"option ${DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY}") + } + + private val lastInstant = commitTimeline.lastInstant().get() + private val mergeType = optParams.getOrElse( + DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, + DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL) + + private val commitsTimelineToReturn = commitTimeline.findInstantsInRange( + optParams(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY), + optParams.getOrElse(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, lastInstant.getTimestamp)) + log.debug(s"${commitsTimelineToReturn.getInstants.iterator().toList.map(f => f.toString).mkString(",")}") + private val commitsToReturn = commitsTimelineToReturn.getInstants.iterator().toList + private val schemaUtil = new TableSchemaResolver(metaClient) + private val tableAvroSchema = schemaUtil.getTableAvroSchema + private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema) + private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf) + private val fileIndex = buildFileIndex() + + override def schema: StructType = tableStructSchema + + override def needConversion: Boolean = false + + override def unhandledFilters(filters: Array[Filter]): Array[Filter] = { + val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD) + val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp) + val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp) + filters :+isNotNullFilter :+ largerThanFilter :+ lessThanFilter + } + + override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = { + log.debug(s"buildScan requiredColumns = ${requiredColumns.mkString(",")}") + log.debug(s"buildScan filters = ${filters.mkString(",")}") + // config to ensure the push down filter for parquet will be applied. 
+ sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true") + sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true") + sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false") + val pushDownFilter = { + val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD) + val largerThanFilter = GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.head.getTimestamp) + val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, commitsToReturn.last.getTimestamp) + filters :+isNotNullFilter :+ largerThanFilter :+ lessThanFilter + } + var requiredStructSchema = StructType(Seq()) + requiredColumns.foreach(col => { + val field = tableStructSchema.find(_.name == col) + if (field.isDefined) { + requiredStructSchema = requiredStructSchema.add(field.get) + } + }) + val requiredAvroSchema = AvroConversionUtils + .convertStructTypeToAvroSchema(requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace) + val hoodieTableState = HoodieMergeOnReadTableState( + tableStructSchema, + requiredStructSchema, + tableAvroSchema.toString, + requiredAvroSchema.toString, + fileIndex + ) + val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues( + sparkSession = sqlContext.sparkSession, + dataSchema = tableStructSchema, + partitionSchema = StructType(Nil), + requiredSchema = tableStructSchema, + filters = pushDownFilter, + options = optParams, + hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() + ) + val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues( + sparkSession = sqlContext.sparkSession, + dataSchema = tableStructSchema, + partitionSchema = StructType(Nil), + requiredSchema = requiredStructSchema, + filters = pushDownFilter, + options = optParams, + hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() + ) + + // Follow the implementation of Spark internal HadoopRDD to handle the broadcast configuration. 
+ FileSystem.getLocal(jobConf) + SparkHadoopUtil.get.addCredentials(jobConf) + val rdd = new HoodieMergeOnReadRDD( + sqlContext.sparkContext, + jobConf, + fullSchemaParquetReader, + requiredSchemaParquetReader, + hoodieTableState + ) + rdd.asInstanceOf[RDD[Row]] + } + + def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = { + val partitionsWithFileStatus = listAffectedFilesForCommits(new Path(metaClient.getBasePath), + commitsToReturn, commitsTimelineToReturn) + val affectedFileStatus = new ListBuffer[FileStatus] + partitionsWithFileStatus.iterator.foreach(p => + p._2.iterator.foreach(status => affectedFileStatus += status._2)) + val fsView = new HoodieTableFileSystemView(metaClient, + commitsTimelineToReturn, affectedFileStatus.toArray) + + // Iterate partitions to create splits + val fileGroup = partitionsWithFileStatus.keySet().flatMap(partitionPath => + fsView.getAllFileGroups(partitionPath).iterator() + ).toList + val latestCommit = fsView.getLastInstant.get().getTimestamp + if (log.isDebugEnabled) { + fileGroup.foreach(f => log.debug(s"current file group id: " + + s"${f.getFileGroupId} and file slices ${f.getLatestFileSlice.get().toString}")) + } + + // Filter files based on user defined glob pattern + val pathGlobPattern = optParams.getOrElse( + DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, + DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL) + val filteredFileGroup = if(!pathGlobPattern + .equals(DataSourceReadOptions.DEFAULT_INCR_PATH_GLOB_OPT_VAL)) { + val globMatcher = new GlobPattern("*" + pathGlobPattern) + fileGroup.filter(f => { + if (f.getLatestFileSlice.get().getBaseFile.isPresent) { + globMatcher.matches(f.getLatestFileSlice.get().getBaseFile.get.getPath) + } else { + globMatcher.matches(f.getLatestFileSlice.get().getLatestLogFile.get().getPath.toString) + } + }) + } else { + fileGroup + } + + // Build HoodieMergeOnReadFileSplit. + filteredFileGroup.map(f => { + // Ensure get the base file when there is a pending compaction, which means the base file + // won't be in the latest file slice. + val baseFiles = f.getAllFileSlices.iterator().filter(slice => slice.getBaseFile.isPresent).toList + val partitionedFile = if (baseFiles.nonEmpty) { + val baseFile = baseFiles.head.getBaseFile + Option(PartitionedFile(InternalRow.empty, baseFile.get.getPath, 0, baseFile.get.getFileLen)) + } + else { + Option.empty + } + + val logPath = if (f.getLatestFileSlice.isPresent) { + //If log path doesn't exist, we still include an empty path to avoid using + // the default parquet reader to ensure the push down filter will be applied. 
+ Option(f.getLatestFileSlice.get().getLogFiles.iterator().toList + .map(logfile => logfile.getPath.toString)) + } + else { + Option.empty + } + + HoodieMergeOnReadFileSplit(partitionedFile, logPath, + latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) + }) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala index 0b81fa7..328e3c3 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.types.StructType import scala.collection.JavaConverters._ -case class HoodieMergeOnReadFileSplit(dataFile: PartitionedFile, +case class HoodieMergeOnReadFileSplit(dataFile: Option[PartitionedFile], logPaths: Option[List[String]], latestCommit: String, tablePath: String, @@ -99,7 +99,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, dataSchema = tableStructSchema, partitionSchema = StructType(Nil), requiredSchema = tableStructSchema, - filters = Seq(), + filters = filters, options = optParams, hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf() ) @@ -140,7 +140,7 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, val baseFile = kv._1 val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList) val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen) - HoodieMergeOnReadFileSplit(partitionedFile, logPaths, latestCommit, + HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit, metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) }).toList fileSplits diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala index 7c73669..121957e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala @@ -19,7 +19,7 @@ package org.apache.hudi.functional import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.testutils.HoodieTestDataGenerator -import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig} import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers} import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings import org.apache.hudi.testutils.HoodieClientTestBase @@ -29,6 +29,7 @@ import org.apache.spark.sql.functions._ import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} + import scala.collection.JavaConversions._ /** @@ -157,6 +158,39 @@ class TestMORDataSource extends HoodieClientTestBase { assertTrue(commit2Time > commit1Time) assertEquals(100, hudiSnapshotDF2.join(hudiSnapshotDF1, Seq("_hoodie_record_key"), "left").count()) + // incremental view + // base file only + val hudiIncDF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 
"000") + .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commit1Time) + .load(basePath) + assertEquals(100, hudiIncDF1.count()) + assertEquals(1, hudiIncDF1.select("_hoodie_commit_time").distinct().count()) + assertEquals(commit1Time, hudiIncDF1.select("_hoodie_commit_time").head().get(0).toString) + hudiIncDF1.show(1) + // log file only + val hudiIncDF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit1Time) + .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commit2Time) + .load(basePath) + assertEquals(100, hudiIncDF2.count()) + assertEquals(1, hudiIncDF2.select("_hoodie_commit_time").distinct().count()) + assertEquals(commit2Time, hudiIncDF2.select("_hoodie_commit_time").head().get(0).toString) + hudiIncDF2.show(1) + + // base file + log file + val hudiIncDF3 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") + .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commit2Time) + .load(basePath) + assertEquals(100, hudiIncDF3.count()) + // log file being load + assertEquals(1, hudiIncDF3.select("_hoodie_commit_time").distinct().count()) + assertEquals(commit2Time, hudiIncDF3.select("_hoodie_commit_time").head().get(0).toString) + // Unmerge val hudiSnapshotSkipMergeDF2 = spark.read.format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) @@ -193,6 +227,22 @@ class TestMORDataSource extends HoodieClientTestBase { assertEquals(50, hudiSnapshotDF3.join(hudiSnapshotDF2, Seq("_hoodie_record_key", "_hoodie_commit_time"), "inner").count()) + // incremental query from commit2Time + val hudiIncDF4 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit2Time) + .load(basePath) + assertEquals(50, hudiIncDF4.count()) + + // skip merge incremental view + // including commit 2 and commit 3 + val hudiIncDF4SkipMerge = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") + .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) + .load(basePath) + assertEquals(200, hudiIncDF4SkipMerge.count()) + // Fourth Operation: // Insert records to a new partition. Produced a new parquet file. // SNAPSHOT view should read the latest log files from the default partition and parquet from the new partition. @@ -213,21 +263,51 @@ class TestMORDataSource extends HoodieClientTestBase { assertEquals(100, hudiSnapshotDF1.join(hudiSnapshotDF4, Seq("_hoodie_record_key"), "inner").count()) + // Incremental query, 50 from log file, 100 from base file of the new partition. + val hudiIncDF5 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit2Time) + .load(basePath) + assertEquals(150, hudiIncDF5.count()) + // Fifth Operation: // Upsert records to the new partition. Produced a newer version of parquet file. 
// SNAPSHOT view should read the latest log files from the default partition // and the latest parquet from the new partition. - val records5 = recordsToStrings(newDataGen.generateUpdates("005", 100)).toList + val records5 = recordsToStrings(newDataGen.generateUniqueUpdates("005", 50)).toList val inputDF5: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records5, 2)) inputDF5.write.format("org.apache.hudi") .options(commonOpts) - .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) + .option("hoodie.compact.inline", "true") .mode(SaveMode.Append) .save(basePath) + val commit5Time = HoodieDataSourceHelpers.latestCommit(fs, basePath) val hudiSnapshotDF5 = spark.read.format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) .load(basePath + "/*/*/*/*") assertEquals(200, hudiSnapshotDF5.count()) + + // Sixth Operation: + // Insert 2 records and trigger compaction. + val records6 = recordsToStrings(newDataGen.generateInserts("006", 2)).toList + val inputDF6: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records6, 2)) + inputDF6.write.format("org.apache.hudi") + .options(commonOpts) + .option("hoodie.compact.inline", "true") + .mode(SaveMode.Append) + .save(basePath) + val commit6Time = HoodieDataSourceHelpers.latestCommit(fs, basePath) + val hudiSnapshotDF6 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) + .load(basePath + "/2020/01/10/*") + assertEquals(102, hudiSnapshotDF6.count()) + val hudiIncDF6 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit5Time) + .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, commit6Time) + .load(basePath) + // compaction updated 150 rows + inserted 2 new row + assertEquals(152, hudiIncDF6.count()) } @Test @@ -276,6 +356,13 @@ class TestMORDataSource extends HoodieClientTestBase { .load(basePath + "/*/*/*/*") assertEquals(100, hudiSnapshotDF2Unmerge.count()) + // incremental query, read 50 delete records from log file and get 0 count. 
+ val hudiIncDF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit2Time) + .load(basePath) + assertEquals(0, hudiIncDF1.count()) + // Third Operation: // Upsert 50 delete records to delete the reset // Snopshot view should read 0 record @@ -308,6 +395,8 @@ class TestMORDataSource extends HoodieClientTestBase { val hudiSnapshotDF1 = spark.read.format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) .load(basePath + "/*/*/*/*") + val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString + assertEquals(100, hudiSnapshotDF1.count()) // select nested columns with order different from the actual schema assertEquals("amount,currency,tip_history,_hoodie_commit_seqno", @@ -329,34 +418,43 @@ class TestMORDataSource extends HoodieClientTestBase { val hudiSnapshotDF2 = spark.read.format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL) .load(basePath + "/*/*/*/*") - - val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString + val hudiIncDF1 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") + .load(basePath) + val hudiIncDF1Skipmerge = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000") + .load(basePath) + val hudiIncDF2 = spark.read.format("org.apache.hudi") + .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) + .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commit1Time) + .load(basePath) // filter first commit and only read log records assertEquals(50, hudiSnapshotDF2.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history") .filter(col("_hoodie_commit_time") > commit1Time).count()) + assertEquals(50, hudiIncDF1.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history") + .filter(col("_hoodie_commit_time") > commit1Time).count()) + assertEquals(50, hudiIncDF2 + .select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history").count()) + assertEquals(150, hudiIncDF1Skipmerge + .select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history").count()) // select nested columns with order different from the actual schema - assertEquals("amount,currency,tip_history,_hoodie_commit_seqno", - hudiSnapshotDF2 - .select("fare.amount", "fare.currency", "tip_history", "_hoodie_commit_seqno") - .orderBy(desc("_hoodie_commit_seqno")) - .columns.mkString(",")) - - // Correctly loading type - val sampleRow = hudiSnapshotDF2 - .select("begin_lat", "current_date", "fare.currency", "tip_history", "nation") - .orderBy(desc("_hoodie_commit_time")) - .head() - assertEquals(sampleRow.getDouble(0), sampleRow.get(0)) - assertEquals(sampleRow.getLong(1), sampleRow.get(1)) - assertEquals(sampleRow.getString(2), sampleRow.get(2)) - assertEquals(sampleRow.getSeq(3), sampleRow.get(3)) - assertEquals(sampleRow.getStruct(4), 
sampleRow.get(4)) + verifySchemaAndTypes(hudiSnapshotDF1) + verifySchemaAndTypes(hudiSnapshotDF2) + verifySchemaAndTypes(hudiIncDF1) + verifySchemaAndTypes(hudiIncDF2) + verifySchemaAndTypes(hudiIncDF1Skipmerge) // make sure show() work - hudiSnapshotDF1.show(1) - hudiSnapshotDF2.show(1) + verifyShow(hudiSnapshotDF1) + verifyShow(hudiSnapshotDF2) + verifyShow(hudiIncDF1) + verifyShow(hudiIncDF2) + verifyShow(hudiIncDF1Skipmerge) } @Test @@ -404,4 +502,25 @@ class TestMORDataSource extends HoodieClientTestBase { hudiSnapshotDF1.show(1) hudiSnapshotDF2.show(1) } + + def verifySchemaAndTypes(df: DataFrame): Unit = { + assertEquals("amount,currency,tip_history,_hoodie_commit_seqno", + df.select("fare.amount", "fare.currency", "tip_history", "_hoodie_commit_seqno") + .orderBy(desc("_hoodie_commit_seqno")) + .columns.mkString(",")) + val sampleRow = df + .select("begin_lat", "current_date", "fare.currency", "tip_history", "nation") + .orderBy(desc("_hoodie_commit_time")) + .head() + assertEquals(sampleRow.getDouble(0), sampleRow.get(0)) + assertEquals(sampleRow.getLong(1), sampleRow.get(1)) + assertEquals(sampleRow.getString(2), sampleRow.get(2)) + assertEquals(sampleRow.getSeq(3), sampleRow.get(3)) + assertEquals(sampleRow.getStruct(4), sampleRow.get(4)) + } + + def verifyShow(df: DataFrame): Unit = { + df.show(1) + df.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history").show(1) + } }
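
For reference, below is a minimal usage sketch of the incremental query this commit enables for MERGE_ON_READ tables. It mirrors the option keys exercised in TestMORDataSource; the table location, instant times, and Spark session setup are placeholders you would replace with your own, and it assumes the hudi-spark bundle is on the Spark classpath. It is illustrative only, not part of the commit.

import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

object MorIncrementalQueryExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hudi-mor-incremental-sketch")
      .master("local[2]")
      .getOrCreate()

    // Hypothetical table location and instant times; substitute values from your own table.
    val basePath = "/tmp/hudi_trips_mor"
    val beginTime = "000"              // pull everything committed after this instant
    val endTime = "20210110000208"     // optional upper bound; defaults to the latest completed instant

    // Incremental query on a MERGE_ON_READ table, merging base files with log files (default merge type).
    val incremental = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime)
      .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, endTime)
      .load(basePath)

    // Skip-merge variant: returns base-file and log-file records without payload combining.
    val skipMerge = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime)
      .load(basePath)

    incremental.select("_hoodie_commit_time", "_hoodie_record_key").show(5)
    skipMerge.select("_hoodie_commit_time", "_hoodie_record_key").show(5)
    spark.stop()
  }
}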