[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-31 Thread GitBox


umehrot2 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r605196012



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
##
@@ -81,4 +82,53 @@ object HoodieWriterUtils {
 params.foreach(kv => props.setProperty(kv._1, kv._2))
 props
   }
+
+  /**
+   * Get the partition columns to be stored in hoodie.properties.
+   * @param parameters
+   * @return
+   */
+  def getPartitionColumns(parameters: Map[String, String]): Option[String] = {
+    val keyGenClass = parameters.getOrElse(KEYGENERATOR_CLASS_OPT_KEY,
+      DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
+    try {
+      val constructor = getClass.getClassLoader.loadClass(keyGenClass)
+        .getConstructor(classOf[TypedProperties])
+      constructor.setAccessible(true)
+      val props = new TypedProperties()
+      props.putAll(parameters.asJava)
+      val keyGen = constructor.newInstance(props)

Review comment:
   Can't we pass the KeyGenerator already created in `HoodieSparkSqlWriter` and `DeltaSync` instead of recreating it here? Both of those places already create the `KeyGenerator` via reflection.
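
   A minimal sketch of that suggestion, assuming the helper's signature is changed so the KeyGenerator built once in `HoodieSparkSqlWriter` / `DeltaSync` is passed in rather than re-created here. `PARTITIONPATH_FIELD_OPT_KEY` is the existing writer option; everything else about the helper's shape is an assumption, not the final API:

   ```scala
   import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY
   import org.apache.hudi.keygen.KeyGenerator

   def getPartitionColumns(keyGen: KeyGenerator,
                           parameters: Map[String, String]): Option[String] = {
     // No loadClass/getConstructor/newInstance here: keyGen is the caller's existing instance.
     // Deriving the columns from keyGen itself is discussed in the next comment below;
     // the raw writer option remains the fallback.
     parameters.get(PARTITIONPATH_FIELD_OPT_KEY)
   }
   ```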




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-30 Thread GitBox


umehrot2 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r604479749



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
##
@@ -81,4 +81,33 @@ object HoodieWriterUtils {
 params.foreach(kv => props.setProperty(kv._1, kv._2))
 props
   }
+
+  /**
+   * Get the partition columns to be stored in hoodie.properties.
+   * Return the partition columns only if the key generator class is one of the built-ins.
+   * For a custom key generator class, we cannot know whether it relates to the
+   * partition columns.
+   * @param parameters
+   * @return
+   */
+  def getPartitionColumns(parameters: Map[String, String]): Option[String] = {
+    val keyGenClass = parameters.getOrElse(KEYGENERATOR_CLASS_OPT_KEY,
+      DEFAULT_KEYGENERATOR_CLASS_OPT_VAL)
+    val partitionColumns = parameters.get(PARTITIONPATH_FIELD_OPT_KEY)
+    if (keyGenClass == classOf[SimpleKeyGenerator].getName ||
+        keyGenClass == classOf[ComplexKeyGenerator].getName ||
+        keyGenClass == classOf[TimestampBasedKeyGenerator].getName) {

Review comment:
   I don't think this condition covers all cases. `CustomKeyGenerator` and `CustomAvroKeyGenerator` also make use of the partition path field. Tracking these classes individually is also risky, and in the future developers can easily miss updating this list. Here is my recommendation (see the sketch below):
   - Pass the KeyGenerator created in `HoodieSparkSqlWriter` to this function.
   - Check whether the `KeyGenerator` is an instance of `BaseKeyGenerator`.
   - If it is, invoke `getPartitionPathFields` on the key generator and return those as the partition columns instead of reading them from `PARTITIONPATH_FIELD_OPT_KEY`.
   - This also covers cases where someone extends `BaseKeyGenerator` and uses some other partition key.
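
   A sketch of this recommendation, assuming the KeyGenerator created in `HoodieSparkSqlWriter` is passed in. `BaseKeyGenerator` and `getPartitionPathFields` are the existing Hudi types/methods named above; the helper itself is hypothetical:

   ```scala
   import scala.collection.JavaConverters._
   import org.apache.hudi.keygen.{BaseKeyGenerator, KeyGenerator}

   def getPartitionColumns(keyGen: KeyGenerator, parameters: Map[String, String]): Option[String] =
     keyGen match {
       // Anything extending BaseKeyGenerator exposes its partition path fields directly,
       // which also covers custom subclasses that use some other partition key.
       case base: BaseKeyGenerator => Some(base.getPartitionPathFields.asScala.mkString(","))
       // For generators outside the BaseKeyGenerator hierarchy we cannot tell how the
       // partition path relates to table columns, so store nothing in hoodie.properties.
       case _ => None
     }
   ```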




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-25 Thread GitBox


umehrot2 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r601930831



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+  * A file index which supports partition pruning for hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Prune the partitions using the partition filter conditions.
+  *
+  * There are 3 cases for this:
+  * 1、If the number of partition columns equals the actual partition path level, we read it
+  * as a partitioned table (e.g. the partition column is "dt" and the partition path is
+  * "2021-03-10").
+  *
+  * 2、If the number of partition columns does not equal the partition path level, but there
+  * is only one partition column (e.g. the partition column is "dt" but the partition path is
+  * "2021/03/10", which is 3 directory levels deep), we can still read it as a partitioned
+  * table by mapping the partition path (e.g. "2021/03/10") to the single partition column
+  * (e.g. "dt").
+  *
+  * 3、Otherwise, when the number of partition columns does not equal the partition directory
+  * level and there is more than one partition column (e.g. the partition columns are "dt,hh"
+  * and the partition path is "2021/03/10/12"), we read it as a non-partitioned table, because
+  * we cannot know how to map the partition path to the partition columns in this case.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  @transient private val hadoopConf = spark.sessionState.newHadoopConf()
+  private lazy val metaClient = HoodieTableMetaClient
+    .builder().setConf(hadoopConf).setBasePath(basePath).build()
+
+  @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required"))
+  /**
+    * Get the schema of the table.
+    */
+  lazy val schema: StructType = schemaSpec.getOrElse({
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+      .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+    * Get the partition schema from the hoodie.properties.
+    */
+  private lazy val _partitionSchemaFromProperties: StructType = {
+    val tableConfig = metaClient.getTableConfig
+    val partitionColumns = tableConfig.getPartitionColumns
+    val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+
+    if (partitionColumns.isPresent) {
+      val partitionFields = partitionColumns.get().map(column =>
+        nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column " +
+

[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-25 Thread GitBox


umehrot2 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r601930658



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+  * A file index which supports partition pruning for hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Prune the partitions using the partition filter conditions.
+  *
+  * There are 3 cases for this:
+  * 1、If the number of partition columns equals the actual partition path level, we read it
+  * as a partitioned table (e.g. the partition column is "dt" and the partition path is
+  * "2021-03-10").
+  *
+  * 2、If the number of partition columns does not equal the partition path level, but there
+  * is only one partition column (e.g. the partition column is "dt" but the partition path is
+  * "2021/03/10", which is 3 directory levels deep), we can still read it as a partitioned
+  * table by mapping the partition path (e.g. "2021/03/10") to the single partition column
+  * (e.g. "dt").
+  *
+  * 3、Otherwise, when the number of partition columns does not equal the partition directory
+  * level and there is more than one partition column (e.g. the partition columns are "dt,hh"
+  * and the partition path is "2021/03/10/12"), we read it as a non-partitioned table, because
+  * we cannot know how to map the partition path to the partition columns in this case.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  @transient private val hadoopConf = spark.sessionState.newHadoopConf()
+  private lazy val metaClient = HoodieTableMetaClient
+    .builder().setConf(hadoopConf).setBasePath(basePath).build()
+
+  @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required"))
+  /**
+    * Get the schema of the table.
+    */
+  lazy val schema: StructType = schemaSpec.getOrElse({
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+      .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+    * Get the partition schema from the hoodie.properties.
+    */
+  private lazy val _partitionSchemaFromProperties: StructType = {
+    val tableConfig = metaClient.getTableConfig
+    val partitionColumns = tableConfig.getPartitionColumns
+    val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+
+    if (partitionColumns.isPresent) {
+      val partitionFields = partitionColumns.get().map(column =>
+        nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column " +
+

[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-25 Thread GitBox


umehrot2 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r601854740



##
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##
@@ -276,6 +276,16 @@ public static void processFiles(FileSystem fs, String basePathStr, Function

##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+import scala.collection.mutable
+
+/**
+  * A file index which supports partition pruning for hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Prune the partitions using the partition filter conditions.
+  *
+  * There are 3 cases for this:
+  * 1、If the number of partition columns equals the actual partition path level, we read it
+  * as a partitioned table (e.g. the partition column is "dt" and the partition path is
+  * "2021-03-10").
+  *
+  * 2、If the number of partition columns does not equal the partition path level, but there
+  * is only one partition column (e.g. the partition column is "dt" but the partition path is
+  * "2021/03/10", which is 3 directory levels deep), we can still read it as a partitioned
+  * table by mapping the partition path (e.g. "2021/03/10") to the single partition column
+  * (e.g. "dt").
+  *
+  * 3、Otherwise, when the number of partition columns does not equal the partition directory
+  * level and there is more than one partition column (e.g. the partition columns are "dt,hh"
+  * and the partition path is "2021/03/10/12"), we read it as a non-partitioned table, because
+  * we cannot know how to map the partition path to the partition columns in this case.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ metaClient: HoodieTableMetaClient,
+ schemaSpec: Option[StructType],
+ options: Map[String, String],
+ @transient fileStatusCache: FileStatusCache = NoopCache)
+  extends FileIndex with Logging {
+
+  private val basePath = metaClient.getBasePath
+
+  @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required"))
+  /**
+    * Get the schema of the table.
+    */
+  lazy val schema: StructType = schemaSpec.getOrElse({
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+      .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+    * Get the partition schema from the hoodie.properties.
+    */
+  private lazy val _partitionSchemaFromProperties: StructType = {
+    val tableConfig = metaClient.getTableConfig
+    val partitionColumns = tableConfig.getPartitionColumns
+    val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+
+    if (partitionColumns.isPresent) {
+      val partitionFields = partitionColumns.get().map(column =>
+        nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" +
+          s"$column' in the schema[${schema.fields.mkString(",")}]")))
+      new StructType(partitionFields)
+    } else {
+      // If the partition columns have not been stored in hoodie.properties (a table that was
+      // created earlier), we treat it as a non-partitioned table.
+      new StructType()
+    }
+  }
+
+  @transient @volatile private var fileSystemView: HoodieTableFileSystem

[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-22 Thread GitBox


umehrot2 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r599160889



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##
@@ -179,6 +179,9 @@
   public static final String EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_SCHEMA + ".externalTransformation";
   public static final String DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = "false";
 
+  public static final String MAX_LISTING_PARALLELISM = "hoodie.max.list.file.parallelism";
+  public static final Integer DEFAULT_MAX_LISTING_PARALLELISM = 200;

Review comment:
   - I think it's fine to use `DEFAULT_PARALLELISM`, i.e. `1500`, as the default. It is what we use in `FileSystemBackedTableMetadata` as well.
   - We should also add a method here to get this configuration, just like all the other configurations (see the sketch below).
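
   A hedged sketch of how the read side could then resolve the listing parallelism, assuming the suggested default of `1500` (`DEFAULT_PARALLELISM`) and the `MAX_LISTING_PARALLELISM` key added in this diff; the dedicated accessor method itself would live in `HoodieWriteConfig` next to the other getters:

   ```scala
   import org.apache.hudi.config.HoodieWriteConfig

   // Resolve the file-listing parallelism from the supplied options, falling back to the
   // suggested default of 1500 when the key is absent.
   def listingParallelism(options: Map[String, String]): Int =
     options.get(HoodieWriteConfig.MAX_LISTING_PARALLELISM)
       .map(_.toInt)
       .getOrElse(1500) // DEFAULT_PARALLELISM, per the suggestion above
   ```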

##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
+import org.apache.hudi.common.engine.HoodieLocalEngineContext
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+  * A file index which supports partition pruning for hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Prune the partitions using the partition filter conditions.
+  *
+  * There are 3 cases for this:
+  * 1、If the number of partition columns equals the actual partition path level, we read it
+  * as a partitioned table (e.g. the partition column is "dt" and the partition path is
+  * "2021-03-10").
+  *
+  * 2、If the number of partition columns does not equal the partition path level, but there
+  * is only one partition column (e.g. the partition column is "dt" but the partition path is
+  * "2021/03/10", which is 3 directory levels deep), we can still read it as a partitioned
+  * table by mapping the partition path (e.g. "2021/03/10") to the single partition column
+  * (e.g. "dt").
+  *
+  * 3、Otherwise, when the number of partition columns does not equal the partition directory
+  * level and there is more than one partition column (e.g. the partition columns are "dt,hh"
+  * and the partition path is "2021/03/10/12"), we read it as a non-partitioned table, because
+  * we cannot know how to map the partition path to the partition columns in this case.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  @transient private val hadoopConf = spark.sessionState.newHadoopConf()
+  private lazy val metaClient = HoodieTableMetaClient
+    .builder().setConf(hadoopConf).setBasePath(basePath).build()
+
+  @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required"))
+  /**
+    * Get the schema of the ta

[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-12 Thread GitBox


umehrot2 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r593531717



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -79,39 +82,58 @@ class DefaultSource extends RelationProvider
 val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths
 
 val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration)
-val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs)
-
-val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
+// Use the HoodieFileIndex only if the 'path' has been specified and contains no "*",
+// and READ_PATHS_OPT_KEY has not been specified.
+// Otherwise we use the original way to read the hoodie table.

Review comment:
   It's fine for now, but ideally we should support glob paths with `HoodieFileIndex` as well, so that we can partition prune and later integrate it with metadata-based listing and column range indexes. Can you open a JIRA for this?

##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+/**
+  * A file index which supports partition pruning for hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Prune the partitions using the partition filter conditions.
+  *
+  * Note:
+  * Only when URL_ENCODE_PARTITIONING_OPT_KEY is enabled can we store the partition columns
+  * in hoodie.properties in HoodieSqlWriter when writing the table, so that queries can
+  * benefit from partition pruning.
+  */

Review comment:
   This is too strong a constraint to have. Most customers would not be using `URL_ENCODE_PARTITIONING_OPT_KEY`, and it does not make sense for partition pruning to not work for them; this feature would lose most of its value.
   
   I still don't quite understand why we can't support partition pruning by default. As far as storing the partition column names in `hoodie.properties` is concerned, we can do that regardless; it does not have to be constrained by `URL_ENCODE_PARTITIONING_OPT_KEY`. As I understand it, the problem occurs during partition pruning, where the values provided by Spark in the predicates would not match the value passed by Hudi in cases where there is a single partition column with a value like `2020/10/10`. Is this understanding correct?
   
   But supporting that case does not mean we should skip partition pruning for the normal cases where the same path is represented by 3 partition columns: `yyyy=2020`, `mm=10`, `dd=10`. Our goal should be to support partition pruning by default, and for certain scenarios like `2020/10/10` that need encoding, it is up to the customer to pass URL_ENCODE_PARTI

[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-10 Thread GitBox


umehrot2 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r592054327



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+  * A file index which supports partition pruning for hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Prune the partitions using the partition filter conditions.
+  *
+  * Note:
+  * Only when URL_ENCODE_PARTITIONING_OPT_KEY is enabled can we store the partition columns
+  * in hoodie.properties in HoodieSqlWriter when writing the table, so that queries can
+  * benefit from partition pruning.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,

Review comment:
   I understand your point, and the same thing can be supported with the partition filters moving forward. We don't have a check for paths from multiple tables even right now, so let's not worry about validating that.
   
   I am not so worried about this particular use-case, but either way we have to support multiple paths being passed to the file index in order to support glob paths, right? So I was thinking that if we need to support multiple paths anyway, then multiple paths via this property would automatically work. But I agree with you that supporting the extra paths via this property is not my top priority, so you can focus on solving the other, more important things first.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-10 Thread GitBox


umehrot2 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r592005038



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
##
@@ -66,10 +69,15 @@ class DefaultSource extends RelationProvider
   override def createRelation(sqlContext: SQLContext,
   optParams: Map[String, String],
   schema: StructType): BaseRelation = {
+// Remove the "*" from the path in order to be compatible with the 
previous query path with "*"
+val path = removeStar(optParams.get("path"))

Review comment:
   I would ideally not like to go with this assumption. This would break and return incorrect results for Hudi customers who have been using globbed paths until now. If possible, we should try to implement it in a way that supports both globbed and non-globbed paths.
   
   Spark's `InMemoryFileIndex`, for example, can handle both globbed and non-globbed paths, and if we are implementing our own FileIndex then we should see if we can handle that in our implementation too. I need to take a deeper look, but is it not possible to glob the paths, pass all of the resolved paths to the `HoodieFileIndex`, and then list all of them like `InMemoryFileIndex` does? It accepts multiple `rootPathsSpecified`. A rough sketch of the idea is below.
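
   A hedged sketch of that idea: expand any glob patterns up front and collect all resolved root paths, which could then be handed to a `HoodieFileIndex` variant that accepts multiple root paths (similar to `InMemoryFileIndex`'s `rootPathsSpecified`). The wildcard check below is simplistic and only illustrative:

   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.Path

   def resolveRootPaths(paths: Seq[String], conf: Configuration): Seq[Path] =
     paths.flatMap { raw =>
       val path = new Path(raw)
       val fs = path.getFileSystem(conf)
       if (raw.exists(c => "{}[]*?".indexOf(c) >= 0)) {
         // Glob pattern: expand to every matching path (globStatus returns null when nothing matches).
         Option(fs.globStatus(path)).map(_.map(_.getPath).toSeq).getOrElse(Seq.empty)
       } else {
         // Plain path: keep it as-is.
         Seq(path)
       }
     }
   ```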





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…

2021-03-10 Thread GitBox


umehrot2 commented on a change in pull request #2651:
URL: https://github.com/apache/hudi/pull/2651#discussion_r591998950



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.{InternalRow, expressions}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+  * A file index which supports partition pruning for hoodie snapshot and read-optimized
+  * queries.
+  * Main steps to get the file list for a query:
+  * 1、Load all files and partition values from the table path.
+  * 2、Prune the partitions using the partition filter conditions.
+  *
+  * Note:
+  * Only when URL_ENCODE_PARTITIONING_OPT_KEY is enabled can we store the partition columns
+  * in hoodie.properties in HoodieSqlWriter when writing the table, so that queries can
+  * benefit from partition pruning.
+  */
+case class HoodieFileIndex(
+ spark: SparkSession,
+ basePath: String,
+ schemaSpec: Option[StructType],
+ options: Map[String, String])
+  extends FileIndex with Logging {
+
+  private val hadoopConf = spark.sessionState.newHadoopConf()
+  private val fs = new Path(basePath).getFileSystem(hadoopConf)
+  private lazy val metaClient = HoodieTableMetaClient
+    .builder().setConf(hadoopConf).setBasePath(basePath).build()
+
+  private val queryPath = new Path(options.getOrElse("path", "'path' option required"))
+  /**
+    * Get the schema of the table.
+    */
+  lazy val schema: StructType = schemaSpec.getOrElse({
+    val schemaUtil = new TableSchemaResolver(metaClient)
+    SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema)
+      .dataType.asInstanceOf[StructType]
+  })
+
+  /**
+    * Get the partition schema.
+    */
+  private lazy val _partitionSchema: StructType = {
+    val tableConfig = metaClient.getTableConfig
+    val partitionColumns = tableConfig.getPartitionColumns
+    val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+    // If URL_ENCODE_PARTITIONING_OPT_KEY has been enabled, the partition schema is stored in
+    // hoodie.properties, so we can benefit from partition pruning.
+    if (partitionColumns.isPresent) {
+      val partitionFields = partitionColumns.get().map(column =>
+        nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column " +
+          s"$column in the schema[${schema.fields.mkString(",")}]")))
+      new StructType(partitionFields)
+    } else {
+      // If URL_ENCODE_PARTITIONING_OPT_KEY is disabled, we treat it as a
+      // non-partitioned table.
+      new StructType()
+    }
+  }
+
+  private val timeZoneId = CaseInsensitiveMap(options)
+    .get(DateTimeUtils.TIMEZONE_OPTION)
+    .getOrElse(SQLConf.get.sessionLocalTimeZone)
+
+  @volatile private var fileSystemView: HoodieTableFileSystemView = _
+  @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _
+  @volatile private var cachedFileSize: Long = 0L
+  @volatile private var cachedAllPartitionPaths: Seq[PartitionPath] = _
+
+  refresh()
+
+  override def rootPaths: Seq[Path] = queryPath :: Nil
+
+  override def listFiles(partitionFilters: Seq[Expression],
+ dataFilters: Seq[E