pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r603865166



##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -133,25 +132,47 @@ class MergeOnReadSnapshotRelation(val sqlContext: 
SQLContext,
   }
 
   def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
-    val inMemoryFileIndex = 
HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
-    val fileStatuses = inMemoryFileIndex.allFiles()
-    if (fileStatuses.isEmpty) {
-      throw new HoodieException("No files found for reading in user provided 
path.")
+    val fileStatuses = if (globPaths.isDefined) {
+      // Load files from the global paths if it has defined to be compatible 
with the original mode
+      val inMemoryFileIndex = 
HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get)
+      inMemoryFileIndex.allFiles()
+    } else { // Load files by the HoodieFileIndex.
+      val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, 
metaClient,
+        Some(tableStructSchema), optParams, 
FileStatusCache.getOrCreate(sqlContext.sparkSession))
+      hoodieFileIndex.allFiles
     }
 
-    val fsView = new HoodieTableFileSystemView(metaClient,
-      metaClient.getActiveTimeline.getCommitsTimeline
-        .filterCompletedInstants, fileStatuses.toArray)
-    val latestFiles: List[HoodieBaseFile] = 
fsView.getLatestBaseFiles.iterator().asScala.toList
-    val latestCommit = fsView.getLastInstant.get().getTimestamp
-    val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
latestFiles.asJava).asScala
-    val fileSplits = fileGroup.map(kv => {
-      val baseFile = kv._1
-      val logPaths = if (kv._2.isEmpty) Option.empty else 
Option(kv._2.asScala.toList)
-      val partitionedFile = PartitionedFile(InternalRow.empty, 
baseFile.getPath, 0, baseFile.getFileLen)
-      HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, 
latestCommit,
-        metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
-    }).toList
-    fileSplits
+    if (fileStatuses.isEmpty) { // If this an empty table, return an empty 
split list.
+      List.empty[HoodieMergeOnReadFileSplit]
+    } else {
+      val fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline.getCommitsTimeline
+          .filterCompletedInstants, fileStatuses.toArray)
+      val latestFiles: List[HoodieBaseFile] = 
fsView.getLatestBaseFiles.iterator().asScala.toList
+      val latestCommit = fsView.getLastInstant.get().getTimestamp
+      val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
latestFiles.asJava).asScala
+      val fileSplits = fileGroup.map(kv => {
+        val baseFile = kv._1
+        val logPaths = if (kv._2.isEmpty) Option.empty else 
Option(kv._2.asScala.toList)
+
+        // Here we use the Path#toUri to encode the path string, as there is a 
decode in

Review comment:
       Yes, I will refactor this code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to