[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-31 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r605337987



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
##
@@ -81,4 +82,53 @@ object HoodieWriterUtils {
 params.foreach(kv => props.setProperty(kv._1, kv._2))
 props
   }
+
+  /**
+   * Get the partition columns to stored to hoodie.properties.
+   * @param parameters
+   * @return
+   */
+  def getPartitionColumns(parameters: Map[String, String]): Option[String] = {
+val  keyGenClass = parameters.getOrElse(KEYGENERATOR_CLASS_OPT_KEY,
+  DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
+try {
+  val constructor = getClass.getClassLoader.loadClass(keyGenClass)
+.getConstructor(classOf[TypedProperties])
+  constructor.setAccessible(true)
+  val props = new TypedProperties()
+  props.putAll(parameters.asJava)
+  val keyGen = constructor.newInstance(props)

Review comment:
   The KeyGenerator is now reused, except in the bootstrap method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-31 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r605330589



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
##
@@ -81,4 +82,53 @@ object HoodieWriterUtils {
 params.foreach(kv => props.setProperty(kv._1, kv._2))
 props
   }
+
+  /**
+   * Get the partition columns to stored to hoodie.properties.
+   * @param parameters
+   * @return
+   */
+  def getPartitionColumns(parameters: Map[String, String]): Option[String] = {
+val  keyGenClass = parameters.getOrElse(KEYGENERATOR_CLASS_OPT_KEY,
+  DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
+try {
+  val constructor = getClass.getClassLoader.loadClass(keyGenClass)
+.getConstructor(classOf[TypedProperties])
+  constructor.setAccessible(true)
+  val props = new TypedProperties()
+  props.putAll(parameters.asJava)
+  val keyGen = constructor.newInstance(props)

Review comment:
   Hi @umehrot2, for bootstrap in `HoodieSparkSqlWriter`, no KeyGenerator is 
created, so we need to create one here.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-30 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r604593860



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
##
@@ -81,4 +81,33 @@ object HoodieWriterUtils {
 params.foreach(kv => props.setProperty(kv._1, kv._2))
 props
   }
+
+  /**
+   * Get the partition columns to stored to hoodie.properties.
+   * Return the partitionColumns only if it is the key generator class is the 
build-ins.
+   * For other custom key generator class, we cannot know whether or not it 
has relation
+   * with the partition columns.
+   * @param parameters
+   * @return
+   */
+  def getPartitionColumns(parameters: Map[String, String]): Option[String] = {
+val  keyGenClass = parameters.getOrElse(KEYGENERATOR_CLASS_OPT_KEY,
+  DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
+val partitionColumns = parameters.get(PARTITIONPATH_FIELD_OPT_KEY)
+if (keyGenClass == classOf[SimpleKeyGenerator].getName ||
+keyGenClass == classOf[ComplexKeyGenerator].getName ||
+keyGenClass == classOf[TimestampBasedKeyGenerator].getName) {

Review comment:
   Good suggestion!








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-30 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r603959642



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##
@@ -57,6 +58,7 @@
   public static final String HOODIE_TABLE_TYPE_PROP_NAME = "hoodie.table.type";
   public static final String HOODIE_TABLE_VERSION_PROP_NAME = 
"hoodie.table.version";
   public static final String HOODIE_TABLE_PRECOMBINE_FIELD = 
"hoodie.table.precombine.field";
+  public static final String HOODIE_TABLE_PARTITION_COLUMNS = 
"hoodie.table.partition.columns";

Review comment:
   For existing tables that do not store the partition columns, we query them 
as non-partitioned tables.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-30 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r603865166



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##
@@ -133,25 +132,47 @@ class MergeOnReadSnapshotRelation(val sqlContext: 
SQLContext,
   }
 
   def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
-val inMemoryFileIndex = 
HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
-val fileStatuses = inMemoryFileIndex.allFiles()
-if (fileStatuses.isEmpty) {
-  throw new HoodieException("No files found for reading in user provided 
path.")
+val fileStatuses = if (globPaths.isDefined) {
+  // Load files from the global paths if it has defined to be compatible 
with the original mode
+  val inMemoryFileIndex = 
HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths.get)
+  inMemoryFileIndex.allFiles()
+} else { // Load files by the HoodieFileIndex.
+  val hoodieFileIndex = HoodieFileIndex(sqlContext.sparkSession, 
metaClient,
+Some(tableStructSchema), optParams, 
FileStatusCache.getOrCreate(sqlContext.sparkSession))
+  hoodieFileIndex.allFiles
 }
 
-val fsView = new HoodieTableFileSystemView(metaClient,
-  metaClient.getActiveTimeline.getCommitsTimeline
-.filterCompletedInstants, fileStatuses.toArray)
-val latestFiles: List[HoodieBaseFile] = 
fsView.getLatestBaseFiles.iterator().asScala.toList
-val latestCommit = fsView.getLastInstant.get().getTimestamp
-val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
latestFiles.asJava).asScala
-val fileSplits = fileGroup.map(kv => {
-  val baseFile = kv._1
-  val logPaths = if (kv._2.isEmpty) Option.empty else 
Option(kv._2.asScala.toList)
-  val partitionedFile = PartitionedFile(InternalRow.empty, 
baseFile.getPath, 0, baseFile.getFileLen)
-  HoodieMergeOnReadFileSplit(Option(partitionedFile), logPaths, 
latestCommit,
-metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
-}).toList
-fileSplits
+if (fileStatuses.isEmpty) { // If this an empty table, return an empty 
split list.
+  List.empty[HoodieMergeOnReadFileSplit]
+} else {
+  val fsView = new HoodieTableFileSystemView(metaClient,
+metaClient.getActiveTimeline.getCommitsTimeline
+  .filterCompletedInstants, fileStatuses.toArray)
+  val latestFiles: List[HoodieBaseFile] = 
fsView.getLatestBaseFiles.iterator().asScala.toList
+  val latestCommit = fsView.getLastInstant.get().getTimestamp
+  val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, 
latestFiles.asJava).asScala
+  val fileSplits = fileGroup.map(kv => {
+val baseFile = kv._1
+val logPaths = if (kv._2.isEmpty) Option.empty else 
Option(kv._2.asScala.toList)
+
+// Here we use the Path#toUri to encode the path string, as there is a 
decode in

Review comment:
   Yes, I will refactor this code.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-26 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r602102751



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, 
NoopCache, PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.mutable
+
+/**
+  * A File Index which support partition prune for hoodie snapshot and 
read-optimized
+  * query.
+  * Main steps to get the file list for query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Do the partition prune by the partition filter condition.
+  *
+  * There are 3 cases for this:
+  * 1、If the partition columns size is equal to the actually partition path 
level, we
+  * read it as partitioned table.(e.g partition column is "dt", the partition 
path is "2021-03-10")
+  *
+  * 2、If the partition columns size is not equal to the partition path level, 
but the partition
+  * column size is "1" (e.g. partition column is "dt", but the partition path 
is "2021/03/10"
+  * who'es directory level is 3).We can still read it as a partitioned table. 
We will mapping the
+  * partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt").
+  *
+  * 3、Else the the partition columns size is not equal to the partition 
directory level and the
+  * size is great than "1" (e.g. partition column is "dt,hh", the partition 
path is "2021/03/10/12")
+  * , we read it as a None Partitioned table because we cannot know how to 
mapping the partition
+  * path with the partition columns in this case.

Review comment:
   I have added this to the comment of the `prunePartition` method.
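
The three cases listed in the quoted Scaladoc can be sketched in a few lines. This is only an illustration under stated assumptions: `PartitionMappingSketch` and `mapPartitionPath` are hypothetical names, not part of Hudi, and the sketch ignores hive-style `column=value` directory names.

```scala
object PartitionMappingSketch {
  // Hypothetical helper (not Hudi API): maps a relative partition path to
  // partition column values, following the three cases in the Scaladoc.
  // Returns None when the table has to be read as non-partitioned.
  def mapPartitionPath(partitionColumns: Seq[String],
                       partitionPath: String): Option[Map[String, String]] = {
    val fragments = partitionPath.split("/").filter(_.nonEmpty).toSeq
    if (partitionColumns.isEmpty) {
      None
    } else if (fragments.length == partitionColumns.length) {
      // Case 1: one directory level per partition column.
      Some(partitionColumns.zip(fragments).toMap)
    } else if (partitionColumns.length == 1) {
      // Case 2: a single column absorbs the whole multi-level path.
      Some(Map(partitionColumns.head -> partitionPath))
    } else {
      // Case 3: the levels cannot be assigned to columns unambiguously.
      None
    }
  }
}
```

With this sketch, `mapPartitionPath(Seq("dt"), "2021/03/10")` maps the whole path to `dt`, while `mapPartitionPath(Seq("dt", "hh"), "2021/03/10/12")` returns `None`.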








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-26 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r602095082



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, 
NoopCache, PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.mutable
+
+/**
+  * A File Index which support partition prune for hoodie snapshot and 
read-optimized
+  * query.
+  * Main steps to get the file list for query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Do the partition prune by the partition filter condition.
+  *
+  * There are 3 cases for this:
+  * 1、If the partition columns size is equal to the actually partition path 
level, we
+  * read it as partitioned table.(e.g partition column is "dt", the partition 
path is "2021-03-10")
+  *
+  * 2、If the partition columns size is not equal to the partition path level, 
but the partition
+  * column size is "1" (e.g. partition column is "dt", but the partition path 
is "2021/03/10"
+  * who'es directory level is 3).We can still read it as a partitioned table. 
We will mapping the
+  * partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt").
+  *
+  * 3、Else the the partition columns size is not equal to the partition 
directory level and the
+  * size is great than "1" (e.g. partition column is "dt,hh", the partition 
path is "2021/03/10/12")
+  * , we read it as a None Partitioned table because we cannot know how to 
mapping the partition
+  * path with the partition columns in this case.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ metaClient: HoodieTableMetaClient,
+ schemaSpec: Option[StructType],
+ options: Map[String, String],
+ @transient fileStatusCache: FileStatusCache = NoopCache)
+  extends FileIndex with Logging {
+
+  private val basePath = metaClient.getBasePath
+
+  @transient private val queryPath = new Path(options.getOrElse("path", 
"'path' option required"))
+  /**
+* Get the schema of the table.
+*/
+  lazy val schema: StructType = schemaSpec.getOrElse({
+val schemaUtil = new TableSchemaResolver(metaClient)
+SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+  .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+* Get the partition schema from the hoodie.properties.
+*/
+  private lazy val _partitionSchemaFromProperties: StructType = {
+val tableConfig = metaClient.getTableConfig
+val partitionColumns = tableConfig.getPartitionColumns
+val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+
+if (partitionColumns.isPresent) {
+  val partitionFields = partitionColumns.get().map(column =>
+nameFieldMap.getOrElse(column, throw new 
IllegalArgumentException(s"Cannot find column: '" +
+ 

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-26 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r602094839



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -79,39 +81,52 @@ class DefaultSource extends RelationProvider
 val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
 
 val fs = FSUtils.getFs(allPaths.head, 
sqlContext.sparkContext.hadoopConfiguration)
-val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
-
-val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+// Use the HoodieFileIndex only if the 'path' has specified with no "*" 
contains.

Review comment:
   done!








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-26 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r602094723



##
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##
@@ -276,6 +276,16 @@ public static void processFiles(FileSystem fs, String 
basePathStr, Function

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r601290387



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##
@@ -193,6 +195,14 @@ public String getPreCombineField() {
 return props.getProperty(HOODIE_TABLE_PRECOMBINE_FIELD);
   }
 
+  public Option getPartitionColumns() {

Review comment:
   In previous versions of Hudi, no partition columns were stored in 
hoodie.properties, so this returns `Option#empty` in that case. This lets us 
distinguish a table with no partition columns from a table that never stored 
them. Other properties, such as `getBootstrapBasePath`, also use Option.
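
A minimal sketch of that distinction, written in Scala for brevity. It assumes the property is stored as a comma-separated string; `PartitionColumnsConfigSketch` is a hypothetical name and this is not the actual `HoodieTableConfig` implementation.

```scala
import java.util.Properties

object PartitionColumnsConfigSketch {
  // Property name taken from the quoted diff.
  val PartitionColumnsKey = "hoodie.table.partition.columns"

  // None            -> the property was never written (older table).
  // Some(Seq())     -> the property exists but lists no columns.
  // Some(Seq(a, b)) -> the stored partition columns.
  // Assumption: columns are stored as a comma-separated string.
  def getPartitionColumns(props: Properties): Option[Seq[String]] =
    Option(props.getProperty(PartitionColumnsKey)).map { value =>
      value.split(",").map(_.trim).filter(_.nonEmpty).toSeq
    }
}
```

A reader can then treat `None` as "read as a non-partitioned table" without confusing it with a table that explicitly declares no partition columns.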








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r601225876



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -79,39 +81,52 @@ class DefaultSource extends RelationProvider
 val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
 
 val fs = FSUtils.getFs(allPaths.head, 
sqlContext.sparkContext.hadoopConfiguration)
-val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
-
-val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+// Use the HoodieFileIndex only if the 'path' has specified with no "*" 
contains.
+// And READ_PATHS_OPT_KEY has not specified.
+// Or else we use the original way to read hoodie table.
+val useHoodieFileIndex = path.isDefined && !path.get.contains("*") &&

Review comment:
   Hi @vinothchandar, currently users querying a hoodie table must specify 
some stars ("*") in the path. If the path contains no stars, the old way of 
querying the hoodie table may not work (it throws an exception or returns no 
data). So we must use `HoodieFileIndex` by default in this case.
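
The selection rule described in the quoted code comments can be sketched as follows. The quoted line is truncated after the second `&&`, so the third condition (no extra read paths) is inferred from the comment about `READ_PATHS_OPT_KEY` and is an assumption; `FileIndexChoiceSketch` is a hypothetical name.

```scala
object FileIndexChoiceSketch {
  // Use the file index only when a single 'path' is given, it contains no
  // "*" glob, and no additional read paths are supplied (assumed reading of
  // the READ_PATHS_OPT_KEY comment). Otherwise fall back to the glob-based
  // listing.
  def useHoodieFileIndex(path: Option[String], readPaths: Seq[String]): Boolean =
    path.exists(p => !p.contains("*")) && readPaths.isEmpty
}
```

Under this rule a plain table path selects the file index, while a path such as `/tmp/tbl/*/*` keeps the original behaviour.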
   








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r601220471



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##
@@ -112,12 +112,15 @@ private[hudi] object HoodieSparkSqlWriter {
 val archiveLogFolder = parameters.getOrElse(
   HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
 
+val partitionColumns = 
parameters.getOrElse(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, null)

Review comment:
   Hi @vinothchandar, we need to get the partition schema and pass it to 
`HadoopFsRelation`, like this:
   
   HadoopFsRelation(
   fileIndex,
   fileIndex.partitionSchema,
   fileIndex.dataSchema,
   bucketSpec = None,
   fileFormat = new ParquetFileFormat,
   optParams)(sqlContext.sparkSession)
   
   It is hard to construct the partition schema from the key generator alone, 
so we need the partition columns.
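
As a rough illustration of why the column names are enough, the partition schema can be derived by looking each partition column up in the table schema. The sketch below uses a simplified stand-in (`Field`) for Spark's `StructField` so that it runs without Spark; `PartitionSchemaSketch` and its members are hypothetical names, not Hudi or Spark API.

```scala
object PartitionSchemaSketch {
  // Simplified stand-in for Spark's StructField, for illustration only.
  final case class Field(name: String, dataType: String)

  // Picks the partition fields out of the table schema, in the order the
  // partition columns are declared. Fails fast on an unknown column.
  def partitionSchema(tableSchema: Seq[Field],
                      partitionColumns: Seq[String]): Seq[Field] = {
    val byName = tableSchema.map(f => f.name -> f).toMap
    partitionColumns.map { column =>
      byName.getOrElse(column,
        throw new IllegalArgumentException(
          s"Cannot find column: '$column' in the table schema"))
    }
  }
}
```

A key generator only produces a partition path string per record, so it does not carry the column names or types that this lookup needs.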








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r601214645



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##
@@ -57,6 +58,7 @@
   public static final String HOODIE_TABLE_TYPE_PROP_NAME = "hoodie.table.type";
   public static final String HOODIE_TABLE_VERSION_PROP_NAME = 
"hoodie.table.version";
   public static final String HOODIE_TABLE_PRECOMBINE_FIELD = 
"hoodie.table.precombine.field";
+  public static final String HOODIE_TABLE_PARTITION_COLUMNS = 
"hoodie.table.partition.columns";

Review comment:
   Hi @vinothchandar, Spark SQL needs the partition schema for partition 
pruning, and the partition columns are needed to build it. Otherwise we cannot 
get the partition schema.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r601213171



##
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##
@@ -276,6 +276,13 @@ public static void processFiles(FileSystem fs, String 
basePathStr, Function

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r601212827



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, 
NoopCache, PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.mutable
+
+/**
+  * A file index which supports partition pruning for hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1. Load all files and partition values from the table path.
+  * 2. Prune partitions using the partition filter condition.
+  *
+  * There are 3 cases:
+  * 1. If the number of partition columns equals the actual partition path depth, we
+  * read it as a partitioned table (e.g. the partition column is "dt" and the partition
+  * path is "2021-03-10").
+  *
+  * 2. If the number of partition columns does not equal the partition path depth, but
+  * there is exactly one partition column (e.g. the partition column is "dt", but the
+  * partition path is "2021/03/10", whose directory depth is 3), we can still read it as
+  * a partitioned table. We map the whole partition path (e.g. 2021/03/10) to the only
+  * partition column (e.g. "dt").
+  *
+  * 3. Otherwise, when the number of partition columns does not equal the partition
+  * directory depth and is greater than 1 (e.g. the partition columns are "dt,hh" and
+  * the partition path is "2021/03/10/12"), we read it as a non-partitioned table,
+  * because we cannot know how to map the partition path to the partition columns.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ metaClient: HoodieTableMetaClient,
+ schemaSpec: Option[StructType],
+ options: Map[String, String],
+ @transient fileStatusCache: FileStatusCache = NoopCache)
+  extends FileIndex with Logging {
+
+  private val basePath = metaClient.getBasePath
+
+  @transient private val queryPath = new Path(options.getOrElse("path", 
"'path' option required"))
+  /**
+* Get the schema of the table.
+*/
+  lazy val schema: StructType = schemaSpec.getOrElse({
+val schemaUtil = new TableSchemaResolver(metaClient)
+SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+  .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+* Get the partition schema from the hoodie.properties.
+*/
+  private lazy val _partitionSchemaFromProperties: StructType = {
+val tableConfig = metaClient.getTableConfig
+val partitionColumns = tableConfig.getPartitionColumns
+val nameFieldMap = schema.fields.map(field => field.name -> field).toMap
+
+if (partitionColumns.isPresent) {
+  val partitionFields = partitionColumns.get().map(column =>
+nameFieldMap.getOrElse(column, throw new 
IllegalArgumentException(s"Cannot find column: '" +
+ 

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-23 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r599474180



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, 
PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+  * A file index which supports partition pruning for hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1. Load all files and partition values from the table path.
+  * 2. Prune partitions using the partition filter condition.
+  *
+  * There are 3 cases:
+  * 1. If the number of partition columns equals the actual partition path depth, we
+  * read it as a partitioned table (e.g. the partition column is "dt" and the partition
+  * path is "2021-03-10").
+  *
+  * 2. If the number of partition columns does not equal the partition path depth, but
+  * there is exactly one partition column (e.g. the partition column is "dt", but the
+  * partition path is "2021/03/10", whose directory depth is 3), we can still read it as
+  * a partitioned table. We map the whole partition path (e.g. 2021/03/10) to the only
+  * partition column (e.g. "dt").
+  *
+  * 3. Otherwise, when the number of partition columns does not equal the partition
+  * directory depth and is greater than 1 (e.g. the partition columns are "dt,hh" and
+  * the partition path is "2021/03/10/12"), we read it as a non-partitioned table,
+  * because we cannot know how to map the partition path to the partition columns.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  @transient private val hadoopConf = spark.sessionState.newHadoopConf()
+  private lazy val metaClient = HoodieTableMetaClient
+.builder().setConf(hadoopConf).setBasePath(basePath).build()
+
+  @transient private val queryPath = new Path(options.getOrElse("path", 
"'path' option required"))
+  /**
+* Get the schema of the table.
+*/
+  lazy val schema: StructType = schemaSpec.getOrElse({
+val schemaUtil = new TableSchemaResolver(metaClient)
+SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+  .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+* Get the partition schema from the hoodie.properties.
+*/
+  private lazy val _partitionSchemaFromProperties: StructType = {
+val tableConfig = metaClient.getTableConfig
+val partitionColumns = tableConfig.getPartitionColumns
+val nameFieldMap = schema.fields.map(field => field.name -> field).toMap
+
+if (partitionColumns.isPresent) {
+  val partitionFields = partitionColumns.get().map(column =>
+nameFieldMap.getOrElse(column, throw new 
IllegalArgumentException(s"Cannot find column " 

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-23 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r599296787



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, 
PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+  * A file index which supports partition pruning for hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1. Load all files and partition values from the table path.
+  * 2. Prune partitions using the partition filter condition.
+  *
+  * There are 3 cases:
+  * 1. If the number of partition columns equals the actual partition path depth, we
+  * read it as a partitioned table (e.g. the partition column is "dt" and the partition
+  * path is "2021-03-10").
+  *
+  * 2. If the number of partition columns does not equal the partition path depth, but
+  * there is exactly one partition column (e.g. the partition column is "dt", but the
+  * partition path is "2021/03/10", whose directory depth is 3), we can still read it as
+  * a partitioned table. We map the whole partition path (e.g. 2021/03/10) to the only
+  * partition column (e.g. "dt").
+  *
+  * 3. Otherwise, when the number of partition columns does not equal the partition
+  * directory depth and is greater than 1 (e.g. the partition columns are "dt,hh" and
+  * the partition path is "2021/03/10/12"), we read it as a non-partitioned table,
+  * because we cannot know how to map the partition path to the partition columns.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  @transient private val hadoopConf = spark.sessionState.newHadoopConf()
+  private lazy val metaClient = HoodieTableMetaClient
+.builder().setConf(hadoopConf).setBasePath(basePath).build()
+
+  @transient private val queryPath = new Path(options.getOrElse("path", 
"'path' option required"))
+  /**
+* Get the schema of the table.
+*/
+  lazy val schema: StructType = schemaSpec.getOrElse({
+val schemaUtil = new TableSchemaResolver(metaClient)
+SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+  .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+* Get the partition schema from the hoodie.properties.
+*/
+  private lazy val _partitionSchemaFromProperties: StructType = {
+val tableConfig = metaClient.getTableConfig
+val partitionColumns = tableConfig.getPartitionColumns
+val nameFieldMap = schema.fields.map(field => field.name -> field).toMap
+
+if (partitionColumns.isPresent) {
+  val partitionFields = partitionColumns.get().map(column =>
+nameFieldMap.getOrElse(column, throw new 
IllegalArgumentException(s"Cannot find column " 

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-23 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r599319824



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, 
PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+  * A file index which supports partition pruning for hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1. Load all files and partition values from the table path.
+  * 2. Prune partitions using the partition filter condition.
+  *
+  * There are 3 cases:
+  * 1. If the number of partition columns equals the actual partition path depth, we
+  * read it as a partitioned table (e.g. the partition column is "dt" and the partition
+  * path is "2021-03-10").
+  *
+  * 2. If the number of partition columns does not equal the partition path depth, but
+  * there is exactly one partition column (e.g. the partition column is "dt", but the
+  * partition path is "2021/03/10", whose directory depth is 3), we can still read it as
+  * a partitioned table. We map the whole partition path (e.g. 2021/03/10) to the only
+  * partition column (e.g. "dt").
+  *
+  * 3. Otherwise, when the number of partition columns does not equal the partition
+  * directory depth and is greater than 1 (e.g. the partition columns are "dt,hh" and
+  * the partition path is "2021/03/10/12"), we read it as a non-partitioned table,
+  * because we cannot know how to map the partition path to the partition columns.
+  */
+case class HoodieFileIndex(

Review comment:
   That is a good idea. It will improve query performance across queries.
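
   For reference, the three partition-mapping cases described in the Scaladoc
   quoted above can be sketched as follows. This is only a minimal
   illustration, not the code in the PR: `PartitionMappingSketch` and
   `resolve` are hypothetical names, the path is assumed to be relative to the
   table base path, and hive-style `dt=2021-03-10` fragments are not handled.

```scala
object PartitionMappingSketch {

  /**
   * Hypothetical helper (not part of the PR): maps a relative partition path
   * to (column -> value) pairs. Returns None when the table would have to be
   * read as non-partitioned.
   */
  def resolve(partitionColumns: Seq[String],
              partitionPath: String): Option[Seq[(String, String)]] = {
    // Directory levels of the partition path, e.g. "2021/03/10" has 3 levels.
    val fragments = partitionPath.split("/").filter(_.nonEmpty).toSeq

    if (partitionColumns.isEmpty) {
      // No partition columns recorded: nothing to map.
      None
    } else if (fragments.length == partitionColumns.length) {
      // Case 1: one directory level per partition column.
      Some(partitionColumns.zip(fragments))
    } else if (partitionColumns.length == 1) {
      // Case 2: a single partition column takes the whole path as its value.
      Some(Seq(partitionColumns.head -> partitionPath))
    } else {
      // Case 3: several columns and a different number of levels is ambiguous.
      None
    }
  }
}
```

   With this sketch, `resolve(Seq("dt"), "2021/03/10")` maps the whole path to
   `dt`, while `resolve(Seq("dt", "hh"), "2021/03/10/12")` yields `None`.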




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-23 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r599318844



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -79,39 +82,58 @@ class DefaultSource extends RelationProvider
 val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
 
 val fs = FSUtils.getFs(allPaths.head, 
sqlContext.sparkContext.hadoopConfiguration)
-val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
-
-val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+// Use the HoodieFileIndex only if 'path' is specified and contains no "*",
+// and READ_PATHS_OPT_KEY is not specified.
+// Otherwise, fall back to the original way of reading the hoodie table.
+val useHoodieFileIndex = path.isDefined && !path.get.contains("*") &&
+  !parameters.contains(DataSourceReadOptions.READ_PATHS_OPT_KEY)
+val globPaths = if (useHoodieFileIndex) {
+  None
+} else {
+  Some(HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs))
+}
+// Get the table base path
+val tablePath = if (globPaths.isDefined) {
+  DataSourceUtils.getTablePath(fs, globPaths.get.toArray)
+} else {
+  val tablePathOpt = TablePathUtils.getTablePath(fs, new Path(path.get))
+  if (tablePathOpt.isPresent) {
+tablePathOpt.get().toString
+  } else {
+throw new HoodieException("Failed to get the table path. " +
+  s"Maybe ${allPaths.head} is not a hudi table path")
+  }

Review comment:
   Yes, I will refactor this code.
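
   One piece of the quoted hunk that could be pulled out during such a
   refactor is the decision of when to use `HoodieFileIndex`. A minimal
   sketch, under assumptions: `FileIndexDecisionSketch` is a hypothetical
   name, and the string used for `READ_PATHS_OPT_KEY` is assumed rather than
   taken from this thread.

```scala
object FileIndexDecisionSketch {

  // Assumed literal value of DataSourceReadOptions.READ_PATHS_OPT_KEY;
  // the real constant should be used in actual code.
  val ReadPathsKey = "hoodie.datasource.read.paths"

  /**
   * Mirrors the condition in the quoted diff: use the file index only when
   * 'path' is given, contains no glob character, and no explicit read paths
   * are configured.
   */
  def useHoodieFileIndex(path: Option[String],
                         parameters: Map[String, String]): Boolean =
    path.exists(p => !p.contains("*")) && !parameters.contains(ReadPathsKey)
}
```

   Keeping the predicate in one place would make the fallback to the glob-path
   code easier to test on its own.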








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-23 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r599303993



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##
@@ -179,6 +179,9 @@
   public static final String EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = 
AVRO_SCHEMA + ".externalTransformation";
   public static final String DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION 
= "false";
 
+  public static final String  MAX_LISTING_PARALLELISM = 
"hoodie.max.list.file.parallelism";
+  public static final Integer DEFAULT_MAX_LISTING_PARALLELISM = 200;

Review comment:
   `HoodieWriteConfig` already has a `getFileListingParallelism` method; I can
   use that.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-23 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r599297507



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -162,18 +184,26 @@ class DefaultSource extends RelationProvider
 
   override def shortName(): String = "hudi"
 
-  private def getBaseFileOnlyView(sqlContext: SQLContext,
+  private def getBaseFileOnlyView(useHoodieFileIndex: Boolean,
+  sqlContext: SQLContext,
   optParams: Map[String, String],
   schema: StructType,
+  tablePath: String,
   extraReadPaths: Seq[String],
-  isBootstrappedTable: Boolean,
-  globPaths: Seq[Path],
   metaClient: HoodieTableMetaClient): 
BaseRelation = {
 log.warn("Loading Base File Only View.")
+if (useHoodieFileIndex) {
+  val fileIndex = HoodieFileIndex(sqlContext.sparkSession, tablePath,
+if (schema == null) Option.empty[StructType] else Some(schema), 
optParams)
 
-if (isBootstrappedTable) {
-  // For bootstrapped tables, use our custom Spark relation for querying
-  new HoodieBootstrapRelation(sqlContext, schema, globPaths, metaClient, 
optParams)
+  log.info("Constructing hoodie (as parquet) data source with options :" + 
optParams)

Review comment:
   I am OK with this!








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-23 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r599297333



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, 
PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+  * A file index which supports partition pruning for hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1. Load all files and partition values from the table path.
+  * 2. Prune partitions using the partition filter condition.
+  *
+  * There are 3 cases:
+  * 1. If the number of partition columns equals the actual partition path depth, we
+  * read it as a partitioned table (e.g. the partition column is "dt" and the partition
+  * path is "2021-03-10").
+  *
+  * 2. If the number of partition columns does not equal the partition path depth, but
+  * there is exactly one partition column (e.g. the partition column is "dt", but the
+  * partition path is "2021/03/10", whose directory depth is 3), we can still read it as
+  * a partitioned table. We map the whole partition path (e.g. 2021/03/10) to the only
+  * partition column (e.g. "dt").
+  *
+  * 3. Otherwise, when the number of partition columns does not equal the partition
+  * directory depth and is greater than 1 (e.g. the partition columns are "dt,hh" and
+  * the partition path is "2021/03/10/12"), we read it as a non-partitioned table,
+  * because we cannot know how to map the partition path to the partition columns.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  @transient private val hadoopConf = spark.sessionState.newHadoopConf()
+  private lazy val metaClient = HoodieTableMetaClient
+.builder().setConf(hadoopConf).setBasePath(basePath).build()

Review comment:
   Good suggestion!








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-23 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r599296787



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, 
PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+  * A File Index which support partition prune for hoodie snapshot and 
read-optimized
+  * query.
+  * Main steps to get the file list for query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Do the partition prune by the partition filter condition.
+  *
+  * There are 3 cases for this:
+  * 1、If the partition columns size is equal to the actually partition path 
level, we
+  * read it as partitioned table.(e.g partition column is "dt", the partition 
path is "2021-03-10")
+  *
+  * 2、If the partition columns size is not equal to the partition path level, 
but the partition
+  * column size is "1" (e.g. partition column is "dt", but the partition path 
is "2021/03/10"
+  * who'es directory level is 3).We can still read it as a partitioned table. 
We will mapping the
+  * partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt").
+  *
+  * 3、Else the the partition columns size is not equal to the partition 
directory level and the
+  * size is great than "1" (e.g. partition column is "dt,hh", the partition 
path is "2021/03/10/12")
+  * , we read it as a None Partitioned table because we cannot know how to 
mapping the partition
+  * path with the partition columns in this case.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  @transient private val hadoopConf = spark.sessionState.newHadoopConf()
+  private lazy val metaClient = HoodieTableMetaClient
+.builder().setConf(hadoopConf).setBasePath(basePath).build()
+
+  @transient private val queryPath = new Path(options.getOrElse("path", 
"'path' option required"))
+  /**
+* Get the schema of the table.
+*/
+  lazy val schema: StructType = schemaSpec.getOrElse({
+val schemaUtil = new TableSchemaResolver(metaClient)
+SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+  .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+* Get the partition schema from the hoodie.properties.
+*/
+  private lazy val _partitionSchemaFromProperties: StructType = {
+val tableConfig = metaClient.getTableConfig
+val partitionColumns = tableConfig.getPartitionColumns
+val nameFieldMap = schema.fields.map(field => field.name -> field).toMap
+
+if (partitionColumns.isPresent) {
+  val partitionFields = partitionColumns.get().map(column =>
+nameFieldMap.getOrElse(column, throw new 
IllegalArgumentException(s"Cannot find column " 

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-23 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r599292364



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, 
PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+  * A file index that supports partition pruning for Hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1. Load all files and partition values from the table path.
+  * 2. Prune partitions using the partition filter condition.
+  *
+  * There are 3 cases for this:
+  * 1. If the number of partition columns equals the actual partition path level, we
+  * read it as a partitioned table (e.g. partition column is "dt", the partition path is "2021-03-10").
+  *
+  * 2. If the number of partition columns does not equal the partition path level, but there is
+  * only one partition column (e.g. partition column is "dt", but the partition path is "2021/03/10",
+  * whose directory level is 3), we can still read it as a partitioned table. We map the whole
+  * partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt").
+  *
+  * 3. Otherwise, the number of partition columns does not equal the partition directory level and
+  * is greater than 1 (e.g. partition columns are "dt,hh", the partition path is "2021/03/10/12");
+  * we read it as a non-partitioned table because we cannot know how to map the partition
+  * path to the partition columns in this case.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  @transient private val hadoopConf = spark.sessionState.newHadoopConf()
+  private lazy val metaClient = HoodieTableMetaClient
+.builder().setConf(hadoopConf).setBasePath(basePath).build()
+
+  @transient private val queryPath = new Path(options.getOrElse("path", 
"'path' option required"))
+  /**
+* Get the schema of the table.
+*/
+  lazy val schema: StructType = schemaSpec.getOrElse({
+val schemaUtil = new TableSchemaResolver(metaClient)
+SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+  .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+* Get the partition schema from the hoodie.properties.
+*/
+  private lazy val _partitionSchemaFromProperties: StructType = {
+val tableConfig = metaClient.getTableConfig
+val partitionColumns = tableConfig.getPartitionColumns
+val nameFieldMap = schema.fields.map(field => field.name -> field).toMap
+
+if (partitionColumns.isPresent) {
+  val partitionFields = partitionColumns.get().map(column =>
+nameFieldMap.getOrElse(column, throw new 
IllegalArgumentException(s"Cannot find column " 
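
The three cases listed in the `HoodieFileIndex` class comment above can be sketched as a small standalone Java example. This is only an illustration under stated assumptions, not the code from the pull request: `PartitionPathMapper` and `mapPartitionValues` are hypothetical names, the partition path is assumed to be relative to the table base path, and hive-style `key=value` fragments are not handled.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of Hudi.
final class PartitionPathMapper {

  private PartitionPathMapper() {}

  /**
   * Maps a relative partition path to partition column values.
   * An empty result means the table has to be read as non-partitioned.
   */
  static Map<String, String> mapPartitionValues(List<String> partitionColumns,
                                                String partitionPath) {
    Map<String, String> values = new LinkedHashMap<>();
    if (partitionColumns.isEmpty() || partitionPath.isEmpty()) {
      return values;
    }
    String[] fragments = partitionPath.split("/");
    if (fragments.length == partitionColumns.size()) {
      // Case 1: one path level per partition column.
      for (int i = 0; i < fragments.length; i++) {
        values.put(partitionColumns.get(i), fragments[i]);
      }
    } else if (partitionColumns.size() == 1) {
      // Case 2: a single partition column takes the whole path, e.g. dt -> 2021/03/10.
      values.put(partitionColumns.get(0), partitionPath);
    }
    // Case 3: level mismatch with several columns, so the mapping is unknown
    // and the result stays empty.
    return values;
  }
}
```

With these assumptions, `mapPartitionValues(List.of("dt"), "2021/03/10")` falls into case 2, while `mapPartitionValues(List.of("dt", "hh"), "2021/03/10/12")` falls into case 3 and yields an empty map.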

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r599288753



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##
@@ -112,12 +112,15 @@ private[hudi] object HoodieSparkSqlWriter {
 val archiveLogFolder = parameters.getOrElse(
   HoodieTableConfig.HOODIE_ARCHIVELOG_FOLDER_PROP_NAME, "archived")
 
+val partitionColumns = 
parameters.getOrElse(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, null)
+
 val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder()
   .setTableType(tableType)
   .setTableName(tblName)
   .setArchiveLogFolder(archiveLogFolder)
   .setPayloadClassName(parameters(PAYLOAD_CLASS_OPT_KEY))
   
.setPreCombineField(parameters.getOrDefault(PRECOMBINE_FIELD_OPT_KEY, null))
+  .setPartitionColumns(partitionColumns)

Review comment:
   Thank you for reminding me about this.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r599287887



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##
@@ -179,6 +179,9 @@
   public static final String EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = 
AVRO_SCHEMA + ".externalTransformation";
   public static final String DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION 
= "false";
 
+  public static final String  MAX_LISTING_PARALLELISM = 
"hoodie.max.list.file.parallelism";
+  public static final Integer DEFAULT_MAX_LISTING_PARALLELISM = 200;

Review comment:
   Good suggestions!








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-14 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r594023224



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -79,39 +82,58 @@ class DefaultSource extends RelationProvider
 val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
 
 val fs = FSUtils.getFs(allPaths.head, 
sqlContext.sparkContext.hadoopConfiguration)
-val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
-
-val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+// Use the HoodieFileIndex only if 'path' is specified and contains no "*",
+// and READ_PATHS_OPT_KEY is not specified.
+// Otherwise, we use the original way to read the hoodie table.

Review comment:
   A Jira has been opened at
[HUDI-1689](https://issues.apache.org/jira/browse/HUDI-1689) 
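
   The condition described in the code comment of this hunk can be sketched in a standalone form. This is a hedged illustration, not the actual `DefaultSource` logic: `FileIndexDecision` and `useFileIndex` are hypothetical names, and `readPaths` stands for whatever was passed through `READ_PATHS_OPT_KEY`.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper for illustration only; not part of Hudi.
final class FileIndexDecision {

  private FileIndexDecision() {}

  /**
   * Returns true when the file index can be used: 'path' is given,
   * contains no glob character, and no extra read paths are given.
   */
  static boolean useFileIndex(Optional<String> path, List<String> readPaths) {
    return path.isPresent()
        && !path.get().contains("*")
        && readPaths.isEmpty();
  }
}
```

   Any other combination (a glob path, or explicit read paths) would fall back to the original read path in this sketch.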









[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-12 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r593682460



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, 
PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+  * A file index that supports partition pruning for Hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1. Load all files and partition values from the table path.
+  * 2. Prune partitions using the partition filter condition.
+  *
+  * Note:
+  * Only when URL_ENCODE_PARTITIONING_OPT_KEY is enabled can we store the partition columns
+  * in hoodie.properties from HoodieSqlWriter when writing the table, so that queries can benefit
+  * from partition pruning.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  @transient private val hadoopConf = spark.sessionState.newHadoopConf()
+  private lazy val metaClient = HoodieTableMetaClient
+.builder().setConf(hadoopConf).setBasePath(basePath).build()
+
+  @transient private val queryPath = new Path(options.getOrElse("path", 
"'path' option required"))
+  /**
+* Get the schema of the table.
+*/
+  lazy val schema: StructType = schemaSpec.getOrElse({
+val schemaUtil = new TableSchemaResolver(metaClient)
+SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+  .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+* Get the partition schema.
+*/
+  private lazy val _partitionSchema: StructType = {
+val tableConfig = metaClient.getTableConfig
+val partitionColumns = tableConfig.getPartitionColumns
+val nameFieldMap = schema.fields.map(field => field.name -> field).toMap
+// If URL_ENCODE_PARTITIONING_OPT_KEY is enabled, the partition schema is stored in
+// hoodie.properties, so we can benefit from partition pruning.
+if (partitionColumns.isPresent) {
+  val partitionFields = partitionColumns.get().map(column =>
+nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column " +
+  s"$column in the schema[${schema.fields.mkString(",")}]")))
+  new StructType(partitionFields)
+} else { // If URL_ENCODE_PARTITIONING_OPT_KEY is disabled, we treat it as a
+  // non-partitioned table.
+  new StructType()
+}
+  }
+
+  @transient @volatile private var fileSystemView: HoodieTableFileSystemView = 
_
+  @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] 
= _
+  @transient @volatile private var cachedFileSize: Long = 0L
+  @transient @volatile private var cachedAllPartitionPaths: Seq[PartitionPath] 
= _
+
+  refresh()
+
+  override def rootPaths: Seq[Path] = queryPath :: Nil
+
+  override def listFiles(partitionFilters: 
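
The way `_partitionSchema` is derived in the hunk above (look up each stored partition column in the table schema, fail on an unknown column, and fall back to an empty schema when no partition columns are stored) can be sketched in a standalone form. This is a simplified illustration under stated assumptions: `PartitionSchemaResolver` and `resolve` are hypothetical names, and a schema field is modelled as a plain `name:type` string instead of Spark's `StructField`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper for illustration only; not part of Hudi or Spark.
final class PartitionSchemaResolver {

  private PartitionSchemaResolver() {}

  /**
   * Resolves the stored partition columns against the table schema.
   * An empty result means the table is treated as non-partitioned.
   */
  static List<String> resolve(Optional<String[]> partitionColumns,
                              Map<String, String> fieldTypesByName) {
    List<String> partitionFields = new ArrayList<>();
    if (!partitionColumns.isPresent()) {
      // No partition columns were stored in the table properties.
      return partitionFields;
    }
    for (String column : partitionColumns.get()) {
      String type = fieldTypesByName.get(column);
      if (type == null) {
        throw new IllegalArgumentException(
            "Cannot find column " + column + " in the schema" + fieldTypesByName.keySet());
      }
      partitionFields.add(column + ":" + type);
    }
    return partitionFields;
  }
}
```

Failing fast on an unknown column mirrors the `IllegalArgumentException` in the diff; returning an empty list mirrors the `new StructType()` branch.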

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-12 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r593551029



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, 
PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+  * A file index that supports partition pruning for Hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1. Load all files and partition values from the table path.
+  * 2. Prune partitions using the partition filter condition.
+  *
+  * Note:
+  * Only when URL_ENCODE_PARTITIONING_OPT_KEY is enabled can we store the partition columns
+  * in hoodie.properties from HoodieSqlWriter when writing the table, so that queries can benefit
+  * from partition pruning.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  @transient private val hadoopConf = spark.sessionState.newHadoopConf()
+  private lazy val metaClient = HoodieTableMetaClient
+.builder().setConf(hadoopConf).setBasePath(basePath).build()
+
+  @transient private val queryPath = new Path(options.getOrElse("path", 
"'path' option required"))
+  /**
+* Get the schema of the table.
+*/
+  lazy val schema: StructType = schemaSpec.getOrElse({
+val schemaUtil = new TableSchemaResolver(metaClient)
+SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+  .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+* Get the partition schema.
+*/
+  private lazy val _partitionSchema: StructType = {
+val tableConfig = metaClient.getTableConfig
+val partitionColumns = tableConfig.getPartitionColumns
+val nameFieldMap = schema.fields.map(field => field.name -> field).toMap
+// If URL_ENCODE_PARTITIONING_OPT_KEY is enabled, the partition schema is stored in
+// hoodie.properties, so we can benefit from partition pruning.
+if (partitionColumns.isPresent) {
+  val partitionFields = partitionColumns.get().map(column =>
+nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column " +
+  s"$column in the schema[${schema.fields.mkString(",")}]")))
+  new StructType(partitionFields)
+} else { // If URL_ENCODE_PARTITIONING_OPT_KEY is disabled, we treat it as a
+  // non-partitioned table.
+  new StructType()
+}
+  }
+
+  @transient @volatile private var fileSystemView: HoodieTableFileSystemView = 
_
+  @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] 
= _
+  @transient @volatile private var cachedFileSize: Long = 0L
+  @transient @volatile private var cachedAllPartitionPaths: Seq[PartitionPath] 
= _
+
+  refresh()
+
+  override def rootPaths: Seq[Path] = queryPath :: Nil
+
+  override def listFiles(partitionFilters: 

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-12 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r593550642



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, 
PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+  * A file index that supports partition pruning for Hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1. Load all files and partition values from the table path.
+  * 2. Prune partitions using the partition filter condition.
+  *
+  * Note:
+  * Only when URL_ENCODE_PARTITIONING_OPT_KEY is enabled can we store the partition columns
+  * in hoodie.properties from HoodieSqlWriter when writing the table, so that queries can benefit
+  * from partition pruning.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  @transient private val hadoopConf = spark.sessionState.newHadoopConf()
+  private lazy val metaClient = HoodieTableMetaClient
+.builder().setConf(hadoopConf).setBasePath(basePath).build()
+
+  @transient private val queryPath = new Path(options.getOrElse("path", 
"'path' option required"))
+  /**
+* Get the schema of the table.
+*/
+  lazy val schema: StructType = schemaSpec.getOrElse({
+val schemaUtil = new TableSchemaResolver(metaClient)
+SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+  .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+* Get the partition schema.
+*/
+  private lazy val _partitionSchema: StructType = {
+val tableConfig = metaClient.getTableConfig
+val partitionColumns = tableConfig.getPartitionColumns
+val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+// If the URL_ENCODE_PARTITIONING_OPT_KEY has enable, the partition schema 
will stored in
+// hoodie.properties, So we can benefit from the partition prune.
+if (partitionColumns.isPresent) {
+  val partitionFields = partitionColumns.get().map(column =>
+nameFieldMap.getOrElse(column, throw new 
IllegalArgumentException(s"Cannot find column " +
+  s"$column in the schema[${schema.fields.mkString(",")}]")))
+  new StructType(partitionFields)
+} else { // If the URL_ENCODE_PARTITIONING_OPT_KEY is disable, we trait it 
as a
+  // none-partitioned table.
+  new StructType()
+}
+  }
+
+  @transient @volatile private var fileSystemView: HoodieTableFileSystemView = 
_
+  @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] 
= _
+  @transient @volatile private var cachedFileSize: Long = 0L
+  @transient @volatile private var cachedAllPartitionPaths: Seq[PartitionPath] 
= _
+
+  refresh()
+
+  override def rootPaths: Seq[Path] = queryPath :: Nil
+
+  override def listFiles(partitionFilters: 

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-12 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r593550517



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, 
PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+  * A File Index which support partition prune for hoodie snapshot and 
read-optimized
+  * query.
+  * Main steps to get the file list for query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Do the partition prune by the partition filter condition.
+  *
+  * Note:
+  * Only when the URL_ENCODE_PARTITIONING_OPT_KEY is enable, we can store the 
partition columns
+  * to the hoodie.properties in HoodieSqlWriter when write table. So that the 
query can benefit
+  * from the partition prune.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  @transient private val hadoopConf = spark.sessionState.newHadoopConf()
+  private lazy val metaClient = HoodieTableMetaClient
+.builder().setConf(hadoopConf).setBasePath(basePath).build()
+
+  @transient private val queryPath = new Path(options.getOrElse("path", 
"'path' option required"))
+  /**
+* Get the schema of the table.
+*/
+  lazy val schema: StructType = schemaSpec.getOrElse({
+val schemaUtil = new TableSchemaResolver(metaClient)
+SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+  .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+* Get the partition schema.
+*/
+  private lazy val _partitionSchema: StructType = {
+val tableConfig = metaClient.getTableConfig
+val partitionColumns = tableConfig.getPartitionColumns
+val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+// If the URL_ENCODE_PARTITIONING_OPT_KEY has enable, the partition schema 
will stored in
+// hoodie.properties, So we can benefit from the partition prune.
+if (partitionColumns.isPresent) {
+  val partitionFields = partitionColumns.get().map(column =>
+nameFieldMap.getOrElse(column, throw new 
IllegalArgumentException(s"Cannot find column " +
+  s"$column in the schema[${schema.fields.mkString(",")}]")))
+  new StructType(partitionFields)
+} else { // If the URL_ENCODE_PARTITIONING_OPT_KEY is disable, we trait it 
as a
+  // none-partitioned table.
+  new StructType()
+}
+  }
+
+  @transient @volatile private var fileSystemView: HoodieTableFileSystemView = 
_
+  @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] 
= _
+  @transient @volatile private var cachedFileSize: Long = 0L
+  @transient @volatile private var cachedAllPartitionPaths: Seq[PartitionPath] 
= _
+
+  refresh()
+
+  override def rootPaths: Seq[Path] = queryPath :: Nil
+
+  override def listFiles(partitionFilters: 

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-12 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r593549485



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, 
PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+  * A File Index which support partition prune for hoodie snapshot and 
read-optimized
+  * query.
+  * Main steps to get the file list for query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Do the partition prune by the partition filter condition.
+  *
+  * Note:
+  * Only when the URL_ENCODE_PARTITIONING_OPT_KEY is enable, we can store the 
partition columns
+  * to the hoodie.properties in HoodieSqlWriter when write table. So that the 
query can benefit
+  * from the partition prune.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  @transient private val hadoopConf = spark.sessionState.newHadoopConf()
+  private lazy val metaClient = HoodieTableMetaClient
+.builder().setConf(hadoopConf).setBasePath(basePath).build()
+
+  @transient private val queryPath = new Path(options.getOrElse("path", 
"'path' option required"))
+  /**
+* Get the schema of the table.
+*/
+  lazy val schema: StructType = schemaSpec.getOrElse({
+val schemaUtil = new TableSchemaResolver(metaClient)
+SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+  .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+* Get the partition schema.
+*/
+  private lazy val _partitionSchema: StructType = {
+val tableConfig = metaClient.getTableConfig
+val partitionColumns = tableConfig.getPartitionColumns
+val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+// If the URL_ENCODE_PARTITIONING_OPT_KEY has enable, the partition schema 
will stored in
+// hoodie.properties, So we can benefit from the partition prune.
+if (partitionColumns.isPresent) {
+  val partitionFields = partitionColumns.get().map(column =>
+nameFieldMap.getOrElse(column, throw new 
IllegalArgumentException(s"Cannot find column " +
+  s"$column in the schema[${schema.fields.mkString(",")}]")))
+  new StructType(partitionFields)
+} else { // If the URL_ENCODE_PARTITIONING_OPT_KEY is disable, we trait it 
as a
+  // none-partitioned table.
+  new StructType()
+}
+  }
+
+  @transient @volatile private var fileSystemView: HoodieTableFileSystemView = 
_
+  @transient @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] 
= _
+  @transient @volatile private var cachedFileSize: Long = 0L
+  @transient @volatile private var cachedAllPartitionPaths: Seq[PartitionPath] 
= _
+
+  refresh()
+
+  override def rootPaths: Seq[Path] = queryPath :: Nil
+
+  override def listFiles(partitionFilters: 

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-12 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r593548510



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, 
SerializableConfiguration}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, 
PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+  * A File Index which support partition prune for hoodie snapshot and read-optimized
+  * query.
+  * Main steps to get the file list for query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Do the partition prune by the partition filter condition.
+  *
+  * Note:
+  * Only when the URL_ENCODE_PARTITIONING_OPT_KEY is enable, we can store the partition columns
+  * to the hoodie.properties in HoodieSqlWriter when write table. So that the query can benefit
+  * from the partition prune.
+  */

Review comment:
   > I still don't quite understand why we can't support partition pruning 
by default. As far as storing the partition columns names in hoodie.properties 
is concerned, we can do that irrespective and does not have to be constrained 
by URL_ENCODE_PARTITIONING_OPT_KEY. As I understand, the problem occurs during 
the partition pruning where the values provided by spark in the predicates 
would not match the value passed by Hudi in cases such as where there is one 
partition column with value like 2020/10/10. Is this understanding correct ?
   
   Yes, that is my main concern. But you have made me realize that we can 
support partition pruning by default and simply skip pruning when the partition 
value, such as `2020/10/10`, does not match the partition columns.
   I will give this a try. Thanks for the reminder.
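
   A minimal sketch of that fallback, not the PR's implementation. The names 
`PartitionValueParser`, `parse` and `prune` are hypothetical. It assumes a 
partition path can be mapped onto the partition columns only when it has exactly 
one path fragment per column; any other path (for example a single column whose 
value is `2020/10/10`) is kept without pruning.

```scala
// Hypothetical helper for illustration only; it is not part of the PR.
object PartitionValueParser {

  /**
    * Returns Some(column -> value) when the path has exactly one fragment per
    * partition column, otherwise None so that the caller can skip pruning.
    * Hive-style fragments such as "dt=2020-10-10" are reduced to their value.
    */
  def parse(partitionColumns: Seq[String], partitionPath: String): Option[Map[String, String]] = {
    val fragments = partitionPath.split("/").filter(_.nonEmpty).toSeq
    if (fragments.length != partitionColumns.length) {
      None
    } else {
      val values = fragments.zip(partitionColumns).map { case (fragment, column) =>
        val prefix = column + "="
        column -> (if (fragment.startsWith(prefix)) fragment.substring(prefix.length) else fragment)
      }
      Some(values.toMap)
    }
  }

  /**
    * Keeps a partition path when it cannot be parsed (pruning is skipped) or
    * when its parsed values satisfy the filter.
    */
  def prune(partitionColumns: Seq[String], paths: Seq[String])
           (filter: Map[String, String] => Boolean): Seq[String] =
    paths.filter(path => parse(partitionColumns, path).forall(filter))
}
```

   With one partition column `dt`, the path `2020/10/10` yields `None` and is 
always kept, while `2020-10-10` is parsed and can be pruned by a filter on `dt`.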





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-12 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r593547532



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -79,39 +82,58 @@ class DefaultSource extends RelationProvider
 val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
 
 val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
-val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
-
-val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+// Use the HoodieFileIndex only if the 'path' has specified with no "*" contains.
+// And READ_PATHS_OPT_KEY has not specified.
+// Or else we use the original way to read hoodie table.

Review comment:
   Hi @umehrot2 Thanks very much for your review!
   Yes, I will open a JIRA for this to support multi-paths in `HoodieFileIndex` 
later.
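
   The condition described in the code comment above could be sketched as 
follows. This is only an illustration: `ReadPathChoice` and `useFileIndex` are 
hypothetical names, and the real check in `DefaultSource` may differ.

```scala
object ReadPathChoice {
  // Returns true when the HoodieFileIndex route applies: a 'path' is given,
  // it contains no "*" glob, and no extra read paths were specified.
  // Otherwise the caller falls back to the original glob-based listing.
  def useFileIndex(path: Option[String], readPaths: Seq[String]): Boolean =
    path.exists(p => !p.contains("*")) && readPaths.isEmpty
}
```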









[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-11 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r592039402



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+  * A File Index which support partition prune for hoodie snapshot and read-optimized
+  * query.
+  * Main steps to get the file list for query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Do the partition prune by the partition filter condition.
+  *
+  * Note:
+  * Only when the URL_ENCODE_PARTITIONING_OPT_KEY is enable, we can store the partition columns
+  * to the hoodie.properties in HoodieSqlWriter when write table. So that the query can benefit
+  * from the partition prune.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,

Review comment:
   Hi @umehrot2, IMO supporting reads only from the `basePath` can 
simplify the reading logic. It is clear that one table uses one path.
   If we want to support multiple paths, we must validate that all the paths 
come from the same table. This may make things complex for the 
`HoodieFileIndex` mode.
   The need for a multi-path query can be met through partition pruning in 
`HoodieFileIndex`,
   e.g.
   > For ex: Customer may not pass the actual path, but in 
hoodie.datasource.read.paths pass several partition paths: 
s3://basepath/partition1, s3://basepath/partition2
   
   We can query the table with a partition filter condition to reach the same 
goal, like this:
   `spark.read.format("hudi").load(basePath).filter("pt = 'partition1' or pt = 'partition2'")`
   
   This is my opinion, WDYT?
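
   To illustrate how a list of partition paths could be turned into an 
equivalent filter, here is a rough sketch. It assumes a single partition 
column (called `pt` in the usage below) and partition paths that sit directly 
under the base path; `PartitionFilterSketch` and `toFilter` are hypothetical 
names, not part of Hudi.

```scala
object PartitionFilterSketch {
  // Hypothetical helper: derive an OR-ed filter expression on one partition
  // column from partition paths located directly under basePath.
  // Assumption: partition values contain no single quotes (no escaping is done).
  def toFilter(basePath: String, partitionPaths: Seq[String], column: String): String = {
    val prefix = basePath.stripSuffix("/") + "/"
    partitionPaths
      .map(_.stripPrefix(prefix).stripSuffix("/")) // keep only the partition value
      .map(value => s"$column = '$value'")
      .mkString(" or ")
  }
}
```

   For the two paths quoted above, 
`toFilter("s3://basepath", Seq("s3://basepath/partition1", "s3://basepath/partition2"), "pt")` 
produces a condition string that could be passed to `filter` after loading 
the table from the base path.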
   









[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-11 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r592343397



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+  * A File Index which support partition prune for hoodie snapshot and read-optimized
+  * query.
+  * Main steps to get the file list for query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Do the partition prune by the partition filter condition.
+  *
+  * Note:
+  * Only when the URL_ENCODE_PARTITIONING_OPT_KEY is enable, we can store the partition columns
+  * to the hoodie.properties in HoodieSqlWriter when write table. So that the query can benefit
+  * from the partition prune.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,

Review comment:
   Yes, we can support the extra paths in the future and focus on the 
important things for now.









[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-10 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r592040151



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -66,10 +69,15 @@ class DefaultSource extends RelationProvider
   override def createRelation(sqlContext: SQLContext,
   optParams: Map[String, String],
   schema: StructType): BaseRelation = {
+// Remove the "*" from the path in order to be compatible with the previous query path with "*"
+val path = removeStar(optParams.get("path"))

Review comment:
   Agreed. I will support both the existing behavior using glob paths and 
the `HoodieFileIndex`.









[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-10 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r592039402



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+  * A File Index which support partition prune for hoodie snapshot and read-optimized
+  * query.
+  * Main steps to get the file list for query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Do the partition prune by the partition filter condition.
+  *
+  * Note:
+  * Only when the URL_ENCODE_PARTITIONING_OPT_KEY is enable, we can store the partition columns
+  * to the hoodie.properties in HoodieSqlWriter when write table. So that the query can benefit
+  * from the partition prune.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,

Review comment:
   Hi @umehrot2, IMO supporting reads only from the `basePath` can 
simplify the reading logic. It is clear that one table uses one path.
   If we want to support multiple paths, we must validate that all the paths 
come from the same table. This may make things complex for the 
`HoodieFileIndex` mode.
   The need for a multi-path query can be met through partition pruning in 
`HoodieFileIndex`,
   e.g.
   > For ex: Customer may not pass the actual path, but in 
hoodie.datasource.read.paths pass several partition paths: 
s3://basepath/partition1, s3://basepath/partition2
   
   We can query the table with a partition filter condition to reach the same 
goal, like this:
   `spark.read.format("hudi").load(basePath).filter("pt = 'partition1' or pt = 'partition2'")`
   
   This is my opinion, WDYT?
   









[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-10 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r592028934



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+  * A File Index which support partition prune for hoodie snapshot and read-optimized
+  * query.
+  * Main steps to get the file list for query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Do the partition prune by the partition filter condition.
+  *
+  * Note:
+  * Only when the URL_ENCODE_PARTITIONING_OPT_KEY is enable, we can store the partition columns
+  * to the hoodie.properties in HoodieSqlWriter when write table. So that the query can benefit
+  * from the partition prune.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  private val hadoopConf = spark.sessionState.newHadoopConf()
+  private val fs = new Path(basePath).getFileSystem(hadoopConf)
+  private lazy val metaClient = HoodieTableMetaClient
+.builder().setConf(hadoopConf).setBasePath(basePath).build()
+
+  private val queryPath = new Path(options.getOrElse("path", "'path' option required"))
+  /**
+* Get the schema of the table.
+*/
+  lazy val schema: StructType = schemaSpec.getOrElse({
+val schemaUtil = new TableSchemaResolver(metaClient)
+SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+  .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+* Get the partition schema.
+*/
+  private lazy val _partitionSchema: StructType = {
+val tableConfig = metaClient.getTableConfig
+val partitionColumns = tableConfig.getPartitionColumns
+val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+// If the URL_ENCODE_PARTITIONING_OPT_KEY has enable, the partition schema will stored in
+// hoodie.properties, So we can benefit from the partition prune.
+if (partitionColumns.isPresent) {
+  val partitionFields = partitionColumns.get().map(column =>
+nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column " +
+  s"$column in the schema[${schema.fields.mkString(",")}]")))
+  new StructType(partitionFields)
+} else { // If the URL_ENCODE_PARTITIONING_OPT_KEY is disable, we trait it as a
+  // none-partitioned table.
+  new StructType()
+}
+  }
+
+  private val timeZoneId = CaseInsensitiveMap(options)
+.get(DateTimeUtils.TIMEZONE_OPTION)
+.getOrElse(SQLConf.get.sessionLocalTimeZone)
+
+  @volatile private var fileSystemView: HoodieTableFileSystemView = _
+  @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _
+  @volatile private var cachedFileSize: Long = 0L
+  @volatile private var cachedAllPartitionPaths: Seq[PartitionPath] = _
+
+  refresh()
+
+  override def rootPaths: Seq[Path] = queryPath :: Nil
+
+  override def listFiles(partitionFilters: Seq[Expression],
+