This is an automated email from the ASF dual-hosted git repository.

uditme pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 684622c  [HUDI-1591] Implement Spark's FileIndex for Hudi to support 
queries via Hudi DataSource using non-globbed table path and partition pruning 
(#2651)
684622c is described below

commit 684622c7c9fa6df9eb177b51cb1e7bd6dd16f78d
Author: pengzhiwei <pengzhiwei2...@icloud.com>
AuthorDate: Fri Apr 2 02:12:28 2021 +0800

    [HUDI-1591] Implement Spark's FileIndex for Hudi to support queries via 
Hudi DataSource using non-globbed table path and partition pruning (#2651)
---
 .../apache/hudi/keygen/CustomAvroKeyGenerator.java |   6 +-
 .../org/apache/hudi/keygen/CustomKeyGenerator.java |   2 +-
 .../datasources/SparkParsePartitionUtil.scala      |  34 ++
 .../java/org/apache/hudi/common/fs/FSUtils.java    |  10 +
 .../hudi/common/table/HoodieTableConfig.java       |  10 +
 .../hudi/common/table/HoodieTableMetaClient.java   |  13 +
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   3 +
 .../main/scala/org/apache/hudi/DefaultSource.scala | 103 +++---
 .../org/apache/hudi/HoodieBootstrapRelation.scala  |  18 +-
 .../scala/org/apache/hudi/HoodieFileIndex.scala    | 362 +++++++++++++++++++++
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   8 +-
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |  12 +-
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |  31 +-
 .../hudi/MergeOnReadIncrementalRelation.scala      |   3 +-
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |  70 ++--
 .../org/apache/hudi/TestHoodieFileIndex.scala      | 252 ++++++++++++++
 .../apache/hudi/functional/TestCOWDataSource.scala |  52 ++-
 .../functional/TestDataSourceForBootstrap.scala    |  39 ++-
 .../apache/hudi/functional/TestMORDataSource.scala |  52 +++
 .../datasources/Spark2ParsePartitionUtil.scala     |  33 ++
 .../datasources/Spark3ParsePartitionUtil.scala     |  39 +++
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   6 +
 22 files changed, 1075 insertions(+), 83 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
index 724cabd..3b927c9 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/CustomAvroKeyGenerator.java
@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
 public class CustomAvroKeyGenerator extends BaseKeyGenerator {
 
   private static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
-  private static final String SPLIT_REGEX = ":";
+  public static final String SPLIT_REGEX = ":";
 
   /**
    * Used as a part of config in CustomKeyGenerator.java.
@@ -117,8 +117,4 @@ public class CustomAvroKeyGenerator extends 
BaseKeyGenerator {
   public String getDefaultPartitionPathSeparator() {
     return DEFAULT_PARTITION_PATH_SEPARATOR;
   }
-
-  public String getSplitRegex() {
-    return SPLIT_REGEX;
-  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
index 77896d2..a2a3012 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/CustomKeyGenerator.java
@@ -90,7 +90,7 @@ public class CustomKeyGenerator extends BuiltinKeyGenerator {
       return "";
     }
     for (String field : getPartitionPathFields()) {
-      String[] fieldWithType = 
field.split(customAvroKeyGenerator.getSplitRegex());
+      String[] fieldWithType = field.split(customAvroKeyGenerator.SPLIT_REGEX);
       if (fieldWithType.length != 2) {
         throw new HoodieKeyGeneratorException("Unable to find field names for 
partition path in proper format");
       }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
new file mode 100644
index 0000000..fc2275b
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.TimeZone
+
+import org.apache.hadoop.fs.Path
+import 
org.apache.spark.sql.execution.datasources.PartitioningUtils.PartitionValues
+import org.apache.spark.sql.types.DataType
+
+trait SparkParsePartitionUtil extends Serializable {
+
+  def parsePartition(
+    path: Path,
+    typeInference: Boolean,
+    basePaths: Set[Path],
+    userSpecifiedDataTypes: Map[String, DataType],
+    timeZone: TimeZone): Option[PartitionValues]
+}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index d37c617..9b229a3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -276,6 +276,16 @@ public class FSUtils {
     }
   }
 
+  public static FileStatus[] getFilesInPartition(HoodieEngineContext 
engineContext, HoodieMetadataConfig metadataConfig,
+                                                 String basePathStr, Path 
partitionPath) {
+    try (HoodieTableMetadata tableMetadata = 
HoodieTableMetadata.create(engineContext,
+        metadataConfig, basePathStr, 
FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR)) {
+      return tableMetadata.getAllFilesInPartition(partitionPath);
+    } catch (Exception e) {
+      throw new HoodieException("Error get files in partition: " + 
partitionPath, e);
+    }
+  }
+
   public static String getFileExtension(String fullName) {
     Objects.requireNonNull(fullName);
     String fileName = new File(fullName).getName();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index f519c91..0b36e31 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.table;
 
+import java.util.Arrays;
 import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -57,6 +58,7 @@ public class HoodieTableConfig implements Serializable {
   public static final String HOODIE_TABLE_TYPE_PROP_NAME = "hoodie.table.type";
   public static final String HOODIE_TABLE_VERSION_PROP_NAME = 
"hoodie.table.version";
   public static final String HOODIE_TABLE_PRECOMBINE_FIELD = 
"hoodie.table.precombine.field";
+  public static final String HOODIE_TABLE_PARTITION_COLUMNS = 
"hoodie.table.partition.columns";
 
   @Deprecated
   public static final String HOODIE_RO_FILE_FORMAT_PROP_NAME = 
"hoodie.table.ro.file.format";
@@ -193,6 +195,14 @@ public class HoodieTableConfig implements Serializable {
     return props.getProperty(HOODIE_TABLE_PRECOMBINE_FIELD);
   }
 
+  public Option<String[]> getPartitionColumns() {
+    if (props.containsKey(HOODIE_TABLE_PARTITION_COLUMNS)) {
+      return 
Option.of(Arrays.stream(props.getProperty(HOODIE_TABLE_PARTITION_COLUMNS).split(","))
+        .filter(p -> p.length() > 0).collect(Collectors.toList()).toArray(new 
String[]{}));
+    }
+    return Option.empty();
+  }
+
   /**
    * Read the payload class for HoodieRecords from the table properties.
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 5de3b9a..f4edeb8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -596,6 +596,7 @@ public class HoodieTableMetaClient implements Serializable {
     private Integer timelineLayoutVersion;
     private String baseFileFormat;
     private String preCombineField;
+    private String partitionColumns;
     private String bootstrapIndexClass;
     private String bootstrapBasePath;
 
@@ -646,6 +647,11 @@ public class HoodieTableMetaClient implements Serializable 
{
       return this;
     }
 
+    public PropertyBuilder setPartitionColumns(String partitionColumns) {
+      this.partitionColumns = partitionColumns;
+      return this;
+    }
+
     public PropertyBuilder setBootstrapIndexClass(String bootstrapIndexClass) {
       this.bootstrapIndexClass = bootstrapIndexClass;
       return this;
@@ -696,6 +702,9 @@ public class HoodieTableMetaClient implements Serializable {
       if 
(properties.containsKey(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD)) {
         
setPreCombineField(properties.getProperty(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD));
       }
+      if 
(properties.containsKey(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS)) {
+        
setPartitionColumns(properties.getProperty(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS));
+      }
       return this;
     }
 
@@ -738,6 +747,10 @@ public class HoodieTableMetaClient implements Serializable 
{
       if (null != preCombineField) {
         properties.put(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD, 
preCombineField);
       }
+
+      if (null != partitionColumns) {
+        properties.put(HoodieTableConfig.HOODIE_TABLE_PARTITION_COLUMNS, 
partitionColumns);
+      }
       return properties;
     }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 51f32a2..4c76f5f 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -68,6 +68,9 @@ object DataSourceReadOptions {
 
   val READ_PRE_COMBINE_FIELD = HoodieWriteConfig.PRECOMBINE_FIELD_PROP
 
+  val ENABLE_HOODIE_FILE_INDEX = "hoodie.file.index.enable"
+  val DEFAULT_ENABLE_HOODIE_FILE_INDEX = true
+
   @Deprecated
   val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type"
   @Deprecated
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index 3299b8f..0b8234d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -19,14 +19,16 @@ package org.apache.hudi
 
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceReadOptions._
-import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
+import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, 
OPERATION_OPT_KEY}
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.log4j.LogManager
-import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.datasources.{DataSource, 
FileStatusCache, HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.hudi.streaming.HoodieStreamSource
 import org.apache.spark.sql.sources._
@@ -79,39 +81,53 @@ class DefaultSource extends RelationProvider
     val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
 
     val fs = FSUtils.getFs(allPaths.head, 
sqlContext.sparkContext.hadoopConfiguration)
-    val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
-
-    val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+    // Use HoodieFileIndex only if enabled, 'path' is not globbed and no read paths are given.
+    // Otherwise we fall back to the original way of reading the hoodie table.
+    val enableFileIndex = optParams.get(ENABLE_HOODIE_FILE_INDEX)
+      .map(_.toBoolean).getOrElse(DEFAULT_ENABLE_HOODIE_FILE_INDEX)
+    val useHoodieFileIndex = enableFileIndex && path.isDefined && 
!path.get.contains("*") &&
+      !parameters.contains(DataSourceReadOptions.READ_PATHS_OPT_KEY)
+    val globPaths = if (useHoodieFileIndex) {
+      None
+    } else {
+      Some(HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs))
+    }
+    // Get the table base path
+    val tablePath = if (globPaths.isDefined) {
+      DataSourceUtils.getTablePath(fs, globPaths.get.toArray)
+    } else {
+      DataSourceUtils.getTablePath(fs, Array(new Path(path.get)))
+    }
     log.info("Obtained hudi table path: " + tablePath)
 
     val metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
     val isBootstrappedTable = 
metaClient.getTableConfig.getBootstrapBasePath.isPresent
-    log.info("Is bootstrapped table => " + isBootstrappedTable)
-
-    if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
-      if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        if (isBootstrappedTable) {
-          // Snapshot query is not supported for Bootstrapped MOR tables
-          log.warn("Snapshot query is not supported for Bootstrapped 
Merge-on-Read tables." +
-            " Falling back to Read Optimized query.")
-          new HoodieBootstrapRelation(sqlContext, schema, globPaths, 
metaClient, optParams)
-        } else {
-          new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, 
globPaths, metaClient)
-        }
-      } else {
-        getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, 
isBootstrappedTable, globPaths, metaClient)
-      }
-    } else 
if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
-      getBaseFileOnlyView(sqlContext, parameters, schema, readPaths, 
isBootstrappedTable, globPaths, metaClient)
-    } else if 
(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
-      val metaClient = 
HoodieTableMetaClient.builder().setConf(fs.getConf).setBasePath(tablePath).build()
-      if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
-        new MergeOnReadIncrementalRelation(sqlContext, optParams, schema, 
metaClient)
-      } else {
-        new IncrementalRelation(sqlContext, optParams, schema, metaClient)
-      }
-    } else {
-      throw new HoodieException("Invalid query type :" + 
parameters(QUERY_TYPE_OPT_KEY))
+    val tableType = metaClient.getTableType
+    val queryType = parameters(QUERY_TYPE_OPT_KEY)
+    log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: 
$tableType")
+
+    (tableType, queryType, isBootstrappedTable) match {
+      case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
+           (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
+           (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
+        getBaseFileOnlyView(useHoodieFileIndex, sqlContext, parameters, 
schema, tablePath,
+          readPaths, metaClient)
+
+      case (COPY_ON_WRITE, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
+        new IncrementalRelation(sqlContext, parameters, schema, metaClient)
+
+      case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
+        new MergeOnReadSnapshotRelation(sqlContext, parameters, schema, 
globPaths, metaClient)
+
+      case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
+        new MergeOnReadIncrementalRelation(sqlContext, parameters, schema, 
metaClient)
+
+      case (_, _, true) =>
+        new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, 
parameters)
+
+      case (_, _, _) =>
+        throw new HoodieException(s"Invalid query type : $queryType for 
tableType: $tableType," +
+          s"isBootstrappedTable: $isBootstrappedTable ")
     }
   }
 
@@ -162,18 +178,28 @@ class DefaultSource extends RelationProvider
 
   override def shortName(): String = "hudi"
 
-  private def getBaseFileOnlyView(sqlContext: SQLContext,
+  private def getBaseFileOnlyView(useHoodieFileIndex: Boolean,
+                                  sqlContext: SQLContext,
                                   optParams: Map[String, String],
                                   schema: StructType,
+                                  tablePath: String,
                                   extraReadPaths: Seq[String],
-                                  isBootstrappedTable: Boolean,
-                                  globPaths: Seq[Path],
                                   metaClient: HoodieTableMetaClient): 
BaseRelation = {
-    log.warn("Loading Base File Only View.")
+    log.info("Loading Base File Only View  with options :" + optParams)
+
+    if (useHoodieFileIndex) {
+
+      val fileIndex = HoodieFileIndex(sqlContext.sparkSession, metaClient,
+        if (schema == null) Option.empty[StructType] else Some(schema),
+        optParams, FileStatusCache.getOrCreate(sqlContext.sparkSession))
 
-    if (isBootstrappedTable) {
-      // For bootstrapped tables, use our custom Spark relation for querying
-      new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, 
optParams)
+      HadoopFsRelation(
+        fileIndex,
+        fileIndex.partitionSchema,
+        fileIndex.dataSchema,
+        bucketSpec = None,
+        fileFormat = new ParquetFileFormat,
+        optParams)(sqlContext.sparkSession)
     } else {
       // this is just effectively RO view only, where `path` can contain a mix 
of
       // non-hoodie/hoodie path files. set the path filter up
@@ -182,7 +208,6 @@ class DefaultSource extends RelationProvider
         classOf[HoodieROTablePathFilter],
         classOf[org.apache.hadoop.fs.PathFilter])
 
-      log.info("Constructing hoodie (as parquet) data source with options :" + 
optParams)
       // simply return as a regular parquet relation
       DataSource.apply(
         sparkSession = sqlContext.sparkSession,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index f7415f9..b1ab83a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.exception.HoodieException
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, 
PartitionedFile}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
@@ -46,13 +46,14 @@ import scala.collection.JavaConverters._
   *
   * @param _sqlContext Spark SQL Context
   * @param userSchema User specified schema in the datasource query
-  * @param globPaths Globbed paths obtained from the user provided path for 
querying
+  * @param globPaths  The globbed paths to query. If defined, files are read from the 
globPaths,
+  *                   otherwise data is read from the table path using HoodieFileIndex.
   * @param metaClient Hoodie table meta client
   * @param optParams DataSource options passed by the user
   */
 class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
                               val userSchema: StructType,
-                              val globPaths: Seq[Path],
+                              val globPaths: Option[Seq[Path]],
                               val metaClient: HoodieTableMetaClient,
                               val optParams: Map[String, String]) extends 
BaseRelation
   with PrunedFilteredScan with Logging {
@@ -156,9 +157,14 @@ class HoodieBootstrapRelation(@transient val _sqlContext: 
SQLContext,
 
   def buildFileIndex(): HoodieBootstrapFileIndex = {
     logInfo("Building file index..")
-    val inMemoryFileIndex = 
HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, globPaths)
-    val fileStatuses = inMemoryFileIndex.allFiles()
-
+    val fileStatuses  = if (globPaths.isDefined) {
+      // Load files from the globbed paths if they are defined, to stay compatible 
with the original mode
+      val inMemoryFileIndex = 
HoodieSparkUtils.createInMemoryFileIndex(_sqlContext.sparkSession, 
globPaths.get)
+      inMemoryFileIndex.allFiles()
+    } else { // Load files by the HoodieFileIndex.
+        HoodieFileIndex(sqlContext.sparkSession, metaClient, Some(schema), 
optParams,
+          FileStatusCache.getOrCreate(sqlContext.sparkSession)).allFiles
+    }
     if (fileStatuses.isEmpty) {
       throw new HoodieException("No files found for reading in user provided 
path.")
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
new file mode 100644
index 0000000..61c2f3a
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, 
NoopCache, PartitionDirectory}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.mutable
+
+/**
+ * A file index which supports partition pruning for hoodie snapshot and 
read-optimized queries.
+ *
+ * Main steps to get the file list for a query:
+ * 1、Load all files and partition values from the table path.
+ * 2、Prune the partitions using the partition filter condition.
+ *
+ * There are 3 cases for this:
+ * 1、If the partition columns size is equal to the actual partition path 
level, we
+ * read it as a partitioned table (e.g. partition column is "dt", the partition 
path is "2021-03-10").
+ *
+ * 2、If the partition columns size is not equal to the partition path level, 
but the partition
+ * column size is "1" (e.g. partition column is "dt", but the partition path 
is "2021/03/10",
+ * whose directory level is 3), we can still read it as a partitioned table. 
We will map the
+ * partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt").
+ *
+ * 3、Otherwise, if the partition columns size is not equal to the partition 
directory level and the
+ * size is greater than "1" (e.g. partition column is "dt,hh", the partition 
path is "2021/03/10/12"),
+ * we read it as a Non-Partitioned table because we cannot know how to 
map the partition
+ * path to the partition columns in this case.
+ *
+ */
+case class HoodieFileIndex(
+     spark: SparkSession,
+     metaClient: HoodieTableMetaClient,
+     schemaSpec: Option[StructType],
+     options: Map[String, String],
+     @transient fileStatusCache: FileStatusCache = NoopCache)
+  extends FileIndex with Logging {
+
+  private val basePath = metaClient.getBasePath
+
+  @transient private val queryPath = new Path(options.getOrElse("path", 
"'path' option required"))
+  /**
+   * Get the schema of the table.
+   */
+  lazy val schema: StructType = schemaSpec.getOrElse({
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+      .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+   * Get the partition schema from the hoodie.properties.
+   */
+  private lazy val _partitionSchemaFromProperties: StructType = {
+    val tableConfig = metaClient.getTableConfig
+    val partitionColumns = tableConfig.getPartitionColumns
+    val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+
+    if (partitionColumns.isPresent) {
+      val partitionFields = partitionColumns.get().map(column =>
+        nameFieldMap.getOrElse(column, throw new 
IllegalArgumentException(s"Cannot find column: '" +
+          s"$column' in the schema[${schema.fields.mkString(",")}]")))
+      new StructType(partitionFields)
+    } else { // If the partition columns are not stored in 
hoodie.properties (the table was
+      // created earlier), we treat it as a non-partitioned table.
+      logWarning("No partition columns available from hoodie.properties." +
+        " Partition pruning will not work")
+      new StructType()
+    }
+  }
+
+  @transient @volatile private var fileSystemView: HoodieTableFileSystemView = 
_
+  @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] 
= _
+  @transient @volatile private var cachedFileSize: Long = 0L
+  @transient @volatile private var cachedAllPartitionPaths: 
Seq[PartitionRowPath] = _
+
+  @volatile private var queryAsNonePartitionedTable: Boolean = _
+
+  refresh0()
+
+  override def rootPaths: Seq[Path] = queryPath :: Nil
+
+  override def listFiles(partitionFilters: Seq[Expression],
+                         dataFilters: Seq[Expression]): 
Seq[PartitionDirectory] = {
+    if (queryAsNonePartitionedTable) { // Read as Non-Partitioned table.
+      Seq(PartitionDirectory(InternalRow.empty, allFiles))
+    } else {
+      // Prune the partition path by the partition filters
+      val prunedPartitions = prunePartition(cachedAllPartitionPaths, 
partitionFilters)
+      prunedPartitions.map { partition =>
+        val fileStatues = 
fileSystemView.getLatestBaseFiles(partition.partitionPath).iterator()
+          .asScala.toSeq
+          .map(_.getFileStatus)
+        PartitionDirectory(partition.values, fileStatues)
+      }
+    }
+  }
+
+  override def inputFiles: Array[String] = {
+    cachedAllInputFiles.map(_.getFileStatus.getPath.toString)
+  }
+
+  override def refresh(): Unit = {
+    fileStatusCache.invalidateAll()
+    refresh0()
+  }
+
+  private def refresh0(): Unit = {
+    val startTime = System.currentTimeMillis()
+    val partitionFiles = loadPartitionPathFiles()
+    val allFiles = partitionFiles.values.reduceOption(_ ++ _)
+      .getOrElse(Array.empty[FileStatus])
+
+    metaClient.reloadActiveTimeline()
+    val activeInstants = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
+    fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants, 
allFiles)
+    cachedAllInputFiles = 
fileSystemView.getLatestBaseFiles.iterator().asScala.toArray
+    cachedAllPartitionPaths = partitionFiles.keys.toSeq
+    cachedFileSize = cachedAllInputFiles.map(_.getFileLen).sum
+
+    // If the partition value contains InternalRow.empty, we query it as a 
non-partitioned table.
+    queryAsNonePartitionedTable = cachedAllPartitionPaths
+      .exists(p => p.values == InternalRow.empty)
+    val flushSpend = System.currentTimeMillis() - startTime
+    logInfo(s"Refresh for table ${metaClient.getTableConfig.getTableName}," +
+      s" spend: $flushSpend ms")
+  }
+
+  override def sizeInBytes: Long = {
+    cachedFileSize
+  }
+
+  override def partitionSchema: StructType = {
+    if (queryAsNonePartitionedTable) {
+      // If we read it as Non-Partitioned table, we should not
+      // return the partition schema.
+      new StructType()
+    } else {
+      _partitionSchemaFromProperties
+    }
+  }
+
+  /**
+   * Get the data schema of the table.
+   * @return
+   */
+  def dataSchema: StructType = {
+    val partitionColumns = partitionSchema.fields.map(_.name).toSet
+    StructType(schema.fields.filterNot(f => partitionColumns.contains(f.name)))
+  }
+
+  def allFiles: Seq[FileStatus] = cachedAllInputFiles.map(_.getFileStatus)
+
+  /**
+   * Prune the partitions by the filter. This implementation is forked from
+   * 
org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex#prunePartitions.
+   * @param partitionPaths All the partition paths.
+   * @param predicates The filter condition.
+   * @return The Pruned partition paths.
+   */
+  private def prunePartition(partitionPaths: Seq[PartitionRowPath],
+                             predicates: Seq[Expression]): 
Seq[PartitionRowPath] = {
+
+    val partitionColumnNames = partitionSchema.fields.map(_.name).toSet
+    val partitionPruningPredicates = predicates.filter {
+      _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
+    }
+    if (partitionPruningPredicates.nonEmpty) {
+      val predicate = partitionPruningPredicates.reduce(expressions.And)
+
+      val boundPredicate = InterpretedPredicate(predicate.transform {
+        case a: AttributeReference =>
+          val index = partitionSchema.indexWhere(a.name == _.name)
+          BoundReference(index, partitionSchema(index).dataType, nullable = 
true)
+      })
+
+      val prunedPartitionPaths = partitionPaths.filter {
+        case PartitionRowPath(values, _) => boundPredicate.eval(values)
+      }
+      logInfo(s"Total partition size is: ${partitionPaths.size}," +
+        s" after partition prune size is: ${prunedPartitionPaths.size}")
+      prunedPartitionPaths
+    } else {
+      partitionPaths
+    }
+  }
+
+  /**
+   * Load all partition paths and their files under the query table path.
+   *
+   * The partition paths are listed from the base path and filtered by the query path, each
+   * of them is converted to a PartitionRowPath, and the files of every partition are taken
+   * from the fileStatusCache when present, or listed with a Spark job otherwise (the listed
+   * files are then put into the fileStatusCache).
+   *
+   * @return the files of each partition, keyed by its PartitionRowPath.
+   */
+  private def loadPartitionPathFiles(): Map[PartitionRowPath, Array[FileStatus]] = {
+    val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+    val properties = new Properties()
+    properties.putAll(options.asJava)
+    val metadataConfig = HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
+
+    val queryPartitionPath = FSUtils.getRelativePartitionPath(new Path(basePath), queryPath)
+    // Load all the partition path from the basePath, and filter by the query partition path.
+    // TODO load files from the queryPartitionPath directly.
+    val partitionPaths = FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, basePath).asScala
+      .filter(_.startsWith(queryPartitionPath))
+
+    val writeConfig = HoodieWriteConfig.newBuilder()
+      .withPath(basePath).withProperties(properties).build()
+    val maxListParallelism = writeConfig.getFileListingParallelism
+
+    val serializableConf = new SerializableConfiguration(spark.sessionState.newHadoopConf())
+    val partitionSchema = _partitionSchemaFromProperties
+    val timeZoneId = CaseInsensitiveMap(options)
+      .get(DateTimeUtils.TIMEZONE_OPTION)
+      .getOrElse(SQLConf.get.sessionLocalTimeZone)
+
+    val sparkParsePartitionUtil = HoodieSparkUtils.createSparkParsePartitionUtil(spark
+      .sessionState.conf)
+    // Convert partition path to PartitionRowPath
+    val partitionRowPaths = partitionPaths.map { partitionPath =>
+      val partitionRow = if (partitionSchema.fields.length == 0) {
+        // This is a non-partitioned table
+        InternalRow.empty
+      } else {
+        val partitionFragments = partitionPath.split("/")
+
+        if (partitionFragments.length != partitionSchema.fields.length &&
+          partitionSchema.fields.length == 1) {
+          // If the partition column size is not equal to the partition fragment size
+          // and the partition column size is 1, we map the whole partition path
+          // to the partition column which can benefit from the partition prune.
+          InternalRow.fromSeq(Seq(UTF8String.fromString(partitionPath)))
+        } else if (partitionFragments.length != partitionSchema.fields.length &&
+          partitionSchema.fields.length > 1) {
+          // If the partition column size is not equal to the partition fragments size
+          // and the partition column size > 1, we do not know how to map the partition
+          // fragments to the partition columns. So we treat it as a Non-Partitioned Table
+          // for the query which do not benefit from the partition prune.
+          logWarning(s"Cannot do the partition prune for table $basePath." +
+            s"The partitionFragments size (${partitionFragments.mkString(",")})" +
+            s" is not equal to the partition columns size(${partitionSchema.fields.mkString(",")})")
+          InternalRow.empty
+        } else { // If partitionFragments.length == partitionSchema.fields.length
+
+          // Append partition name to the partition value if the
+          // HIVE_STYLE_PARTITIONING_OPT_KEY is disable.
+          // e.g. convert "/xx/xx/2021/02" to "/xx/xx/year=2021/month=02"
+          val partitionWithName =
+            partitionFragments.zip(partitionSchema).map {
+              case (partition, field) =>
+                if (partition.indexOf("=") == -1) {
+                  s"${field.name}=$partition"
+                } else {
+                  partition
+                }
+            }.mkString("/")
+          val pathWithPartitionName = new Path(basePath, partitionWithName)
+          val partitionDataTypes = partitionSchema.fields.map(f => f.name -> f.dataType).toMap
+          val partitionValues = sparkParsePartitionUtil.parsePartition(pathWithPartitionName,
+            typeInference = false, Set(new Path(basePath)), partitionDataTypes,
+            DateTimeUtils.getTimeZone(timeZoneId))
+
+          // Convert partitionValues to InternalRow
+          partitionValues.map(_.literals.map(_.value))
+            .map(InternalRow.fromSeq)
+            .getOrElse(InternalRow.empty)
+        }
+      }
+      PartitionRowPath(partitionRow, partitionPath)
+    }
+
+    // List files in all of the partition path.
+    val pathToFetch = mutable.ArrayBuffer[PartitionRowPath]()
+    val cachePartitionToFiles = mutable.Map[PartitionRowPath, Array[FileStatus]]()
+    // Fetch from the FileStatusCache
+    partitionRowPaths.foreach { partitionRowPath =>
+      fileStatusCache.getLeafFiles(partitionRowPath.fullPartitionPath(basePath)) match {
+        case Some(filesInPartition) =>
+          cachePartitionToFiles.put(partitionRowPath, filesInPartition)
+
+        case None => pathToFetch.append(partitionRowPath)
+      }
+    }
+    // Fetch the rest from the file system.
+    val fetchedPartition2Files: Map[PartitionRowPath, Array[FileStatus]] =
+      if (pathToFetch.isEmpty) {
+        // Nothing left to list (empty table, or every partition was served from the cache).
+        // SparkContext.parallelize rejects a slice count below 1, so no job is submitted here.
+        Map.empty
+      } else {
+        // Keep the slice count at 1 or more even if the configured parallelism is not positive.
+        val listParallelism = Math.max(1, Math.min(pathToFetch.size, maxListParallelism))
+        spark.sparkContext.parallelize(pathToFetch, listParallelism)
+          .map { partitionRowPath =>
+            // Here we use a LocalEngineContext to get the files in the partition.
+            // We can do this because the TableMetadata.getAllFilesInPartition only rely on the
+            // hadoopConf of the EngineContext.
+            val engineContext = new HoodieLocalEngineContext(serializableConf.get())
+            val filesInPartition = FSUtils.getFilesInPartition(engineContext, metadataConfig,
+              basePath, partitionRowPath.fullPartitionPath(basePath))
+            (partitionRowPath, filesInPartition)
+          }.collect().toMap
+      }
+
+    // Update the fileStatusCache
+    fetchedPartition2Files.foreach {
+      case (partitionRowPath, filesInPartition) =>
+        fileStatusCache.putLeafFiles(partitionRowPath.fullPartitionPath(basePath), filesInPartition)
+    }
+    cachePartitionToFiles.toMap ++ fetchedPartition2Files
+  }
+
+  /**
+   * Represent a partition path together with its partition values.
+   * e.g. PartitionRowPath(InternalRow("2021","02","01"), "2021/02/01")
+   * Equality and hashing are based on the partition path string only.
+   * @param values The partition values of this partition path.
+   * @param partitionPath The partition path string.
+   */
+  case class PartitionRowPath(values: InternalRow, partitionPath: String) {
+    override def equals(other: Any): Boolean = other match {
+      case PartitionRowPath(_, thatPath) => thatPath == partitionPath
+      case _ => false
+    }
+
+    override def hashCode(): Int = partitionPath.hashCode
+
+    def fullPartitionPath(basePath: String): Path =
+      if (partitionPath.nonEmpty) {
+        new Path(basePath, partitionPath)
+      } else {
+        new Path(basePath) // This is a non-partition path
+      }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 94d07b9..5b87278 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -100,6 +100,7 @@ private[hudi] object HoodieSparkSqlWriter {
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, 
HoodieTableMetaClient.METAFOLDER_NAME))
     var tableConfig = getHoodieTableConfig(sparkContext, path.get, 
hoodieTableConfigOpt)
+    val keyGenerator = 
DataSourceUtils.createKeyGenerator(toProperties(parameters))
 
     if (mode == SaveMode.Ignore && tableExists) {
       log.warn(s"hoodie table at $basePath already exists. Ignoring & not 
performing actual writes.")
@@ -112,12 +113,15 @@ private[hudi] object HoodieSparkSqlWriter {
         val archiveLogFolder = parameters.getOrElse(
           HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
 
+        val partitionColumns = 
HoodieWriterUtils.getPartitionColumns(keyGenerator)
+
         val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(tableType)
           .setTableName(tblName)
           .setArchiveLogFolder(archiveLogFolder)
           .setPayloadClassName(parameters(PAYLOAD_CLASS_OPT_KEY))
           
.setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
+          .setPartitionColumns(partitionColumns)
           .initTable(sparkContext.hadoopConfiguration, path.get)
         tableConfig = tableMetaClient.getTableConfig
       }
@@ -146,7 +150,6 @@ private[hudi] object HoodieSparkSqlWriter {
           log.info(s"Registered avro schema : ${schema.toString(true)}")
 
           // Convert to RDD[HoodieRecord]
-          val keyGenerator = 
DataSourceUtils.createKeyGenerator(toProperties(parameters))
           val genericRecords: RDD[GenericRecord] = 
HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
           val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean 
|| operation.equals(WriteOperationType.UPSERT);
           val hoodieAllIncomingRecords = genericRecords.map(gr => {
@@ -193,7 +196,6 @@ private[hudi] object HoodieSparkSqlWriter {
               classOf[org.apache.avro.Schema]))
 
           // Convert to RDD[HoodieKey]
-          val keyGenerator = 
DataSourceUtils.createKeyGenerator(toProperties(parameters))
           val genericRecords: RDD[GenericRecord] = 
HoodieSparkUtils.createRdd(df, structName, nameSpace)
           val hoodieKeysToDelete = genericRecords.map(gr => 
keyGenerator.getKey(gr)).toJavaRDD()
 
@@ -283,6 +285,7 @@ private[hudi] object HoodieSparkSqlWriter {
     if (!tableExists) {
       val archiveLogFolder = parameters.getOrElse(
         HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
+      val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
       HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(HoodieTableType.valueOf(tableType))
           .setTableName(tableName)
@@ -291,6 +294,7 @@ private[hudi] object HoodieSparkSqlWriter {
           
.setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
           .setBootstrapIndexClass(bootstrapIndexClass)
           .setBootstrapBasePath(bootstrapBasePath)
+          .setPartitionColumns(partitionColumns)
           .initTable(sparkContext.hadoopConfiguration, path)
     }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index bd55930..72b26be 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -28,7 +28,8 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
-import org.apache.spark.sql.execution.datasources.{FileStatusCache, 
InMemoryFileIndex}
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, 
InMemoryFileIndex, Spark2ParsePartitionUtil, Spark3ParsePartitionUtil, 
SparkParsePartitionUtil}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 
 import scala.collection.JavaConverters._
@@ -118,4 +119,13 @@ object HoodieSparkUtils {
       new Spark3RowSerDe(encoder)
     }
   }
+
+  /**
+   * Create the SparkParsePartitionUtil implementation selected by SPARK_VERSION:
+   * Spark2ParsePartitionUtil when the version starts with "2.", Spark3ParsePartitionUtil otherwise.
+   * @param conf the SQLConf passed to Spark3ParsePartitionUtil.
+   */
+  def createSparkParsePartitionUtil(conf: SQLConf): SparkParsePartitionUtil = {
+    // TODO remove Spark2ParsePartitionUtil if Spark 2.x support is dropped
+    if (SPARK_VERSION.startsWith("2.")) {
+      new Spark2ParsePartitionUtil
+    } else {
+      new Spark3ParsePartitionUtil(conf)
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 02b5abd..fd3e078 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -17,16 +17,17 @@
 
 package org.apache.hudi
 
+import scala.collection.JavaConverters._
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.TypedProperties
 
 import scala.collection.JavaConversions.mapAsJavaMap
 import scala.collection.JavaConverters.mapAsScalaMapConverter
-
 import 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ENABLE
 import 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_VALIDATE
 import org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_ENABLE_PROP
 import 
org.apache.hudi.common.config.HoodieMetadataConfig.METADATA_VALIDATE_PROP
+import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, 
CustomKeyGenerator, KeyGenerator}
 
 /**
  * WriterUtils to assist in write path in Datasource and tests.
@@ -81,4 +82,32 @@ object HoodieWriterUtils {
     params.foreach(kv => props.setProperty(kv._1, kv._2))
     props
   }
+
+  /**
+   * Get the partition columns to be stored to hoodie.properties.
+   * A key generator is created from the given parameters and the result of
+   * getPartitionColumns(KeyGenerator) for it is returned.
+   * @param parameters the options used to create the key generator.
+   * @return the partition columns as returned by getPartitionColumns(KeyGenerator).
+   */
+  def getPartitionColumns(parameters: Map[String, String]): String = {
+    val props = new TypedProperties()
+    props.putAll(parameters.asJava)
+    val keyGen = DataSourceUtils.createKeyGenerator(props)
+    getPartitionColumns(keyGen)
+  }
+
+  /**
+   * Get the partition columns of the given key generator as a comma-separated string.
+   * @param keyGen the key generator to read the partition path fields from.
+   * @return the partition column names joined by ",", or null if keyGen is not a
+   *         BaseKeyGenerator.
+   * @throws IllegalArgumentException if a partition path field of a CustomKeyGenerator or
+   *                                  CustomAvroKeyGenerator contains no field name.
+   */
+  def getPartitionColumns(keyGen: KeyGenerator): String = {
+    keyGen match {
+      // For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path field format
+      // is: "field_name: field_type", we extract the field_name from the partition path field.
+      case c: BaseKeyGenerator
+        if c.isInstanceOf[CustomKeyGenerator] || c.isInstanceOf[CustomAvroKeyGenerator] =>
+          c.getPartitionPathFields.asScala.map(pathField =>
+            pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
+                // Fail instead of silently using the error message as the column name.
+                .headOption.getOrElse(throw new IllegalArgumentException(
+                  s"Illegal partition path field format: '$pathField' for ${c.getClass.getSimpleName}")))
+            .mkString(",")
+
+      case b: BaseKeyGenerator => b.getPartitionPathFields.asScala.mkString(",")
+      case _ => null
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 13766da..4c2d332 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -201,7 +201,8 @@ class MergeOnReadIncrementalRelation(val sqlContext: 
SQLContext,
       val baseFiles = f.getAllFileSlices.iterator().filter(slice => 
slice.getBaseFile.isPresent).toList
       val partitionedFile = if (baseFiles.nonEmpty) {
         val baseFile = baseFiles.head.getBaseFile
-        Option(PartitionedFile(InternalRow.empty, baseFile.get.getPath, 0, 
baseFile.get.getFileLen))
+        val filePath = 
MergeOnReadSnapshotRelation.getFilePath(baseFile.get.getFileStatus.getPath)
+        Option(PartitionedFile(InternalRow.empty, filePath, 0, 
baseFile.get.getFileLen))
       }
       else {
         Option.empty
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 50e2ec5..c9d413b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -21,7 +21,6 @@ package org.apache.hudi
 import org.apache.hudi.common.model.HoodieBaseFile
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView
-import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
 import 
org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
 import org.apache.hadoop.fs.Path
@@ -29,7 +28,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, 
PartitionedFile}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
@@ -54,7 +53,7 @@ case class HoodieMergeOnReadTableState(tableStructSchema: 
StructType,
 class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
                                   val optParams: Map[String, String],
                                   val userSchema: StructType,
-                                  val globPaths: Seq[Path],
+                                  val globPaths: Option[Seq[Path]],
                                   val metaClient: HoodieTableMetaClient)
   extends BaseRelation with PrunedFilteredScan with Logging {
 
@@ -133,25 +132,54 @@ class MergeOnReadSnapshotRelation(val sqlContext: 
SQLContext,
   }
 
   def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
-    val inMemoryFileIndex = 
HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
-    val fileStatuses = inMemoryFileIndex.allFiles()
-    if (fileStatuses.isEmpty) {
-      throw new HoodieException("No files found for reading in user provided 
path.")
+    val fileStatuses = if (globPaths.isDefined) {
+      // Load files from the global paths if it has defined to be compatible 
with the original mode
+      val inMemoryFileIndex = 
HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get)
+      inMemoryFileIndex.allFiles()
+    } else { // Load files by the HoodieFileIndex.
+      val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, 
metaClient,
+        Some(tableStructSchema), optParams, 
FileStatusCache.getOrCreate(sqlContext.sparkSession))
+      hoodieFileIndex.allFiles
     }
 
-    val fsView = new HoodieTableFileSystemView(metaClient,
-      metaClient.getActiveTimeline.getCommitsTimeline
-        .filterCompletedInstants, fileStatuses.toArray)
-    val latestFiles: List[HoodieBaseFile] = 
fsView.getLatestBaseFiles.iterator().asScala.toList
-    val latestCommit = fsView.getLastInstant.get().getTimestamp
-    val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
latestFiles.asJava).asScala
-    val fileSplits = fileGroup.map(kv => {
-      val baseFile = kv._1
-      val logPaths = if (kv._2.isEmpty) Option.empty else 
Option(kv._2.asScala.toList)
-      val partitionedFile = PartitionedFile(InternalRow.empty, 
baseFile.getPath, 0, baseFile.getFileLen)
-      HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, 
latestCommit,
-        metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
-    }).toList
-    fileSplits
+    if (fileStatuses.isEmpty) { // If this an empty table, return an empty 
split list.
+      List.empty[HoodieMergeOnReadFileSplit]
+    } else {
+      val fsView = new HoodieTableFileSystemView(metaClient,
+        metaClient.getActiveTimeline.getCommitsTimeline
+          .filterCompletedInstants, fileStatuses.toArray)
+      val latestFiles: List[HoodieBaseFile] = 
fsView.getLatestBaseFiles.iterator().asScala.toList
+      val latestCommit = fsView.getLastInstant.get().getTimestamp
+      val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
latestFiles.asJava).asScala
+      val fileSplits = fileGroup.map(kv => {
+        val baseFile = kv._1
+        val logPaths = if (kv._2.isEmpty) Option.empty else 
Option(kv._2.asScala.toList)
+
+        val filePath = 
MergeOnReadSnapshotRelation.getFilePath(baseFile.getFileStatus.getPath)
+        val partitionedFile = PartitionedFile(InternalRow.empty, filePath, 0, 
baseFile.getFileLen)
+        HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, 
latestCommit,
+          metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
+      }).toList
+      fileSplits
+    }
+  }
+}
+
+object MergeOnReadSnapshotRelation {
+
+  def getFilePath(path: Path): String = {
+    // Here we use Path#toUri to encode the path string, because
+    // ParquetFileFormat#buildReaderWithPartitionValues in the spark project decodes it when
+    // reading the table. So we should encode the file path here; otherwise a
+    // FileNotFoundException is thrown.
+    // For example, if "pt" is the partition path field and "pt" = "2021/02/02", and we enable
+    // URL_ENCODE_PARTITIONING_OPT_KEY when writing data to the hudi table, the data path in
+    // the table will look like "/basePath/2021%2F02%2F02/xxxx.parquet". When we read data
+    // from the table, if the file path is not encoded,
+    // ParquetFileFormat#buildReaderWithPartitionValues will decode it to
+    // "/basePath/2021/02/02/xxxx.parquet", which results in a FileNotFoundException.
+    // See FileSourceScanExec#createBucketedReadRDD in the spark project, which does the same
+    // thing when creating a PartitionedFile.
+    path.toUri.toString
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
new file mode 100644
index 0000000..08cc50d
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.net.URLEncoder
+
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.{Config, 
TimestampType}
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
EqualTo, GreaterThanOrEqual, LessThan, Literal}
+import org.apache.spark.sql.execution.datasources.PartitionDirectory
+import org.apache.spark.sql.types.StringType
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+ * Tests for HoodieFileIndex: the partition schema it exposes for different key generators
+ * and the partition pruning done by listFiles.
+ */
+class TestHoodieFileIndex extends HoodieClientTestBase {
+
+  var spark: SparkSession = _
+  val commonOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
+    HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
+  )
+
+  @BeforeEach override def setUp(): Unit = {
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+    initTestDataGenerator()
+    initFileSystem()
+    initMetaClient()
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testPartitionSchema(partitionEncode: Boolean): Unit = {
+    val records1 = dataGen.generateInsertsContainsAllPartitions("000", 100)
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+    inputDF1.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, partitionEncode)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
+    assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(","))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array(
+    "org.apache.hudi.keygen.ComplexKeyGenerator",
+    "org.apache.hudi.keygen.SimpleKeyGenerator",
+    "org.apache.hudi.keygen.TimestampBasedKeyGenerator"))
+  def testPartitionSchemaForBuildInKeyGenerator(keyGenerator: String): Unit = {
+    val records1 = dataGen.generateInsertsContainsAllPartitions("000", 100)
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+    inputDF1.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenerator)
+      .option(Config.TIMESTAMP_TYPE_FIELD_PROP, TimestampType.DATE_STRING.name())
+      .option(Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, "yyyy/MM/dd")
+      .option(Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, "yyyy-MM-dd")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
+    assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(","))
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = Array(
+    "org.apache.hudi.keygen.CustomKeyGenerator",
+    "org.apache.hudi.keygen.CustomAvroKeyGenerator"))
+  def testPartitionSchemaForCustomKeyGenerator(keyGenerator: String): Unit = {
+    val records1 = dataGen.generateInsertsContainsAllPartitions("000", 100)
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+    inputDF1.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, keyGenerator)
+      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition:simple")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
+    assertEquals("partition", fileIndex.partitionSchema.fields.map(_.name).mkString(","))
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testPartitionPruneWithPartitionEncode(partitionEncode: Boolean): Unit = {
+    val partitions = Array("2021/03/08", "2021/03/09", "2021/03/10", "2021/03/11", "2021/03/12")
+    val newDataGen = new HoodieTestDataGenerator(partitions)
+    val records1 = newDataGen.generateInsertsContainsAllPartitions("000", 100)
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+    inputDF1.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, partitionEncode)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
+
+    val partitionFilter1 = EqualTo(attribute("partition"), literal("2021/03/08"))
+    // Name the charset explicitly: the single-argument encode is deprecated and uses the
+    // platform default encoding.
+    val partitionName =
+      if (partitionEncode) URLEncoder.encode("2021/03/08", "UTF-8") else "2021/03/08"
+    val partitionAndFilesAfterPrune = fileIndex.listFiles(Seq(partitionFilter1), Seq.empty)
+    assertEquals(1, partitionAndFilesAfterPrune.size)
+
+    val PartitionDirectory(partitionValues, filesInPartition) = partitionAndFilesAfterPrune(0)
+    assertEquals("2021/03/08", partitionValues.toSeq(Seq(StringType)).mkString(","))
+    assertEquals(getFileCountInPartitionPath(partitionName), filesInPartition.size)
+
+    val partitionFilter2 = And(
+      GreaterThanOrEqual(attribute("partition"), literal("2021/03/08")),
+      LessThan(attribute("partition"), literal("2021/03/10"))
+    )
+    val prunedPartitions = fileIndex.listFiles(Seq(partitionFilter2),
+      Seq.empty).map(_.values.toSeq(Seq(StringType)).mkString(",")).toList
+
+    assertEquals(List("2021/03/08", "2021/03/09"), prunedPartitions)
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testPartitionPruneWithMultiPartitionColumns(useMetaFileList: Boolean): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+    // Test the case the partition column size is equal to the partition directory level.
+    val inputDF1 = (for (i <- 0 until 10) yield (i, s"a$i", 10 + i, 10000,
+      s"2021-03-0${i % 2 + 1}", "10")).toDF("id", "name", "price", "version", "dt", "hh")
+
+    inputDF1.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(RECORDKEY_FIELD_OPT_KEY, "id")
+      .option(PRECOMBINE_FIELD_OPT_KEY, "version")
+      .option(PARTITIONPATH_FIELD_OPT_KEY, "dt,hh")
+      .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[ComplexKeyGenerator].getName)
+      .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, "false")
+      .option(HoodieMetadataConfig.METADATA_ENABLE_PROP, useMetaFileList)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val fileIndex = HoodieFileIndex(spark, metaClient, None, Map("path" -> basePath))
+
+    val partitionFilter1 = And(
+      EqualTo(attribute("dt"), literal("2021-03-01")),
+      EqualTo(attribute("hh"), literal("10"))
+    )
+    val partitionAndFilesAfterPrune = fileIndex.listFiles(Seq(partitionFilter1), Seq.empty)
+    assertEquals(1, partitionAndFilesAfterPrune.size)
+
+    val PartitionDirectory(partitionValues, filesAfterPrune) = partitionAndFilesAfterPrune(0)
+    // The partition prune will work for this case.
+    assertEquals("2021-03-01,10", partitionValues.toSeq(Seq(StringType)).mkString(","))
+    assertEquals(getFileCountInPartitionPath("2021-03-01/10"), filesAfterPrune.size)
+
+    val readDF1 = spark.read.format("hudi").load(basePath)
+    assertEquals(10, readDF1.count())
+    assertEquals(5, readDF1.filter("dt = '2021-03-01' and hh = '10'").count())
+
+    // Test the case that partition column size not match the partition directory level and
+    // partition column size is > 1. We will not treat it as partitioned table when read.
+    val inputDF2 = (for (i <- 0 until 10) yield (i, s"a$i", 10 + i, 100 * i + 10000,
+      s"2021/03/0${i % 2 + 1}", "10")).toDF("id", "name", "price", "version", "dt", "hh")
+    inputDF2.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(RECORDKEY_FIELD_OPT_KEY, "id")
+      .option(PRECOMBINE_FIELD_OPT_KEY, "version")
+      .option(PARTITIONPATH_FIELD_OPT_KEY, "dt,hh")
+      .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[ComplexKeyGenerator].getName)
+      .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, "false")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    fileIndex.refresh()
+    val partitionFilter2 = And(
+      EqualTo(attribute("dt"), literal("2021/03/01")),
+      EqualTo(attribute("hh"), literal("10"))
+    )
+    val partitionAndFilesAfterPrune2 = fileIndex.listFiles(Seq(partitionFilter2), Seq.empty)
+
+    assertEquals(1, partitionAndFilesAfterPrune2.size)
+    val PartitionDirectory(partitionValues2, filesAfterPrune2) = partitionAndFilesAfterPrune2(0)
+    // The partition prune would not work for this case, so the partition value it
+    // returns is a InternalRow.empty.
+    assertEquals(InternalRow.empty, partitionValues2)
+    // The returned file size should equal to the whole file size in all the partition paths.
+    assertEquals(getFileCountInPartitionPaths("2021/03/01/10", "2021/03/02/10"),
+      filesAfterPrune2.length)
+    val readDF2 = spark.read.format("hudi").load(basePath)
+
+    assertEquals(10, readDF2.count())
+    // There are 5 rows in the  dt = 2021/03/01 and hh = 10
+    assertEquals(5, readDF2.filter("dt = '2021/03/01' and hh ='10'").count())
+  }
+
+  // Build a nullable string attribute with the given partition column name.
+  private def attribute(partition: String): AttributeReference = {
+    AttributeReference(partition, StringType, true)()
+  }
+
+  // Build a literal expression for the given string value.
+  private def literal(value: String): Literal = {
+    Literal.create(value)
+  }
+
+  // Count all the base files in the partition path on the completed commits timeline.
+  private def getFileCountInPartitionPath(partitionPath: String): Int = {
+    metaClient.reloadActiveTimeline()
+    val activeInstants = metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
+    val fileSystemView = new HoodieTableFileSystemView(metaClient, activeInstants)
+    fileSystemView.getAllBaseFiles(partitionPath).iterator().asScala.toSeq.length
+  }
+
+  // Sum of the base file counts of all the given partition paths.
+  private def getFileCountInPartitionPaths(partitionPaths: String*): Int = {
+    partitionPaths.map(getFileCountInPartitionPath).sum
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index b671bc6..88ed65f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -18,6 +18,10 @@
 package org.apache.hudi.functional
 
 import java.sql.{Date, Timestamp}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -38,7 +42,6 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
-import scala.collection.JavaConversions._
 
 /**
  * Basic tests on the spark datasource for COW table.
@@ -619,4 +622,51 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .load(basePath + "/*")
     assertTrue(recordsReadDF.filter(col("_hoodie_partition_path") =!= 
lit("")).count() == 0)
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testQueryCOWWithBasePathAndFileIndex(partitionEncode: Boolean): Unit = {
+    val N = 20
+    // Test query with partition pruning, with URL_ENCODE_PARTITIONING_OPT_KEY 
enabled or disabled
+    val records1 = dataGen.generateInsertsContainsAllPartitions("000", N)
+    val inputDF1 = 
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+    inputDF1.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, 
partitionEncode)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+
+    val countIn20160315 = records1.asScala.count(record => 
record.getPartitionPath == "2016/03/15")
+    // query the partition by filter
+    val count1 = spark.read.format("hudi")
+      .load(basePath)
+      .filter("partition = '2016/03/15'")
+      .count()
+    assertEquals(countIn20160315, count1)
+
+    // query the partition by path
+    val partitionPath = if (partitionEncode) "2016%2F03%2F15" else "2016/03/15"
+    val count2 = spark.read.format("hudi")
+      .load(basePath + s"/$partitionPath")
+      .count()
+    assertEquals(countIn20160315, count2)
+
+    // Second write with Append mode
+    val records2 = dataGen.generateInsertsContainsAllPartitions("000", N + 1)
+    val inputDF2 = 
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
+    inputDF2.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, 
partitionEncode)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    // Incremental query without "*" in path
+    val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 
commitInstantTime1)
+      .load(basePath)
+    assertEquals(N + 1, hoodieIncViewDF1.count())
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
index 2a6a0a7..0746f6d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSourceForBootstrap.scala
@@ -130,6 +130,10 @@ class TestDataSourceForBootstrap {
     hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF1.filter(s"timestamp == 
$updateTimestamp").count())
+    // Read without *
+    val hoodieROViewDF1WithBasePath = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewDF1WithBasePath.count())
+    assertEquals(numRecordsUpdate, 
hoodieROViewDF1WithBasePath.filter(s"timestamp == $updateTimestamp").count())
 
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, 
isPartitioned = false, isHiveStylePartitioned = false)
   }
@@ -149,7 +153,8 @@ class TestDataSourceForBootstrap {
       .save(srcPath)
 
     // Perform bootstrap
-    val commitInstantTime1 = 
runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
+    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+      DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))
 
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
@@ -201,11 +206,15 @@ class TestDataSourceForBootstrap {
     })
 
     // Perform bootstrap
-    val commitInstantTime1 = 
runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
+    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+      DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, Some("datestr"))
 
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
+    // Read without *
+    val hoodieROViewWithBasePathDF1 = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewWithBasePathDF1.count())
 
     // Perform upsert based on the written bootstrap table
     val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === 
verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
@@ -268,7 +277,8 @@ class TestDataSourceForBootstrap {
     })
 
     // Perform bootstrap
-    val commitInstantTime1 = 
runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
 
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi")
@@ -304,6 +314,13 @@ class TestDataSourceForBootstrap {
                             .load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF2.count())
     assertEquals(numRecordsUpdate, hoodieROViewDF2.filter(s"timestamp == 
$updateTimestamp").count())
+    // Test query without "*" for MOR READ_OPTIMIZED
+    val hoodieROViewDFWithBasePath = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
+        DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath)
+    assertEquals(numRecords, hoodieROViewDFWithBasePath.count())
+    assertEquals(numRecordsUpdate, 
hoodieROViewDFWithBasePath.filter(s"timestamp == $updateTimestamp").count())
   }
 
   @Test def testMetadataBootstrapMORPartitioned(): Unit = {
@@ -325,7 +342,8 @@ class TestDataSourceForBootstrap {
     })
 
     // Perform bootstrap
-    val commitInstantTime1 = 
runMetadataBootstrapAndVerifyCommit(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+    val commitInstantTime1 = runMetadataBootstrapAndVerifyCommit(
+      DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL, Some("datestr"))
 
     // Read bootstrapped table and verify count
     val hoodieROViewDF1 = spark.read.format("hudi")
@@ -333,6 +351,12 @@ class TestDataSourceForBootstrap {
                               
DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
                             .load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
+    // Read bootstrapped table without "*"
+    val hoodieROViewDFWithBasePath = spark.read.format("hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,
+        DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+      .load(basePath)
+    assertEquals(numRecords, hoodieROViewDFWithBasePath.count())
 
     // Perform upsert based on the written bootstrap table
     val updateDf1 = hoodieROViewDF1.filter(col("_row_key") === 
verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))
@@ -420,6 +444,9 @@ class TestDataSourceForBootstrap {
     val hoodieROViewDF1 = spark.read.format("hudi").load(basePath + "/*")
     assertEquals(numRecords, hoodieROViewDF1.count())
 
+    val hoodieROViewDFWithBasePath = spark.read.format("hudi").load(basePath)
+    assertEquals(numRecords, hoodieROViewDFWithBasePath.count())
+
     // Perform upsert
     val updateTimestamp = Instant.now.toEpochMilli
     val updateDF = TestBootstrap.generateTestRawTripDataset(updateTimestamp, 
0, numRecordsUpdate, partitionPaths.asJava,
@@ -445,13 +472,15 @@ class TestDataSourceForBootstrap {
     verifyIncrementalViewResult(commitInstantTime1, commitInstantTime2, 
isPartitioned = true, isHiveStylePartitioned = false)
   }
 
-  def runMetadataBootstrapAndVerifyCommit(tableType: String): String = {
+  def runMetadataBootstrapAndVerifyCommit(tableType: String,
+                                          partitionColumns: Option[String] = 
None): String = {
     val bootstrapDF = spark.emptyDataFrame
     bootstrapDF.write
       .format("hudi")
       .options(commonOpts)
       .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, tableType)
+      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, 
partitionColumns.getOrElse(""))
       .option(HoodieBootstrapConfig.BOOTSTRAP_BASE_PATH_PROP, srcPath)
       .option(HoodieBootstrapConfig.BOOTSTRAP_KEYGEN_CLASS, 
classOf[SimpleKeyGenerator].getName)
       .mode(SaveMode.Overwrite)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 92024a3..00c40ab 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -17,6 +17,7 @@
 
 package org.apache.hudi.functional
 
+import scala.collection.JavaConverters._
 import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, 
PARTITIONPATH_FIELD_OPT_KEY, PAYLOAD_CLASS_OPT_KEY, PRECOMBINE_FIELD_OPT_KEY, 
RECORDKEY_FIELD_OPT_KEY}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload
@@ -31,6 +32,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.collection.JavaConversions._
 
@@ -562,4 +565,53 @@ class TestMORDataSource extends HoodieClientTestBase {
     df.show(1)
     df.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", 
"tip_history").show(1)
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testQueryMORWithBasePathAndFileIndex(partitionEncode: Boolean): Unit = {
+    val N = 20
+    // Test query with partition pruning, with URL_ENCODE_PARTITIONING_OPT_KEY 
enabled or disabled
+    val records1 = dataGen.generateInsertsContainsAllPartitions("000", N)
+    val inputDF1 = 
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records1), 2))
+    inputDF1.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, 
partitionEncode)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+
+    val countIn20160315 = records1.asScala.count(record => 
record.getPartitionPath == "2016/03/15")
+    // query the partition by filter
+    val count1 = spark.read.format("hudi")
+      .load(basePath)
+      .filter("partition = '2016/03/15'")
+      .count()
+    assertEquals(countIn20160315, count1)
+
+    // query the partition by path
+    val partitionPath = if (partitionEncode) "2016%2F03%2F15" else "2016/03/15"
+    val count2 = spark.read.format("hudi")
+      .load(basePath + s"/$partitionPath")
+      .count()
+    assertEquals(countIn20160315, count2)
+
+    // Second write with Append mode
+    val records2 = dataGen.generateInsertsContainsAllPartitions("000", N + 1)
+    val inputDF2 = 
spark.read.json(spark.sparkContext.parallelize(recordsToStrings(records2), 2))
+    inputDF2.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, 
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+      .option(DataSourceWriteOptions.URL_ENCODE_PARTITIONING_OPT_KEY, 
partitionEncode)
+      .mode(SaveMode.Append)
+      .save(basePath)
+    // Incremental query without "*" in path
+    val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
+      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, 
DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 
commitInstantTime1)
+      .load(basePath)
+    assertEquals(N + 1, hoodieIncViewDF1.count())
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala
new file mode 100644
index 0000000..5bf0284
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/Spark2ParsePartitionUtil.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import java.util.TimeZone
+
+import org.apache.hadoop.fs.Path
+import 
org.apache.spark.sql.execution.datasources.PartitioningUtils.PartitionValues
+import org.apache.spark.sql.types.DataType
+
+class Spark2ParsePartitionUtil extends SparkParsePartitionUtil {
+  override def parsePartition(path: Path, typeInference: Boolean,
+                              basePaths: Set[Path],
+                              userSpecifiedDataTypes: Map[String, DataType],
+                              timeZone: TimeZone): Option[PartitionValues] = {
+    PartitioningUtils.parsePartition(path, typeInference,
+      basePaths, userSpecifiedDataTypes, timeZone)._1
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
new file mode 100644
index 0000000..ea9cc78
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParsePartitionUtil.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+import java.util.TimeZone
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
+import 
org.apache.spark.sql.execution.datasources.PartitioningUtils.{PartitionValues, 
timestampPartitionPattern}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.DataType
+
+class Spark3ParsePartitionUtil(conf: SQLConf) extends SparkParsePartitionUtil {
+
+  override def parsePartition(path: Path, typeInference: Boolean,
+                              basePaths: Set[Path], userSpecifiedDataTypes: 
Map[String, DataType],
+                              timeZone: TimeZone): Option[PartitionValues] = {
+    val dateFormatter = DateFormatter(timeZone.toZoneId)
+    val timestampFormatter = TimestampFormatter(timestampPartitionPattern,
+      timeZone.toZoneId, isParsing = true)
+
+    PartitioningUtils.parsePartition(path, typeInference, basePaths, 
userSpecifiedDataTypes,
+      conf.validatePartitionColumns, timeZone.toZoneId, dateFormatter, 
timestampFormatter)._1
+  }
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 336639c..01a374d 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.HoodieWriterUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -235,12 +236,15 @@ public class DeltaSync implements Serializable {
       }
     } else {
       this.commitTimelineOpt = Option.empty();
+      String partitionColumns = 
HoodieWriterUtils.getPartitionColumns(keyGenerator);
+
       HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(cfg.tableType)
           .setTableName(cfg.targetTableName)
           .setArchiveLogFolder("archived")
           .setPayloadClassName(cfg.payloadClassName)
           .setBaseFileFormat(cfg.baseFileFormat)
+          .setPartitionColumns(partitionColumns)
           .initTable(new Configuration(jssc.hadoopConfiguration()),
             cfg.targetBasePath);
     }
@@ -326,12 +330,14 @@ public class DeltaSync implements Serializable {
         }
       }
     } else {
+      String partitionColumns = 
HoodieWriterUtils.getPartitionColumns(keyGenerator);
       HoodieTableMetaClient.withPropertyBuilder()
           .setTableType(cfg.tableType)
           .setTableName(cfg.targetTableName)
           .setArchiveLogFolder("archived")
           .setPayloadClassName(cfg.payloadClassName)
           .setBaseFileFormat(cfg.baseFileFormat)
+          .setPartitionColumns(partitionColumns)
           .initTable(new Configuration(jssc.hadoopConfiguration()), 
cfg.targetBasePath);
     }
 

Reply via email to