pengzhiwei2018 commented on a change in pull request #2651: URL: https://github.com/apache/hudi/pull/2651#discussion_r603865166
########## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala ########## @@ -133,25 +132,47 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, } def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = { - val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths) - val fileStatuses = inMemoryFileIndex.allFiles() - if (fileStatuses.isEmpty) { - throw new HoodieException("No files found for reading in user provided path.") + val fileStatuses = if (globPaths.isDefined) { + // Load files from the global paths if it has defined to be compatible with the original mode + val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get) + inMemoryFileIndex.allFiles() + } else { // Load files by the HoodieFileIndex. + val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient, + Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession)) + hoodieFileIndex.allFiles } - val fsView = new HoodieTableFileSystemView(metaClient, - metaClient.getActiveTimeline.getCommitsTimeline - .filterCompletedInstants, fileStatuses.toArray) - val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList - val latestCommit = fsView.getLastInstant.get().getTimestamp - val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala - val fileSplits = fileGroup.map(kv => { - val baseFile = kv._1 - val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList) - val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen) - HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, latestCommit, - metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType) - }).toList - fileSplits + if (fileStatuses.isEmpty) { // If this an empty table, return an empty split list. 
+ List.empty[HoodieMergeOnReadFileSplit] + } else { + val fsView = new HoodieTableFileSystemView(metaClient, + metaClient.getActiveTimeline.getCommitsTimeline + .filterCompletedInstants, fileStatuses.toArray) + val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList + val latestCommit = fsView.getLastInstant.get().getTimestamp + val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala + val fileSplits = fileGroup.map(kv => { + val baseFile = kv._1 + val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList) + + // Here we use the Path#toUri to encode the path string, as there is a decode in Review comment: Yes, I will refactor this code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org