[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…
umehrot2 commented on a change in pull request #2651: URL: https://github.com/apache/hudi/pull/2651#discussion_r605196012 ## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala ## @@ -81,4 +82,53 @@ object HoodieWriterUtils { params.foreach(kv => props.setProperty(kv._1, kv._2)) props } + + /** + * Get the partition columns to stored to hoodie.properties. + * @param parameters + * @return + */ + def getPartitionColumns(parameters: Map[String, String]): Option[String] = { +val keyGenClass = parameters.getOrElse(KEYGENERATOR_CLASS_OPT_KEY, + DEFAULT_KEYGENERATOR_CLASS_OPT_VAL) +try { + val constructor = getClass.getClassLoader.loadClass(keyGenClass) +.getConstructor(classOf[TypedProperties]) + constructor.setAccessible(true) + val props = new TypedProperties() + props.putAll(parameters.asJava) + val keyGen = constructor.newInstance(props) Review comment: Can't we pass the `KeyGenerator` already created in `HoodieSparkSqlWriter` and `DeltaSync` instead of recreating it here? Both of these places already create a `KeyGenerator` using reflection. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…
umehrot2 commented on a change in pull request #2651: URL: https://github.com/apache/hudi/pull/2651#discussion_r604479749 ## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala ## @@ -81,4 +81,33 @@ object HoodieWriterUtils { params.foreach(kv => props.setProperty(kv._1, kv._2)) props } + + /** + * Get the partition columns to stored to hoodie.properties. + * Return the partitionColumns only if it is the key generator class is the build-ins. + * For other custom key generator class, we cannot know whether or not it has relation + * with the partition columns. + * @param parameters + * @return + */ + def getPartitionColumns(parameters: Map[String, String]): Option[String] = { +val keyGenClass = parameters.getOrElse(KEYGENERATOR_CLASS_OPT_KEY, + DEFAULT_KEYGENERATOR_CLASS_OPT_VAL) +val partitionColumns = parameters.get(PARTITIONPATH_FIELD_OPT_KEY) +if (keyGenClass == classOf[SimpleKeyGenerator].getName || +keyGenClass == classOf[ComplexKeyGenerator].getName || +keyGenClass == classOf[TimestampBasedKeyGenerator].getName) { Review comment: I don't think this condition covers all cases: `CustomKeyGenerator` and `CustomAvroKeyGenerator` also make use of the partition path field. Also, tracking these classes individually is risky, and future developers could easily forget to update this list. Here is my recommendation: - Pass the `KeyGenerator` created in `HoodieSparkSqlWriter` to this function. - Check if the `KeyGenerator` is an instance of `BaseKeyGenerator`. - If yes, invoke `getPartitionPathFields` on the key generator and return those as the partition columns instead of reading them from `PARTITIONPATH_FIELD_OPT_KEY`. - This will also cover cases where someone extends `BaseKeyGenerator` and uses some other partition key. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…
umehrot2 commented on a change in pull request #2651: URL: https://github.com/apache/hudi/pull/2651#discussion_r601930831 ## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala ## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import java.util.Properties + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration} +import org.apache.hudi.common.engine.HoodieLocalEngineContext +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +/** + * A File Index which support partition prune for hoodie snapshot and read-optimized + * query. + * Main steps to get the file list for query: + * 1、Load all files and partition values from the table path. + * 2、Do the partition prune by the partition filter condition. + * + * There are 3 cases for this: + * 1、If the partition columns size is equal to the actually partition path level, we + * read it as partitioned table.(e.g partition column is "dt", the partition path is "2021-03-10") + * + * 2、If the partition columns size is not equal to the partition path level, but the partition + * column size is "1" (e.g. 
partition column is "dt", but the partition path is "2021/03/10" + * who'es directory level is 3).We can still read it as a partitioned table. We will mapping the + * partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt"). + * + * 3、Else the the partition columns size is not equal to the partition directory level and the + * size is great than "1" (e.g. partition column is "dt,hh", the partition path is "2021/03/10/12") + * , we read it as a None Partitioned table because we cannot know how to mapping the partition + * path with the partition columns in this case. + */ +case class HoodieFileIndex( + spark: SparkSession, + basePath: String, + schemaSpec: Option[StructType], + options: Map[String, String]) + extends FileIndex with Logging { + + @transient private val hadoopConf = spark.sessionState.newHadoopConf() + private lazy val metaClient = HoodieTableMetaClient +.builder().setConf(hadoopConf).setBasePath(basePath).build() + + @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required")) + /** +* Get the schema of the table. +*/ + lazy val schema: StructType = schemaSpec.getOrElse({ +val schemaUtil = new TableSchemaResolver(metaClient) +SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema) + .dataType.asInstanceOf[StructType] + }) + + /** +* Get the partition schema from the hoodie.properties. +*/ + private lazy val _partitionSchemaFromProperties: StructType = { +val tableConfig = metaClient.getTableConfig +val partitionColumns = tableConfig.getPartitionColumns +val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap + +if (partitionColumns.isPresent) { + val partitionFields = partitionColumns.get().map(column => +nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column " + +
[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…
umehrot2 commented on a change in pull request #2651: URL: https://github.com/apache/hudi/pull/2651#discussion_r601930658 ## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala ## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import java.util.Properties + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration} +import org.apache.hudi.common.engine.HoodieLocalEngineContext +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +/** + * A File Index which support partition prune for hoodie snapshot and read-optimized + * query. + * Main steps to get the file list for query: + * 1、Load all files and partition values from the table path. + * 2、Do the partition prune by the partition filter condition. + * + * There are 3 cases for this: + * 1、If the partition columns size is equal to the actually partition path level, we + * read it as partitioned table.(e.g partition column is "dt", the partition path is "2021-03-10") + * + * 2、If the partition columns size is not equal to the partition path level, but the partition + * column size is "1" (e.g. 
partition column is "dt", but the partition path is "2021/03/10" + * who'es directory level is 3).We can still read it as a partitioned table. We will mapping the + * partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt"). + * + * 3、Else the the partition columns size is not equal to the partition directory level and the + * size is great than "1" (e.g. partition column is "dt,hh", the partition path is "2021/03/10/12") + * , we read it as a None Partitioned table because we cannot know how to mapping the partition + * path with the partition columns in this case. + */ +case class HoodieFileIndex( + spark: SparkSession, + basePath: String, + schemaSpec: Option[StructType], + options: Map[String, String]) + extends FileIndex with Logging { + + @transient private val hadoopConf = spark.sessionState.newHadoopConf() + private lazy val metaClient = HoodieTableMetaClient +.builder().setConf(hadoopConf).setBasePath(basePath).build() + + @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required")) + /** +* Get the schema of the table. +*/ + lazy val schema: StructType = schemaSpec.getOrElse({ +val schemaUtil = new TableSchemaResolver(metaClient) +SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema) + .dataType.asInstanceOf[StructType] + }) + + /** +* Get the partition schema from the hoodie.properties. +*/ + private lazy val _partitionSchemaFromProperties: StructType = { +val tableConfig = metaClient.getTableConfig +val partitionColumns = tableConfig.getPartitionColumns +val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap + +if (partitionColumns.isPresent) { + val partitionFields = partitionColumns.get().map(column => +nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column " + +
[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…
umehrot2 commented on a change in pull request #2651: URL: https://github.com/apache/hudi/pull/2651#discussion_r601854740 ## File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java ## @@ -276,6 +276,16 @@ public static void processFiles(FileSystem fs, String basePathStr, Functionhttp://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi + +import java.util.Properties + +import scala.collection.JavaConverters._ +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration} +import org.apache.hudi.common.engine.HoodieLocalEngineContext +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, NoopCache, PartitionDirectory, PartitionUtils} +import org.apache.spark.sql.internal.SQLConf +import 
org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +import scala.collection.mutable + +/** + * A File Index which support partition prune for hoodie snapshot and read-optimized + * query. + * Main steps to get the file list for query: + * 1、Load all files and partition values from the table path. + * 2、Do the partition prune by the partition filter condition. + * + * There are 3 cases for this: + * 1、If the partition columns size is equal to the actually partition path level, we + * read it as partitioned table.(e.g partition column is "dt", the partition path is "2021-03-10") + * + * 2、If the partition columns size is not equal to the partition path level, but the partition + * column size is "1" (e.g. partition column is "dt", but the partition path is "2021/03/10" + * who'es directory level is 3).We can still read it as a partitioned table. We will mapping the + * partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt"). + * + * 3、Else the the partition columns size is not equal to the partition directory level and the + * size is great than "1" (e.g. partition column is "dt,hh", the partition path is "2021/03/10/12") + * , we read it as a None Partitioned table because we cannot know how to mapping the partition + * path with the partition columns in this case. + */ +case class HoodieFileIndex( + spark: SparkSession, + metaClient: HoodieTableMetaClient, + schemaSpec: Option[StructType], + options: Map[String, String], + @transient fileStatusCache: FileStatusCache = NoopCache) + extends FileIndex with Logging { + + private val basePath = metaClient.getBasePath + + @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required")) + /** +* Get the schema of the table. 
+*/ + lazy val schema: StructType = schemaSpec.getOrElse({ +val schemaUtil = new TableSchemaResolver(metaClient) +SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema) + .dataType.asInstanceOf[StructType] + }) + + /** +* Get the partition schema from the hoodie.properties. +*/ + private lazy val _partitionSchemaFromProperties: StructType = { +val tableConfig = metaClient.getTableConfig +val partitionColumns = tableConfig.getPartitionColumns +val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap + +if (partitionColumns.isPresent) { + val partitionFields = partitionColumns.get().map(column => +nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column: '" + + s"$column' in the schema[${schema.fields.mkString(",")}]"))) + new StructType(partitionFields) +} else { // If the partition columns have not stored in hoodie.properites(the table that was + // created earlier), we trait it as a none-partitioned table. + new StructType() +} + } + + @transient @volatile private var fileSystemView: HoodieTableFileSystem
[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…
umehrot2 commented on a change in pull request #2651: URL: https://github.com/apache/hudi/pull/2651#discussion_r599160889 ## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java ## @@ -179,6 +179,9 @@ public static final String EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_SCHEMA + ".externalTransformation"; public static final String DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = "false"; + public static final String MAX_LISTING_PARALLELISM = "hoodie.max.list.file.parallelism"; + public static final Integer DEFAULT_MAX_LISTING_PARALLELISM = 200; Review comment: - I think its fine to use the `DEFAULT_PARALLELISM` i.e `1500` as the default. It is what we use in `FileSystemBackedTableMetadata` as well. - We should add a method here to get this configuration just like all other configurations. ## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala ## @@ -0,0 +1,317 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import java.util.Properties + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration} +import org.apache.hudi.common.engine.HoodieLocalEngineContext +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +/** + * A File Index which support partition prune for hoodie snapshot and read-optimized + * query. + * Main steps to get the file list for query: + * 1、Load all files and partition values from the table path. + * 2、Do the partition prune by the partition filter condition. + * + * There are 3 cases for this: + * 1、If the partition columns size is equal to the actually partition path level, we + * read it as partitioned table.(e.g partition column is "dt", the partition path is "2021-03-10") + * + * 2、If the partition columns size is not equal to the partition path level, but the partition + * column size is "1" (e.g. 
partition column is "dt", but the partition path is "2021/03/10" + * who'es directory level is 3).We can still read it as a partitioned table. We will mapping the + * partition path (e.g. 2021/03/10) to the only partition column (e.g. "dt"). + * + * 3、Else the the partition columns size is not equal to the partition directory level and the + * size is great than "1" (e.g. partition column is "dt,hh", the partition path is "2021/03/10/12") + * , we read it as a None Partitioned table because we cannot know how to mapping the partition + * path with the partition columns in this case. + */ +case class HoodieFileIndex( + spark: SparkSession, + basePath: String, + schemaSpec: Option[StructType], + options: Map[String, String]) + extends FileIndex with Logging { + + @transient private val hadoopConf = spark.sessionState.newHadoopConf() + private lazy val metaClient = HoodieTableMetaClient +.builder().setConf(hadoopConf).setBasePath(basePath).build() + + @transient private val queryPath = new Path(options.getOrElse("path", "'path' option required")) + /** +* Get the schema of the ta
[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…
umehrot2 commented on a change in pull request #2651: URL: https://github.com/apache/hudi/pull/2651#discussion_r593531717 ## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala ## @@ -79,39 +82,58 @@ class DefaultSource extends RelationProvider val allPaths = path.map(p => Seq(p)).getOrElse(Seq()) ++ readPaths val fs = FSUtils.getFs(allPaths.head, sqlContext.sparkContext.hadoopConfiguration) -val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(allPaths, fs) - -val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray) +// Use the HoodieFileIndex only if the 'path' has specified with no "*" contains. +// And READ_PATHS_OPT_KEY has not specified. +// Or else we use the original way to read hoodie table. Review comment: Its fine for now, but ideally we should support the glob paths with `HoodieFileIndex` to be able to partition prune and later be able to integrate it with metadata based listing and column range indexes. Can you open a JIRA for this ? ## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala ## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import java.util.Properties + +import scala.collection.JavaConverters._ +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hudi.client.common.HoodieSparkEngineContext +import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration} +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +/** + * A File Index which support partition prune for hoodie snapshot and read-optimized + * query. + * Main steps to get the file list for query: + * 1、Load all files and partition values from the table path. + * 2、Do the partition prune by the partition filter condition. + * + * Note: + * Only when the URL_ENCODE_PARTITIONING_OPT_KEY is enable, we can store the partition columns + * to the hoodie.properties in HoodieSqlWriter when write table. So that the query can benefit + * from the partition prune. + */ Review comment: This is too strong a constraint to have. 
Most customers would not be using `URL_ENCODE_PARTITIONING_OPT_KEY`. It does not make sense for partition pruning not to work for them, and this feature would lose most of its value. I still don't quite understand why we can't support partition pruning by default. Storing the partition column names in `hoodie.properties` can be done regardless, and does not have to be constrained by `URL_ENCODE_PARTITIONING_OPT_KEY`. As I understand it, the problem occurs during partition pruning: the values provided by Spark in the predicates would not match the values passed by Hudi in cases where there is a single partition column with a value like `2020/10/10`. Is this understanding correct? But supporting that case should not mean we skip partition pruning for the normal cases, where the same data is represented by 3 partition columns: `yyyy=2020`, `mm=10`, `dd=10`. Our goal should be to support partition pruning by default; for certain scenarios like `2020/10/10` that need encoding, it is up to the customer to pass `URL_ENCODE_PARTI
[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…
umehrot2 commented on a change in pull request #2651: URL: https://github.com/apache/hudi/pull/2651#discussion_r592054327 ## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala ## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import scala.collection.JavaConverters._ +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +/** + * A File Index which support partition prune for hoodie snapshot and read-optimized + * query. + * Main steps to get the file list for query: + * 1、Load all files and partition values from the table path. + * 2、Do the partition prune by the partition filter condition. + * + * Note: + * Only when the URL_ENCODE_PARTITIONING_OPT_KEY is enable, we can store the partition columns + * to the hoodie.properties in HoodieSqlWriter when write table. So that the query can benefit + * from the partition prune. + */ +case class HoodieFileIndex( + spark: SparkSession, + basePath: String, Review comment: I understand your point and the same thing can be supported with the partition filters moving forward. We don't have the check for paths from multiple tables even right now, so let's not worry about validating that. 
I am not so worried about this particular use case, but either way we have to support multiple paths being passed to the file index in order to support glob paths, right? So I was thinking that if we need to support multiple paths anyway, then passing multiple paths via this property would automatically work. But I agree with you that supporting the extra paths via this property is not my top priority, so please focus on solving the other, more important things first. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…
umehrot2 commented on a change in pull request #2651: URL: https://github.com/apache/hudi/pull/2651#discussion_r592005038 ## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala ## @@ -66,10 +69,15 @@ class DefaultSource extends RelationProvider override def createRelation(sqlContext: SQLContext, optParams: Map[String, String], schema: StructType): BaseRelation = { +// Remove the "*" from the path in order to be compatible with the previous query path with "*" +val path = removeStar(optParams.get("path")) Review comment: Ideally, I would not like to go with this assumption. This would break and return incorrect results for Hudi customers who have been using globbed paths until now. If possible, we should try to implement it in a way that supports both globbed paths and non-globbed paths. Spark's `InMemoryFileIndex`, for example, can handle both globbed and non-globbed paths, and if we are implementing our own FileIndex then we should see if we can handle that in our implementation too. I need to take a deeper look, but is it not possible to glob the paths, pass all the paths to the `HoodieFileIndex`, and then list all of them like `InMemoryFileIndex` does? It accepts multiple `rootPathsSpecified`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] umehrot2 commented on a change in pull request #2651: [HUDI-1591] [RFC-26] Improve Hoodie Table Query Performance And Ease Of Use Fo…
umehrot2 commented on a change in pull request #2651: URL: https://github.com/apache/hudi/pull/2651#discussion_r591998950 ## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala ## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi + +import scala.collection.JavaConverters._ +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieBaseFile +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.view.HoodieTableFileSystemView +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.{InternalRow, expressions} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.SchemaConverters +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, Expression, InterpretedPredicate} +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} +import org.apache.spark.sql.execution.datasources.{FileIndex, PartitionDirectory, PartitionUtils} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.StructType + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +/** + * A File Index which support partition prune for hoodie snapshot and read-optimized + * query. + * Main steps to get the file list for query: + * 1、Load all files and partition values from the table path. + * 2、Do the partition prune by the partition filter condition. + * + * Note: + * Only when the URL_ENCODE_PARTITIONING_OPT_KEY is enable, we can store the partition columns + * to the hoodie.properties in HoodieSqlWriter when write table. So that the query can benefit + * from the partition prune. 
+ */ +case class HoodieFileIndex( + spark: SparkSession, + basePath: String, + schemaSpec: Option[StructType], + options: Map[String, String]) + extends FileIndex with Logging { + + private val hadoopConf = spark.sessionState.newHadoopConf() + private val fs = new Path(basePath).getFileSystem(hadoopConf) + private lazy val metaClient = HoodieTableMetaClient +.builder().setConf(hadoopConf).setBasePath(basePath).build() + + private val queryPath = new Path(options.getOrElse("path", "'path' option required")) + /** +* Get the schema of the table. +*/ + lazy val schema: StructType = schemaSpec.getOrElse({ +val schemaUtil = new TableSchemaResolver(metaClient) +SchemaConverters.toSqlType(schemaUtil.getTableAvroSchema) + .dataType.asInstanceOf[StructType] + }) + + /** +* Get the partition schema. +*/ + private lazy val _partitionSchema: StructType = { +val tableConfig = metaClient.getTableConfig +val partitionColumns = tableConfig.getPartitionColumns +val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap +// If the URL_ENCODE_PARTITIONING_OPT_KEY has enable, the partition schema will stored in +// hoodie.properties, So we can benefit from the partition prune. +if (partitionColumns.isPresent) { + val partitionFields = partitionColumns.get().map(column => +nameFieldMap.getOrElse(column, throw new IllegalArgumentException(s"Cannot find column " + + s"$column in the schema[${schema.fields.mkString(",")}]"))) + new StructType(partitionFields) +} else { // If the URL_ENCODE_PARTITIONING_OPT_KEY is disable, we trait it as a + // none-partitioned table. 
+ new StructType() +} + } + + private val timeZoneId = CaseInsensitiveMap(options) +.get(DateTimeUtils.TIMEZONE_OPTION) +.getOrElse(SQLConf.get.sessionLocalTimeZone) + + @volatile private var fileSystemView: HoodieTableFileSystemView = _ + @volatile private var cachedAllInputFiles: Array[HoodieBaseFile] = _ + @volatile private var cachedFileSize: Long = 0L + @volatile private var cachedAllPartitionPaths: Seq[PartitionPath] = _ + + refresh() + + override def rootPaths: Seq[Path] = queryPath :: Nil + + override def listFiles(partitionFilters: Seq[Expression], + dataFilters: Seq[E