[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3
xiarixiaoyao commented on a change in pull request #2761: URL: https://github.com/apache/hudi/pull/2761#discussion_r615233137 ## File path: hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/spark/sql/catalyst/analysis/HudiOperationsCheck.scala ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.hudi.execution.HudiSQLUtils +import org.apache.spark.sql.{AnalysisException, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.{Expression, InSubquery, Literal, Not} +import org.apache.spark.sql.catalyst.merge.HudiMergeIntoTable +import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan} +import org.apache.spark.sql.execution.command._ + +/** + * check rule which can intercept unSupportOperation for hudi + */ +class HudiOperationsCheck(spark: SparkSession) extends (LogicalPlan => Unit) { + val catalog = spark.sessionState.catalog + + override def apply(plan: LogicalPlan): Unit = { +plan foreach { + case AlterTableAddPartitionCommand(tableName, _, _) if (checkHudiTable(tableName)) => +throw new AnalysisException("AlterTableAddPartitionCommand are not currently supported in HUDI") + + case a: AlterTableDropPartitionCommand if (checkHudiTable(a.tableName)) => +throw new AnalysisException("AlterTableDropPartitionCommand are not currently supported in HUDI") + + case AlterTableChangeColumnCommand(tableName, _, _) if (checkHudiTable(tableName)) => +throw new AnalysisException("AlterTableChangeColumnCommand are not currently supported in HUDI") + + case AlterTableRenamePartitionCommand(tableName, _, _) if (checkHudiTable(tableName)) => +throw new AnalysisException("AlterTableRenamePartitionCommand are not currently supported in HUDI") + + case AlterTableRecoverPartitionsCommand(tableName, _) if (checkHudiTable(tableName)) => +throw new AnalysisException("AlterTableRecoverPartitionsCommand are not currently supported in HUDI") + + case AlterTableSetLocationCommand(tableName, _, _) if (checkHudiTable(tableName)) => +throw new AnalysisException("AlterTableSetLocationCommand are not currently supported in HUDI") + + case DeleteFromTable(target, Some(condition)) if (hasNullAwarePredicateWithNot(condition) && 
checkHudiTable(target)) => +throw new AnalysisException("Null-aware predicate sub-squeries a are not currently supported for DELETE") + + case HudiMergeIntoTable(target, _, _, _, noMatchedClauses, _) => +noMatchedClauses.map { m => + m.resolvedActions.foreach { action => +if (action.targetColNameParts.size > 1) { + throw new AnalysisException(s"cannot insert nested values: ${action.targetColNameParts.mkString("<") }") Review comment: Suppose we run: create table nestedTable (keyid int,a struct) using hudi. Here a is a nested field. Now consider: "merge into nestedTable as t using sourceTable as s " + "on t.keyid = s.keyid when not matched then insert (t.keyid, **t.a.score**,) values (s.keyid, s.score)". We cannot insert t.keyid = s.keyid, t.a.score = s.score into nestedTable directly: the statement sets only one sub-field (**t.a.score**) of the nested field **t.a**, so the value of the other sub-field (**t.a.number**) is unknown. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
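To make the reasoning in the comment above concrete, here is a minimal sketch of the same check in plain Scala, without any Spark or Hudi dependencies. InsertAction and NestedInsertCheck are hypothetical names made up for illustration; only targetColNameParts comes from the diff. The "." separator in the message is also an assumption, since the separator in the quoted diff looks garbled.

```scala
// Hypothetical stand-in for a resolved insert action; only the target column path matters here.
final case class InsertAction(targetColNameParts: Seq[String])

object NestedInsertCheck {
  // Returns the first action whose target is a nested field (more than one name part), if any.
  def firstNestedTarget(actions: Seq[InsertAction]): Option[InsertAction] =
    actions.find(_.targetColNameParts.size > 1)

  // Same idea as the check in the diff: reject the insert clause as soon as one target is nested,
  // because the sibling sub-fields of that struct would be left without a value.
  def validate(actions: Seq[InsertAction]): Either[String, Unit] =
    firstNestedTarget(actions) match {
      case Some(action) =>
        val path = action.targetColNameParts.mkString(".") // assumed separator
        Left(s"cannot insert nested values: $path")
      case None =>
        Right(())
    }

  def main(args: Array[String]): Unit = {
    // insert (t.keyid, t.a.score): the second target has two name parts, so it is rejected.
    val actions = Seq(InsertAction(Seq("keyid")), InsertAction(Seq("a", "score")))
    println(validate(actions))
  }
}
```

The sketch returns an Either instead of throwing, which keeps it easy to test in isolation; the diff itself throws an AnalysisException at analysis time.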
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3
xiarixiaoyao commented on a change in pull request #2761: URL: https://github.com/apache/hudi/pull/2761#discussion_r615217323 ## File path: hudi-spark-datasource/hudi-spark3-extensions_2.12/src/test/scala/org/apache/spark/sql/hudi/TestDelete.scala ## @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi + +import org.apache.hadoop.fs.Path + +class TestDelete extends TestHoodieSqlBase { + + test("Test Delete Table") { +withTempDir { tmp => + val tablePath = new Path(tmp.toString, "deleteTable").toUri.toString + spark.sql("set spark.hoodie.shuffle.parallelism=4") + spark.sql( +s"""create table deleteTable ( + |keyid int, + |name string, + |price double, + |col1 long, + |p string, + |p1 string, + |p2 string) using hudi + |partitioned by (p,p1,p2) + |options('hoodie.datasource.write.table.type'='MERGE_ON_READ', + |'hoodie.datasource.write.precombine.field'='col1', Review comment: agree
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3
xiarixiaoyao commented on a change in pull request #2761: URL: https://github.com/apache/hudi/pull/2761#discussion_r615217279 ## File path: hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/spark/sql/hudi/execution/MergeIntoHudiTable.scala ## @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.execution + +import org.apache.hudi.DataSourceWriteOptions.{DEFAULT_PAYLOAD_OPT_VAL, PAYLOAD_CLASS_OPT_KEY, RECORDKEY_FIELD_OPT_KEY} +import org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, WriteOperationType} +import org.apache.hudi.execution.HudiSQLUtils +import org.apache.spark.internal.Logging +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder} +import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate +import org.apache.spark.sql.catalyst.merge._ +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, BasePredicate, Expression, Literal, PredicateHelper, UnsafeProjection} +import org.apache.spark.sql.functions.{col, lit, when} +import org.apache.spark.sql.catalyst.merge.{HudiMergeClause, HudiMergeInsertClause} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project} +import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.internal.StaticSQLConf +import org.apache.spark.sql.types.{StringType, StructType} + +import scala.collection.mutable.ArrayBuffer + +/** + * merge command, execute merge operation for hudi + */ +case class MergeIntoHudiTable( +target: LogicalPlan, +source: LogicalPlan, +joinCondition: Expression, +matchedClauses: Seq[HudiMergeClause], +noMatchedClause: Seq[HudiMergeInsertClause], +finalSchema: StructType, +trimmedSchema: StructType) extends RunnableCommand with Logging with PredicateHelper { + + var tableMeta: Map[String, String] = null + + lazy val isRecordKeyJoin = { +val recordKeyFields = tableMeta.get(RECORDKEY_FIELD_OPT_KEY).get.split(",").map(_.trim).filter(!_.isEmpty) +val intersect = joinCondition.references.intersect(target.outputSet).toSeq +if (recordKeyFields.size == 1 && intersect.size ==1 && intersect(0).name.equalsIgnoreCase(recordKeyFields(0))) { + true +} else { 
+ false +} + } + + override def run(sparkSession: SparkSession): Seq[Row] = { +tableMeta = HudiSQLUtils.getHoodiePropsFromRelation(target, sparkSession) +val enableHive = "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION) Review comment: agree
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3
xiarixiaoyao commented on a change in pull request #2761: URL: https://github.com/apache/hudi/pull/2761#discussion_r615217239 ## File path: hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/spark/sql/hudi/execution/CreateHudiTableAsSelectCommand.scala ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.execution + +import java.util.Properties + +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.config.HoodieWriteConfig.BULKINSERT_PARALLELISM +import org.apache.hudi.execution.HudiSQLUtils +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.command.RunnableCommand +import org.apache.spark.sql.internal.StaticSQLConf + +import scala.collection.JavaConverters._ + +/** + * Command for ctas + */ +case class CreateHudiTableAsSelectCommand( +table: CatalogTable, +mode: SaveMode, +query: LogicalPlan, +outputColumnNames: Seq[String]) extends RunnableCommand { + override def run(sparkSession: SparkSession): Seq[Row] = { +assert(table.tableType != CatalogTableType.VIEW) +assert(table.provider.isDefined) +val sessionState = sparkSession.sessionState +var fs: FileSystem = null +val conf = sessionState.newHadoopConf() + +val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase) +val tableIdentWithDB = table.identifier.copy(database = Some(db)) +val tableName = tableIdentWithDB.unquotedString +val enableHive = "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION) +val path = HudiSQLUtils.getTablePath(sparkSession, table) + +if (sessionState.catalog.tableExists(tableIdentWithDB)) { + assert(mode != SaveMode.Overwrite, s"Expect the table $tableName has been dropped when the save mode is OverWrite") + if (mode == SaveMode.ErrorIfExists) { +throw new AnalysisException(s"Table $tableName already exists. 
You eed to drop it first.") + } + if (mode == SaveMode.Ignore) { +// scalastyle:off +return Seq.empty +// scalastyle:on + } + // append table + saveDataIntoHudiTable(sparkSession, table, path, table.storage.properties, enableHive) +} else { + val properties = table.storage.properties + assert(table.schema.isEmpty) + sparkSession.sessionState.catalog.validateTableLocation(table) + // create table + if (!enableHive) { +val newTable = if (!HudiSQLUtils.tableExists(path, conf)) { + table.copy(schema = query.schema) +} else { + table +} +CreateHudiTableCommand(newTable, true).run(sparkSession) + } + saveDataIntoHudiTable(sparkSession, table, path, properties, enableHive, "bulk_insert") +} + +// save necessary parameter in hoodie.properties +val newProperties = new Properties() +newProperties.putAll(table.storage.properties.asJava) +// add table partition +newProperties.put(PARTITIONPATH_FIELD_OPT_KEY, table.partitionColumnNames.mkString(",")) +val metaPath = new Path(path, HoodieTableMetaClient.METAFOLDER_NAME) +val propertyPath = new Path(metaPath, HoodieTableConfig.HOODIE_PROPERTIES_FILE) + CreateHudiTableCommand.saveNeededPropertiesIntoHoodie(propertyPath.getFileSystem(conf), propertyPath, newProperties) +Seq.empty[Row] + } + + private def saveDataIntoHudiTable( + session: SparkSession, + table: CatalogTable, + tablePath: String, + tableOptions: Map[String, String], + enableHive: Boolean, + operationType: String = "upsert"): Unit = { +val mode = if (operationType.equals("upsert")) { + "append" +} else { + "overwrite" +} +val newTableOptions = if (enableHive) { + // add hive sync properties + val
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3
xiarixiaoyao commented on a change in pull request #2761: URL: https://github.com/apache/hudi/pull/2761#discussion_r615216595 ## File path: hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala ## @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.execution + +import java.io.File +import java.util +import java.util.{Locale, Properties} + +import com.google.common.cache.{CacheBuilder, CacheLoader} +import org.apache.avro.generic.GenericRecord +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL} +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties} +import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, DefaultSource, HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation} +import org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload} +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} +import org.apache.hudi.common.config.TypedProperties +import org.apache.hudi.common.model._ +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline +import org.apache.hudi.config.HoodieWriteConfig._ +import org.apache.hudi.exception.{HoodieException, HoodieIOException} +import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.payload.AWSDmsAvroPayload +import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable +import org.apache.log4j.LogManager +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import 
org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HiveTableRelation} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + + + +/** + * hudi IUD utils + */ +object HudiSQLUtils { + + val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName + val OVERWRITEWITHLATESTVROAYLOAD = classOf[OverwriteWithLatestAvroPayload].getName + val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName + val MERGE_MARKER = "_hoodie_merge_marker" + + private val log = LogManager.getLogger(getClass) + + private val tableConfigCache = CacheBuilder +.newBuilder() +.maximumSize(1000) +.build(new CacheLoader[String, Properties] { + override def load(k: String): Properties = { +try { + HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration) +.setBasePath(k).build().getTableConfig.getProperties +} catch { + // we catch expected error here + case e: HoodieIOException => +log.error(e.getMessage) +new Properties() + case t: Throwable => +throw t +} + } +}) + + def getPropertiesFromTableConfigCache(path: String): Properties = { +if (path.isEmpty) { + throw new HoodieIOException("unexpected empty hoodie table basePath") +} +tableConfigCache.get(path) + } + + private def
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3
xiarixiaoyao commented on a change in pull request #2761: URL: https://github.com/apache/hudi/pull/2761#discussion_r615216569 ## File path: hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala ## @@ -0,0 +1,553 @@
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3
xiarixiaoyao commented on a change in pull request #2761: URL: https://github.com/apache/hudi/pull/2761#discussion_r615216468 ## File path: hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala ## @@ -0,0 +1,553 @@
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3
xiarixiaoyao commented on a change in pull request #2761: URL: https://github.com/apache/hudi/pull/2761#discussion_r615216402 ## File path: hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala ## @@ -0,0 +1,553 @@
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3
xiarixiaoyao commented on a change in pull request #2761: URL: https://github.com/apache/hudi/pull/2761#discussion_r615215652 ## File path: hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala ## @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.execution + +import java.io.File +import java.util +import java.util.{Locale, Properties} + +import com.google.common.cache.{CacheBuilder, CacheLoader} +import org.apache.avro.generic.GenericRecord +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL} +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties} +import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, DefaultSource, HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation} +import org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload} +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} +import org.apache.hudi.common.config.TypedProperties +import org.apache.hudi.common.model._ +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline +import org.apache.hudi.config.HoodieWriteConfig._ +import org.apache.hudi.exception.{HoodieException, HoodieIOException} +import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.payload.AWSDmsAvroPayload +import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable +import org.apache.log4j.LogManager +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import 
org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HiveTableRelation} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + + + +/** + * hudi IUD utils + */ +object HudiSQLUtils { + + val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName + val OVERWRITEWITHLATESTVROAYLOAD = classOf[OverwriteWithLatestAvroPayload].getName + val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName + val MERGE_MARKER = "_hoodie_merge_marker" + + private val log = LogManager.getLogger(getClass) + + private val tableConfigCache = CacheBuilder +.newBuilder() +.maximumSize(1000) +.build(new CacheLoader[String, Properties] { + override def load(k: String): Properties = { +try { + HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration) +.setBasePath(k).build().getTableConfig.getProperties +} catch { + // we catch expected error here + case e: HoodieIOException => +log.error(e.getMessage) +new Properties() + case t: Throwable => +throw t +} + } +}) + + def getPropertiesFromTableConfigCache(path: String): Properties = { +if (path.isEmpty) { + throw new HoodieIOException("unexpected empty hoodie table basePath") +} +tableConfigCache.get(path) + } + + private def
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3
xiarixiaoyao commented on a change in pull request #2761: URL: https://github.com/apache/hudi/pull/2761#discussion_r615215418 ## File path: hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala ## @@ -0,0 +1,553 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.execution + +import java.io.File +import java.util +import java.util.{Locale, Properties} + +import com.google.common.cache.{CacheBuilder, CacheLoader} +import org.apache.avro.generic.GenericRecord +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL} +import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties} +import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, DefaultSource, HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation} +import org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload} +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} +import org.apache.hudi.common.config.TypedProperties +import org.apache.hudi.common.model._ +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline +import org.apache.hudi.config.HoodieWriteConfig._ +import org.apache.hudi.exception.{HoodieException, HoodieIOException} +import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool} +import org.apache.hudi.keygen.constant.KeyGeneratorOptions +import org.apache.hudi.payload.AWSDmsAvroPayload +import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable +import org.apache.log4j.LogManager +import org.apache.spark.api.java.JavaSparkContext +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import 
org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HiveTableRelation} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + + + +/** + * hudi IUD utils + */ +object HudiSQLUtils { + + val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName Review comment: AWSDMSAVROPAYLOAD uses the 'Op' field to mark whether the current row is in the deleted state, so I think that if AWSDMSAVROPAYLOAD is specified, we should respect its logic. In the merge statement, if we encounter AWSDMSAVROPAYLOAD, we ignore all merge conditions and merge the row into the Hudi table directly. These are just my personal views; if this is not suitable, I will update my code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
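The behaviour described in the comment above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code in the PR: `PayloadMergeSketch`, `Row`, `Action`, and `resolve` are hypothetical names, the payload class name is a plain string stand-in for `classOf[AWSDmsAvroPayload].getName`, and the sketch assumes that an 'Op' value of "D" marks a deleted row.

```scala
// Hypothetical, dependency-free sketch; none of these names come from the PR.
object PayloadMergeSketch {
  // Stand-in for classOf[AWSDmsAvroPayload].getName (assumed package name).
  val AwsDmsPayload = "org.apache.hudi.payload.AWSDmsAvroPayload"

  // A simplified incoming row: 'op' mirrors the 'Op' field used by the DMS payload.
  final case class Row(key: String, op: Option[String], value: Int)

  sealed trait Action
  case object Upsert extends Action
  case object Delete extends Action
  case object Skip extends Action

  // With the DMS payload, the 'Op' field alone decides the outcome and the
  // MERGE condition is never consulted. With any other payload, the MERGE
  // condition gates the upsert.
  def resolve(payloadClass: String, row: Row, mergeCondition: Row => Boolean): Action =
    if (payloadClass == AwsDmsPayload) {
      if (row.op.contains("D")) Delete else Upsert // assumption: "D" means deleted
    } else if (mergeCondition(row)) Upsert
    else Skip
}
```

The point of the sketch is the first branch: when the DMS payload is configured, a condition that would otherwise reject the row has no effect, which is the trade-off the comment asks reviewers to weigh.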
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3
xiarixiaoyao commented on a change in pull request #2761: URL: https://github.com/apache/hudi/pull/2761#discussion_r611126561 ## File path: hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/io/hudi/sql/HudiSpark3SessionExtension.scala ## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.hudi.sql + +import org.apache.spark.sql.SparkSessionExtensions +import org.apache.spark.sql.catalyst.analysis.{HudiAnalysis, HudiOperationsCheck, PostprocessHudiTable, ProcessHudiMerge} +import org.apache.spark.sql.catalyst.optimizer.{OptimizerconditionHudi, RewriteHudiUID} + +/** + * An extension for Spark SQL to activate Hudi SQL parser to support Hudi SQL grammer + * example to create a 'SparkSession' with the hudi SQL grammar + * {{{ + * import org.apache.spark.sql.SparkSession + * + * val spark = SparkSession + * .builder() + * .appName("...") + * .master("...") + * .config("spark.sql.extensions", "io.hudi.sql.HudiSpark3SessionExtension") + * }}} + * + */ +class HudiSpark3SessionExtension extends (SparkSessionExtensions => Unit) { Review comment: @garyli1019 Thanks for your review, I will update. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3
xiarixiaoyao commented on a change in pull request #2761: URL: https://github.com/apache/hudi/pull/2761#discussion_r611020232 ## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala ## @@ -302,6 +302,10 @@ case class HoodieFileIndex( PartitionRowPath(partitionRow, partitionPath) } +if (partitionRowPaths.isEmpty) { + partitionRowPaths = Seq(PartitionRowPath(InternalRow.empty, "")).toBuffer +} + Review comment: @leesf Thanks for your review. I will raise another PR to solve this problem. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3
xiarixiaoyao commented on a change in pull request #2761: URL: https://github.com/apache/hudi/pull/2761#discussion_r606622137 ## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala ## @@ -302,6 +302,10 @@ case class HoodieFileIndex( PartitionRowPath(partitionRow, partitionPath) } +if (partitionRowPaths.isEmpty) { + partitionRowPaths = Seq(PartitionRowPath(InternalRow.empty, "")).toBuffer +} + Review comment: This simply fixes the bug introduced by HUDI-1591. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
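The guard in the diff above can be read as an "empty listing falls back to a single root entry" pattern. The sketch below is a dependency-free illustration of that pattern only, not the `HoodieFileIndex` code itself: `EmptyPartitionFallbackSketch` and `withFallback` are hypothetical names, and `PartitionRowPath` here is a simplified stand-in in which a `Seq[Any]` replaces Spark's `InternalRow`. The comment does not say why the fallback is needed; one plausible reading, offered as an assumption, is that a table with no discovered partitions should still be listed from its base path (the empty relative path).

```scala
// Hypothetical stand-ins; the real code uses Spark's InternalRow and a mutable buffer.
object EmptyPartitionFallbackSketch {
  // Simplified PartitionRowPath: partition values plus a path relative to the table base path.
  final case class PartitionRowPath(values: Seq[Any], path: String)

  // If no partition paths were discovered, return one entry with no partition
  // values and an empty relative path; otherwise return the input unchanged.
  def withFallback(discovered: Seq[PartitionRowPath]): Seq[PartitionRowPath] =
    if (discovered.isEmpty) Seq(PartitionRowPath(Seq.empty, ""))
    else discovered
}
```

Returning a new sequence rather than reassigning a `var`, as the diff does, is a choice made for this sketch only.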
[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3
xiarixiaoyao commented on a change in pull request #2761: URL: https://github.com/apache/hudi/pull/2761#discussion_r606622137 ## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala ## @@ -302,6 +302,10 @@ case class HoodieFileIndex( PartitionRowPath(partitionRow, partitionPath) } +if (partitionRowPaths.isEmpty) { + partitionRowPaths = Seq(PartitionRowPath(InternalRow.empty, "")).toBuffer +} + Review comment: This simply fixes the bug introduced by HUDI-1591. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org