[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614781917



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/test/scala/org/apache/spark/sql/hudi/TestDelete.scala
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hadoop.fs.Path
+
+class TestDelete extends TestHoodieSqlBase {
+
+  test("Test Delete Table") {
+    withTempDir { tmp =>
+      val tablePath = new Path(tmp.toString, "deleteTable").toUri.toString
+      spark.sql("set spark.hoodie.shuffle.parallelism=4")
+      spark.sql(
+        s"""create table deleteTable (
+           |keyid int,
+           |name string,
+           |price double,
+           |col1 long,
+           |p string,
+           |p1 string,
+           |p2 string) using hudi
+           |partitioned by (p,p1,p2)
+           |options('hoodie.datasource.write.table.type'='MERGE_ON_READ',
+           |'hoodie.datasource.write.precombine.field'='col1',

Review comment:
   In the base SQL implementation this is called `versionColumn`; should this option be aligned with that name?
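
A hedged sketch of the alignment being suggested: if the base SQL implementation exposes the precombine field as a `versionColumn` option, the extension could translate that name into the existing Hudi write option rather than requiring the low-level key. The Hudi config key below is the one used in the test above; the `versionColumn` alias handling itself is an illustrative assumption, not code from the PR.

    // Sketch only: map a user-facing `versionColumn` option to Hudi's precombine config key.
    def normalizeOptions(options: Map[String, String]): Map[String, String] =
      options.get("versionColumn") match {
        case Some(col) =>
          (options - "versionColumn") + ("hoodie.datasource.write.precombine.field" -> col)
        case None => options
      }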




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614780712



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/spark/sql/hudi/execution/MergeIntoHudiTable.scala
##
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.execution
+
+import org.apache.hudi.DataSourceWriteOptions.{DEFAULT_PAYLOAD_OPT_VAL, PAYLOAD_CLASS_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, WriteOperationType}
+import org.apache.hudi.execution.HudiSQLUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+import org.apache.spark.sql.catalyst.merge._
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, BasePredicate, Expression, Literal, PredicateHelper, UnsafeProjection}
+import org.apache.spark.sql.functions.{col, lit, when}
+import org.apache.spark.sql.catalyst.merge.{HudiMergeClause, HudiMergeInsertClause}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.types.{StringType, StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * merge command, execute merge operation for hudi
+  */
+case class MergeIntoHudiTable(
+    target: LogicalPlan,
+    source: LogicalPlan,
+    joinCondition: Expression,
+    matchedClauses: Seq[HudiMergeClause],
+    noMatchedClause: Seq[HudiMergeInsertClause],
+    finalSchema: StructType,
+    trimmedSchema: StructType) extends RunnableCommand with Logging with PredicateHelper {
+
+  var tableMeta: Map[String, String] = null
+
+  lazy val isRecordKeyJoin = {
+    val recordKeyFields = tableMeta.get(RECORDKEY_FIELD_OPT_KEY).get.split(",").map(_.trim).filter(!_.isEmpty)
+    val intersect = joinCondition.references.intersect(target.outputSet).toSeq
+    if (recordKeyFields.size == 1 && intersect.size == 1 && intersect(0).name.equalsIgnoreCase(recordKeyFields(0))) {
+      true
+    } else {
+      false
+    }
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    tableMeta = HudiSQLUtils.getHoodiePropsFromRelation(target, sparkSession)
+    val enableHive = "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)

Review comment:
   Could this logic be put into a method, e.g. `hiveCatalogImplementation`, so it can be reused?
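
A minimal sketch of the helper being asked for, assuming it lives in a shared place such as HudiSQLUtils (the enclosing object and exact signature are illustrative; the conf lookup is the one already used in the quoted code):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.StaticSQLConf

    object CatalogHelpers { // illustrative placement
      // True when Spark's catalog implementation is Hive.
      def hiveCatalogImplementation(sparkSession: SparkSession): Boolean =
        "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
    }

Call sites such as MergeIntoHudiTable.run and the create/CTAS commands could then compute `val enableHive = hiveCatalogImplementation(sparkSession)` instead of repeating the comparison.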




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614779858



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/spark/sql/hudi/execution/CreateHudiTableCommand.scala
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.execution
+
+import java.util.Properties
+
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, 
FileSystem, Path}
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_CLASS_PROP
+import org.apache.hudi.execution.HudiSQLUtils
+import org.apache.hudi.hadoop.HoodieParquetInputFormat
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, 
TableAlreadyExistsException}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.hive.SparkfunctionWrapper
+import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Command for create hoodie table
+ */
+case class CreateHudiTableCommand(table: CatalogTable, ignoreIfExists: 
Boolean) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+assert(table.tableType != CatalogTableType.VIEW)
+assert(table.provider.isDefined)
+
+val tableName = table.identifier.unquotedString
+val sessionState = sparkSession.sessionState
+val tableIsExists = sessionState.catalog.tableExists(table.identifier)
+if (tableIsExists) {
+  if (ignoreIfExists) {
+// scalastyle:off
+return Seq.empty[Row]
+// scalastyle:on
+  } else {
+throw new IllegalArgumentException(s"Table 
${table.identifier.quotedString} already exists")
+  }
+}
+
+val enableHive = "hive" == 
sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
+val path = HudiSQLUtils.getTablePath(sparkSession, table)
+val conf = sparkSession.sessionState.newHadoopConf()
+val isTableExists = HudiSQLUtils.tableExists(path, conf)
+val (newSchema, tableOptions) = if (table.tableType == 
CatalogTableType.EXTERNAL && isTableExists) {
+  // if this is an external table & the table has already exists in the 
location,
+  // infer schema from the table meta
+  assert(table.schema.isEmpty, s"Should not specify table schema " +
+s"for an existing hoodie table: ${table.identifier.unquotedString}")
+  // get Schema from the external table
+  val metaClient = 
HoodieTableMetaClient.builder().setBasePath(path).setConf(conf).build()
+  val schemaResolver = new TableSchemaResolver(metaClient)
+  val avroSchema = schemaResolver.getTableAvroSchema(true)
+  val tableSchema = 
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+  (tableSchema, table.storage.properties ++ 
metaClient.getTableConfig.getProps.asScala)
+} else {
+  // Add the meta fields to the schema if this is a managed table or an empty external table
+  val fullSchema: StructType = {
+val metaFields = HoodieRecord.HOODIE_META_COLUMNS.asScala
+val dataFields = table.schema.fields.filterNot(f => 
metaFields.contains(f.name))
+val fields = metaFields.map(StructField(_, StringType)) ++ dataFields
+StructType(fields)
+  }
+  (fullSchema, table.storage.properties)
+}
+
+// Append * to tablePath if create dataSourceV1 hudi table
+val newPath = if 

[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614779619



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/spark/sql/hudi/execution/CreateHudiTableCommand.scala
##
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.execution
+
+import java.util.Properties
+
+import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, 
FileSystem, Path}
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
+import org.apache.hudi.config.HoodieWriteConfig.KEYGENERATOR_CLASS_PROP
+import org.apache.hudi.execution.HudiSQLUtils
+import org.apache.hudi.hadoop.HoodieParquetInputFormat
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.avro.SchemaConverters
+import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, 
TableAlreadyExistsException}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.hive.SparkfunctionWrapper
+import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import scala.collection.JavaConverters._
+
+/**
+ * Command for create hoodie table
+ */
+case class CreateHudiTableCommand(table: CatalogTable, ignoreIfExists: 
Boolean) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+assert(table.tableType != CatalogTableType.VIEW)
+assert(table.provider.isDefined)
+
+val tableName = table.identifier.unquotedString
+val sessionState = sparkSession.sessionState
+val tableIsExists = sessionState.catalog.tableExists(table.identifier)
+if (tableIsExists) {
+  if (ignoreIfExists) {
+// scalastyle:off
+return Seq.empty[Row]
+// scalastyle:on
+  } else {
+throw new IllegalArgumentException(s"Table 
${table.identifier.quotedString} already exists")
+  }
+}
+
+val enableHive = "hive" == 
sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
+val path = HudiSQLUtils.getTablePath(sparkSession, table)
+val conf = sparkSession.sessionState.newHadoopConf()
+val isTableExists = HudiSQLUtils.tableExists(path, conf)
+val (newSchema, tableOptions) = if (table.tableType == 
CatalogTableType.EXTERNAL && isTableExists) {
+  // if this is an external table & the table has already exists in the 
location,
+  // infer schema from the table meta
+  assert(table.schema.isEmpty, s"Should not specify table schema " +
+s"for an existing hoodie table: ${table.identifier.unquotedString}")
+  // get Schema from the external table
+  val metaClient = 
HoodieTableMetaClient.builder().setBasePath(path).setConf(conf).build()
+  val schemaResolver = new TableSchemaResolver(metaClient)
+  val avroSchema = schemaResolver.getTableAvroSchema(true)
+  val tableSchema = 
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+  (tableSchema, table.storage.properties ++ 
metaClient.getTableConfig.getProps.asScala)
+} else {
+  // Add the meta fields to the schema if this is a managed table or an empty external table
+  val fullSchema: StructType = {
+val metaFields = HoodieRecord.HOODIE_META_COLUMNS.asScala
+val dataFields = table.schema.fields.filterNot(f => 
metaFields.contains(f.name))
+val fields = metaFields.map(StructField(_, StringType)) ++ dataFields
+StructType(fields)
+  }
+  (fullSchema, table.storage.properties)
+}
+
+// Append * to tablePath if create dataSourceV1 hudi table
+val newPath = if 

[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614779342



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/spark/sql/hudi/execution/CreateHudiTableAsSelectCommand.scala
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.execution
+
+import java.util.Properties
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.config.HoodieWriteConfig.BULKINSERT_PARALLELISM
+import org.apache.hudi.execution.HudiSQLUtils
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.internal.StaticSQLConf
+
+import scala.collection.JavaConverters._
+
+/**
+ * Command for ctas
+ */
+case class CreateHudiTableAsSelectCommand(
+table: CatalogTable,
+mode: SaveMode,
+query: LogicalPlan,
+outputColumnNames: Seq[String]) extends RunnableCommand {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+assert(table.tableType != CatalogTableType.VIEW)
+assert(table.provider.isDefined)
+val sessionState = sparkSession.sessionState
+var fs: FileSystem = null
+val conf = sessionState.newHadoopConf()
+
+val db = 
table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = table.identifier.copy(database = Some(db))
+val tableName = tableIdentWithDB.unquotedString
+val enableHive = "hive" == 
sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
+val path = HudiSQLUtils.getTablePath(sparkSession, table)
+
+if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+  assert(mode != SaveMode.Overwrite, s"Expect the table $tableName has 
been dropped when the save mode is OverWrite")
+  if (mode == SaveMode.ErrorIfExists) {
+throw new AnalysisException(s"Table $tableName already exists. You eed 
to drop it first.")
+  }
+  if (mode == SaveMode.Ignore) {
+// scalastyle:off
+return Seq.empty
+// scalastyle:on
+  }
+  // append table
+  saveDataIntoHudiTable(sparkSession, table, path, 
table.storage.properties, enableHive)
+} else {
+  val properties = table.storage.properties
+  assert(table.schema.isEmpty)
+  sparkSession.sessionState.catalog.validateTableLocation(table)
+  // create table
+  if (!enableHive) {
+val newTable = if (!HudiSQLUtils.tableExists(path, conf)) {
+  table.copy(schema = query.schema)
+} else {
+  table
+}
+CreateHudiTableCommand(newTable, true).run(sparkSession)
+  }
+  saveDataIntoHudiTable(sparkSession, table, path, properties, enableHive, 
"bulk_insert")
+}
+
+// save necessary parameter in hoodie.properties
+val newProperties = new Properties()
+newProperties.putAll(table.storage.properties.asJava)
+// add table partition
+newProperties.put(PARTITIONPATH_FIELD_OPT_KEY, 
table.partitionColumnNames.mkString(","))
+val metaPath = new Path(path, HoodieTableMetaClient.METAFOLDER_NAME)
+val propertyPath = new Path(metaPath, 
HoodieTableConfig.HOODIE_PROPERTIES_FILE)
+
CreateHudiTableCommand.saveNeededPropertiesIntoHoodie(propertyPath.getFileSystem(conf),
 propertyPath, newProperties)
+Seq.empty[Row]
+  }
+
+  private def saveDataIntoHudiTable(
+  session: SparkSession,
+  table: CatalogTable,
+  tablePath: String,
+  tableOptions: Map[String, String],
+  enableHive: Boolean,
+  operationType: String = "upsert"): Unit = {
+val mode = if (operationType.equals("upsert")) {
+  "append"
+} else {
+  "overwrite"
+}
+val newTableOptions = if (enableHive) {
+  // add hive sync properties
+  val extraOptions = 

[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614776987



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/spark/sql/catalyst/analysis/HudiAnalysis.scala
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.execution.HudiSQLUtils
+import org.apache.hudi.execution.HudiSQLUtils.HoodieV1Relation
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.merge.HudiMergeIntoUtils
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.hudi.execution.InsertIntoHudiTable
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{StructField, StructType}
+
+import scala.collection.mutable
+
+/**
+ * Analysis rules for hudi. deal with * in merge clause and insert into clause
+ */
+class HudiAnalysis(session: SparkSession, conf: SQLConf) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
+    case dsv2 @ DataSourceV2Relation(d: HoodieDataSourceInternalTable, _, _, _, options) =>
+      HoodieV1Relation.fromV2Relation(dsv2, d, options)
+
+    // ResolveReferences rule will resolve * in merge, but the resolve result is not what we expected,
+    // so if source is not resolved, we add a special placeholder to prevent the ResolveReferences rule from resolving * in mergeIntoTable
+    case m @ MergeIntoTable(target, source, _, _, _) =>
+      if (source.resolved) {
+        dealWithStarAction(m)
+      } else {
+        placeHolderStarAction(m)
+      }
+    // we should deal with Meta columns in hudi; it is safe to handle insertIntoStatement here.
+    case i @ InsertIntoStatement(table, _, query, overwrite, _)
+      if table.resolved && query.resolved && HudiSQLUtils.isHudiRelation(table) =>
+      table match {
+        case relation: HiveTableRelation =>
+          val metadata = relation.tableMeta
+          preprocess(i, metadata.identifier.quotedString, metadata.partitionSchema, Some(metadata), query, overwrite)
+        case dsv2 @ DataSourceV2Relation(d: HoodieDataSourceInternalTable, _, _, _, _) =>
+          preprocessV2(i, d, query)
+        case l: LogicalRelation =>
+          val catalogTable = l.catalogTable
+          val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown")
+          preprocess(i, tblName, catalogTable.get.partitionSchema, catalogTable, query, overwrite)
+        case _ => i
+      }
+  }
+
+  /**
+   * do align hoodie metacols, hoodie tablehave metaCols which are hidden by default, we try our best to fill those cols auto

Review comment:
   Typo: should be `table has`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614778297



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/spark/sql/hudi/execution/CreateHudiTableAsSelectCommand.scala
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.execution
+
+import java.util.Properties
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.config.HoodieWriteConfig.BULKINSERT_PARALLELISM
+import org.apache.hudi.execution.HudiSQLUtils
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.internal.StaticSQLConf
+
+import scala.collection.JavaConverters._
+
+/**
+ * Command for ctas
+ */
+case class CreateHudiTableAsSelectCommand(
+    table: CatalogTable,
+    mode: SaveMode,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String]) extends RunnableCommand {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+    val sessionState = sparkSession.sessionState
+    var fs: FileSystem = null
+    val conf = sessionState.newHadoopConf()
+
+    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = table.identifier.copy(database = Some(db))
+    val tableName = tableIdentWithDB.unquotedString
+    val enableHive = "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
+    val path = HudiSQLUtils.getTablePath(sparkSession, table)
+
+    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+      assert(mode != SaveMode.Overwrite, s"Expect the table $tableName to have been dropped when the save mode is Overwrite")
+      if (mode == SaveMode.ErrorIfExists) {
+        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
+      }
+      if (mode == SaveMode.Ignore) {
+        // scalastyle:off
+        return Seq.empty
+        // scalastyle:on
+      }
+      // append table
+      saveDataIntoHudiTable(sparkSession, table, path, table.storage.properties, enableHive)
+    } else {
+      val properties = table.storage.properties
+      assert(table.schema.isEmpty)
+      sparkSession.sessionState.catalog.validateTableLocation(table)
+      // create table
+      if (!enableHive) {

Review comment:
   ditto
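
If the "ditto" refers to the same inline catalog-implementation check flagged in the earlier comment, the call site here could simply reuse a shared helper once it exists; a hedged one-liner under that assumption:

    // Reuse a shared helper (sketched in the earlier comment) instead of repeating the comparison.
    val enableHive = hiveCatalogImplementation(sparkSession)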




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614777699



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/spark/sql/catalyst/analysis/HudiOperationsCheck.scala
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.hudi.execution.HudiSQLUtils
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Expression, InSubquery, 
Literal, Not}
+import org.apache.spark.sql.catalyst.merge.HudiMergeIntoTable
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, 
LogicalPlan}
+import org.apache.spark.sql.execution.command._
+
+/**
+  * check rule which can intercept unsupported operations for hudi
+  */
+class HudiOperationsCheck(spark: SparkSession) extends (LogicalPlan => Unit) {
+  val catalog = spark.sessionState.catalog
+
+  override def apply(plan: LogicalPlan): Unit = {
+    plan foreach {
+      case AlterTableAddPartitionCommand(tableName, _, _) if (checkHudiTable(tableName)) =>
+        throw new AnalysisException("AlterTableAddPartitionCommand is not currently supported in HUDI")
+
+      case a: AlterTableDropPartitionCommand if (checkHudiTable(a.tableName)) =>
+        throw new AnalysisException("AlterTableDropPartitionCommand is not currently supported in HUDI")
+
+      case AlterTableChangeColumnCommand(tableName, _, _) if (checkHudiTable(tableName)) =>
+        throw new AnalysisException("AlterTableChangeColumnCommand is not currently supported in HUDI")
+
+      case AlterTableRenamePartitionCommand(tableName, _, _) if (checkHudiTable(tableName)) =>
+        throw new AnalysisException("AlterTableRenamePartitionCommand is not currently supported in HUDI")
+
+      case AlterTableRecoverPartitionsCommand(tableName, _) if (checkHudiTable(tableName)) =>
+        throw new AnalysisException("AlterTableRecoverPartitionsCommand is not currently supported in HUDI")
+
+      case AlterTableSetLocationCommand(tableName, _, _) if (checkHudiTable(tableName)) =>
+        throw new AnalysisException("AlterTableSetLocationCommand is not currently supported in HUDI")
+
+      case DeleteFromTable(target, Some(condition)) if (hasNullAwarePredicateWithNot(condition) && checkHudiTable(target)) =>
+        throw new AnalysisException("Null-aware predicate sub-queries are not currently supported for DELETE")
+
+      case HudiMergeIntoTable(target, _, _, _, noMatchedClauses, _) =>
+        noMatchedClauses.map { m =>
+          m.resolvedActions.foreach { action =>
+            if (action.targetColNameParts.size > 1) {
+              throw new AnalysisException(s"cannot insert nested values: ${action.targetColNameParts.mkString("<")}")

Review comment:
   Would you please describe why nested values cannot be inserted here?
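
For context, a hedged illustration (not from the PR) of the kind of statement this check rejects, written in the style of the PR's SQL tests; `address` is assumed to be a struct column on a hypothetical target table, so the insert action's target name has more than one part and trips the `targetColNameParts.size > 1` condition quoted above:

    // Illustrative only: a merge whose insert action assigns to a nested column.
    spark.sql(
      s"""merge into targetTable t
         |using sourceTable s
         |on t.keyid = s.keyid
         |when not matched then insert (keyid, address.city) values (s.keyid, s.city)
         |""".stripMargin)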




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614772168



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, 
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceUtils, DataSourceWriteOptions, DefaultSource, 
HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, 
HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import 
org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, 
OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName
+  val OVERWRITEWITHLATESTVROAYLOAD = 
classOf[OverwriteWithLatestAvroPayload].getName
+  val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = 
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
+  val MERGE_MARKER = "_hoodie_merge_marker"
+
+  private val log = LogManager.getLogger(getClass)
+
+  private val tableConfigCache = CacheBuilder
+.newBuilder()
+.maximumSize(1000)
+.build(new CacheLoader[String, Properties] {
+  override def load(k: String): Properties = {
+try {
+  
HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration)
+.setBasePath(k).build().getTableConfig.getProperties
+} catch {
+  // we catch expected error here
+  case e: HoodieIOException =>
+log.error(e.getMessage)
+new Properties()
+  case t: Throwable =>
+throw t
+}
+  }
+})
+
+  def getPropertiesFromTableConfigCache(path: String): Properties = {
+if (path.isEmpty) {
+  throw new HoodieIOException("unexpected empty hoodie table basePath")
+}
+tableConfigCache.get(path)
+  }
+
+  private def matchHoodieRelation(relation: 

[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614776570



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, 
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceUtils, DataSourceWriteOptions, DefaultSource, 
HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, 
HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import 
org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, 
OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName
+  val OVERWRITEWITHLATESTVROAYLOAD = 
classOf[OverwriteWithLatestAvroPayload].getName
+  val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = 
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
+  val MERGE_MARKER = "_hoodie_merge_marker"
+
+  private val log = LogManager.getLogger(getClass)
+
+  private val tableConfigCache = CacheBuilder
+.newBuilder()
+.maximumSize(1000)
+.build(new CacheLoader[String, Properties] {
+  override def load(k: String): Properties = {
+try {
+  
HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration)
+.setBasePath(k).build().getTableConfig.getProperties
+} catch {
+  // we catch expected error here
+  case e: HoodieIOException =>
+log.error(e.getMessage)
+new Properties()
+  case t: Throwable =>
+throw t
+}
+  }
+})
+
+  def getPropertiesFromTableConfigCache(path: String): Properties = {
+if (path.isEmpty) {
+  throw new HoodieIOException("unexpected empty hoodie table basePath")
+}
+tableConfigCache.get(path)
+  }
+
+  private def matchHoodieRelation(relation: 

[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614773873



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, 
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceUtils, DataSourceWriteOptions, DefaultSource, 
HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, 
HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import 
org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, 
OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName
+  val OVERWRITEWITHLATESTVROAYLOAD = 
classOf[OverwriteWithLatestAvroPayload].getName
+  val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = 
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
+  val MERGE_MARKER = "_hoodie_merge_marker"
+
+  private val log = LogManager.getLogger(getClass)
+
+  private val tableConfigCache = CacheBuilder
+.newBuilder()
+.maximumSize(1000)
+.build(new CacheLoader[String, Properties] {
+  override def load(k: String): Properties = {
+try {
+  
HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration)
+.setBasePath(k).build().getTableConfig.getProperties
+} catch {
+  // we catch expected error here
+  case e: HoodieIOException =>
+log.error(e.getMessage)
+new Properties()
+  case t: Throwable =>
+throw t
+}
+  }
+})
+
+  def getPropertiesFromTableConfigCache(path: String): Properties = {
+if (path.isEmpty) {
+  throw new HoodieIOException("unexpected empty hoodie table basePath")
+}
+tableConfigCache.get(path)
+  }
+
+  private def matchHoodieRelation(relation: 

[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614772279



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, 
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceUtils, DataSourceWriteOptions, DefaultSource, 
HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, 
HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import 
org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, 
OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName
+  val OVERWRITEWITHLATESTVROAYLOAD = 
classOf[OverwriteWithLatestAvroPayload].getName
+  val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = 
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
+  val MERGE_MARKER = "_hoodie_merge_marker"
+
+  private val log = LogManager.getLogger(getClass)
+
+  private val tableConfigCache = CacheBuilder
+.newBuilder()
+.maximumSize(1000)
+.build(new CacheLoader[String, Properties] {
+  override def load(k: String): Properties = {
+try {
+  
HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration)
+.setBasePath(k).build().getTableConfig.getProperties
+} catch {
+  // we catch expected error here
+  case e: HoodieIOException =>
+log.error(e.getMessage)
+new Properties()
+  case t: Throwable =>
+throw t
+}
+  }
+})
+
+  def getPropertiesFromTableConfigCache(path: String): Properties = {
+if (path.isEmpty) {
+  throw new HoodieIOException("unexpected empty hoodie table basePath")
+}
+tableConfigCache.get(path)
+  }
+
+  private def matchHoodieRelation(relation: 

[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614771176



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, 
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceUtils, DataSourceWriteOptions, DefaultSource, 
HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, 
HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import 
org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, 
OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName
+  val OVERWRITEWITHLATESTVROAYLOAD = 
classOf[OverwriteWithLatestAvroPayload].getName
+  val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = 
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
+  val MERGE_MARKER = "_hoodie_merge_marker"
+
+  private val log = LogManager.getLogger(getClass)
+
+  private val tableConfigCache = CacheBuilder
+.newBuilder()
+.maximumSize(1000)
+.build(new CacheLoader[String, Properties] {
+  override def load(k: String): Properties = {
+try {
+  
HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration)
+.setBasePath(k).build().getTableConfig.getProperties
+} catch {
+  // we catch expected error here
+  case e: HoodieIOException =>
+log.error(e.getMessage)
+new Properties()
+  case t: Throwable =>
+throw t
+}
+  }
+})
+
+  def getPropertiesFromTableConfigCache(path: String): Properties = {
+if (path.isEmpty) {
+  throw new HoodieIOException("unexpected empty hoodie table basePath")
+}
+tableConfigCache.get(path)
+  }
+
+  private def matchHoodieRelation(relation: 

[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614774065



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, 
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceUtils, DataSourceWriteOptions, DefaultSource, 
HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, 
HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import 
org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, 
OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName
+  val OVERWRITEWITHLATESTVROAYLOAD = 
classOf[OverwriteWithLatestAvroPayload].getName
+  val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = 
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
+  val MERGE_MARKER = "_hoodie_merge_marker"
+
+  private val log = LogManager.getLogger(getClass)
+
+  private val tableConfigCache = CacheBuilder
+.newBuilder()
+.maximumSize(1000)
+.build(new CacheLoader[String, Properties] {
+  override def load(k: String): Properties = {
+try {
+  
HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration)
+.setBasePath(k).build().getTableConfig.getProperties
+} catch {
+  // we catch expected error here
+  case e: HoodieIOException =>
+log.error(e.getMessage)
+new Properties()
+  case t: Throwable =>
+throw t
+}
+  }
+})
+
+  def getPropertiesFromTableConfigCache(path: String): Properties = {
+if (path.isEmpty) {
+  throw new HoodieIOException("unexpected empty hoodie table basePath")
+}
+tableConfigCache.get(path)
+  }
+
+  private def matchHoodieRelation(relation: 

[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614768558



##
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java
##
@@ -44,6 +44,7 @@ public SparkUpsertCommitActionExecutor(HoodieSparkEngineContext context,
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
     return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
-        config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
+        config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(),
+        this, Boolean.parseBoolean(config.getProps().getProperty("hoodie.tagging.before.insert", "true")));

Review comment:
   would you please move `hoodie.tagging.before.insert` and its default value to HoodieWriteConfig?
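
   For reference, a minimal sketch of what centralizing the key could look like. HoodieWriteConfig itself is a Java class, so this is only the idea expressed in Scala; the object and method names below are hypothetical, only the key and default are quoted from the diff:

    import java.util.Properties

    // Hypothetical sketch: one place owns the key, its default, and the lookup,
    // so executors no longer parse the raw property themselves.
    object TaggingBeforeInsertConfig {
      val TAG_BEFORE_INSERT_KEY = "hoodie.tagging.before.insert"  // key as quoted in the diff
      val TAG_BEFORE_INSERT_DEFAULT = "true"                      // default as quoted in the diff

      def shouldTagBeforeInsert(props: Properties): Boolean =
        java.lang.Boolean.parseBoolean(
          props.getProperty(TAG_BEFORE_INSERT_KEY, TAG_BEFORE_INSERT_DEFAULT))
    }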




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614770493



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, 
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceUtils, DataSourceWriteOptions, DefaultSource, 
HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, 
HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import 
org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, 
OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName
+  val OVERWRITEWITHLATESTVROAYLOAD = 
classOf[OverwriteWithLatestAvroPayload].getName
+  val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = 
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
+  val MERGE_MARKER = "_hoodie_merge_marker"
+
+  private val log = LogManager.getLogger(getClass)
+
+  private val tableConfigCache = CacheBuilder
+.newBuilder()
+.maximumSize(1000)
+.build(new CacheLoader[String, Properties] {
+  override def load(k: String): Properties = {
+try {
+  
HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration)
+.setBasePath(k).build().getTableConfig.getProperties
+} catch {
+  // we catch expected error here
+  case e: HoodieIOException =>
+log.error(e.getMessage)

Review comment:
   log.warn?
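
   A minimal sketch of the suggested change, assuming the caught HoodieIOException just means no table config exists yet at the base path; the loader is pulled out into a standalone helper here (loadTableConfig is a made-up name) so the snippet is self-contained:

    import java.util.Properties
    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.hudi.exception.HoodieIOException
    import org.apache.log4j.LogManager
    import org.apache.spark.sql.SparkSession

    object TableConfigLoader {
      private val log = LogManager.getLogger(getClass)

      def loadTableConfig(basePath: String): Properties =
        try {
          HoodieTableMetaClient.builder()
            .setConf(SparkSession.active.sparkContext.hadoopConfiguration)
            .setBasePath(basePath)
            .build()
            .getTableConfig
            .getProperties
        } catch {
          case e: HoodieIOException =>
            // expected when no Hudi table has been initialized at basePath yet,
            // so log at WARN (per the review suggestion) rather than ERROR
            log.warn(e.getMessage)
            new Properties()
        }
    }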




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614771646



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, 
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceUtils, DataSourceWriteOptions, DefaultSource, 
HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, 
HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import 
org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, 
OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName
+  val OVERWRITEWITHLATESTVROAYLOAD = 
classOf[OverwriteWithLatestAvroPayload].getName
+  val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = 
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
+  val MERGE_MARKER = "_hoodie_merge_marker"
+
+  private val log = LogManager.getLogger(getClass)
+
+  private val tableConfigCache = CacheBuilder
+.newBuilder()
+.maximumSize(1000)
+.build(new CacheLoader[String, Properties] {
+  override def load(k: String): Properties = {
+try {
+  
HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration)
+.setBasePath(k).build().getTableConfig.getProperties
+} catch {
+  // we catch expected error here
+  case e: HoodieIOException =>
+log.error(e.getMessage)
+new Properties()
+  case t: Throwable =>
+throw t
+}
+  }
+})
+
+  def getPropertiesFromTableConfigCache(path: String): Properties = {
+if (path.isEmpty) {
+  throw new HoodieIOException("unexpected empty hoodie table basePath")
+}
+tableConfigCache.get(path)
+  }
+
+  private def matchHoodieRelation(relation: 

[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614768558



##
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkUpsertCommitActionExecutor.java
##
@@ -44,6 +44,7 @@ public SparkUpsertCommitActionExecutor(HoodieSparkEngineContext context,
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
     return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
-        config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(), this, true);
+        config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(),
+        this, Boolean.parseBoolean(config.getProps().getProperty("hoodie.tagging.before.insert", "true")));

Review comment:
   would you move `hoodie.tagging.before.insert` and its default value to HoodieWriteConfig?
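
   Wherever the key ends up, the user-facing behaviour stays the same; a usage sketch of passing the option on a datasource write (table name, path, and schema below are made up, and the option name is assumed to stay as introduced in this PR):

    import org.apache.spark.sql.SparkSession

    object TaggingOptionDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("tagging-demo").getOrCreate()
        import spark.implicits._

        Seq((1, "a", 1000L, "2021")).toDF("keyid", "name", "col1", "p")
          .write.format("hudi")
          .option("hoodie.table.name", "tagging_demo")                  // made-up table name
          .option("hoodie.datasource.write.recordkey.field", "keyid")
          .option("hoodie.datasource.write.partitionpath.field", "p")
          .option("hoodie.datasource.write.precombine.field", "col1")
          .option("hoodie.tagging.before.insert", "false")              // the option under discussion
          .mode("append")
          .save("/tmp/tagging_demo")                                    // made-up path

        spark.stop()
      }
    }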




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614772540



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, 
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceUtils, DataSourceWriteOptions, DefaultSource, 
HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, 
HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import 
org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, 
OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName
+  val OVERWRITEWITHLATESTVROAYLOAD = 
classOf[OverwriteWithLatestAvroPayload].getName
+  val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = 
classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
+  val MERGE_MARKER = "_hoodie_merge_marker"
+
+  private val log = LogManager.getLogger(getClass)
+
+  private val tableConfigCache = CacheBuilder
+.newBuilder()
+.maximumSize(1000)
+.build(new CacheLoader[String, Properties] {
+  override def load(k: String): Properties = {
+try {
+  
HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration)
+.setBasePath(k).build().getTableConfig.getProperties
+} catch {
+  // we catch expected error here
+  case e: HoodieIOException =>
+log.error(e.getMessage)
+new Properties()
+  case t: Throwable =>
+throw t
+}
+  }
+})
+
+  def getPropertiesFromTableConfigCache(path: String): Properties = {
+if (path.isEmpty) {
+  throw new HoodieIOException("unexpected empty hoodie table basePath")
+}
+tableConfigCache.get(path)
+  }
+
+  private def matchHoodieRelation(relation: 

[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614770205



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, 
QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceUtils, DataSourceWriteOptions, DefaultSource, 
HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, 
HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import 
org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, 
OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName

Review comment:
   not quite understand why AWSDMSAVROPAYLOAD is specified here.
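
   If the intent is only to pick a sensible default, the payload class could be resolved from the write options instead of being special-cased; a sketch, assuming the existing datasource option key `hoodie.datasource.write.payload.class` and a hypothetical helper name:

    import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload

    object PayloadResolver {
      // Fall back to the default payload unless the caller explicitly asks for another
      // implementation (e.g. AWSDmsAvroPayload) through the write options.
      def resolvePayloadClass(options: Map[String, String]): String =
        options.getOrElse("hoodie.datasource.write.payload.class",
          classOf[OverwriteWithLatestAvroPayload].getName)
    }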




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-16 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r614768775



##
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitActionExecutor.java
##
@@ -44,6 +44,7 @@ public SparkUpsertDeltaCommitActionExecutor(HoodieSparkEngineContext context,
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute() {
     return SparkWriteHelper.newInstance().write(instantTime, inputRecordsRDD, context, table,
-        config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(),this, true);
+        config.shouldCombineBeforeUpsert(), config.getUpsertShuffleParallelism(),
+        this, Boolean.parseBoolean(config.getProps().getProperty("hoodie.tagging.before.insert", "true")));

Review comment:
   ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] leesf commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-10 Thread GitBox


leesf commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r611015757



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -302,6 +302,10 @@ case class HoodieFileIndex(
   PartitionRowPath(partitionRow, partitionPath)
 }
 
+if (partitionRowPaths.isEmpty) {
+  partitionRowPaths = Seq(PartitionRowPath(InternalRow.empty, "")).toBuffer
+}
+

Review comment:
   @xiarixiaoyao can we raise another PR to fix the bug?
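
   For context, the quoted change amounts to a fallback for tables whose partition listing comes back empty (e.g. non-partitioned tables); a standalone sketch of that behaviour, with PartitionRowPath redefined here only for illustration:

    import org.apache.spark.sql.catalyst.InternalRow

    object PartitionFallback {
      case class PartitionRowPath(values: InternalRow, partitionPath: String)

      // When partition listing yields nothing, keep a single "empty" partition
      // so file listing still covers the table base path.
      def withFallback(paths: Seq[PartitionRowPath]): Seq[PartitionRowPath] =
        if (paths.isEmpty) Seq(PartitionRowPath(InternalRow.empty, "")) else paths
    }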




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org