[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-17 Thread GitBox


xiarixiaoyao commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r615233137



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/spark/sql/catalyst/analysis/HudiOperationsCheck.scala
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.hudi.execution.HudiSQLUtils
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Expression, InSubquery, Literal, Not}
+import org.apache.spark.sql.catalyst.merge.HudiMergeIntoTable
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
+import org.apache.spark.sql.execution.command._
+
+/**
+  * Check rule that intercepts operations which are not currently supported for Hudi tables.
+  */
+class HudiOperationsCheck(spark: SparkSession) extends (LogicalPlan => Unit) {
+  val catalog = spark.sessionState.catalog
+
+  override def apply(plan: LogicalPlan): Unit = {
+    plan foreach {
+      case AlterTableAddPartitionCommand(tableName, _, _) if (checkHudiTable(tableName)) =>
+        throw new AnalysisException("AlterTableAddPartitionCommand is not currently supported in HUDI")
+
+      case a: AlterTableDropPartitionCommand if (checkHudiTable(a.tableName)) =>
+        throw new AnalysisException("AlterTableDropPartitionCommand is not currently supported in HUDI")
+
+      case AlterTableChangeColumnCommand(tableName, _, _) if (checkHudiTable(tableName)) =>
+        throw new AnalysisException("AlterTableChangeColumnCommand is not currently supported in HUDI")
+
+      case AlterTableRenamePartitionCommand(tableName, _, _) if (checkHudiTable(tableName)) =>
+        throw new AnalysisException("AlterTableRenamePartitionCommand is not currently supported in HUDI")
+
+      case AlterTableRecoverPartitionsCommand(tableName, _) if (checkHudiTable(tableName)) =>
+        throw new AnalysisException("AlterTableRecoverPartitionsCommand is not currently supported in HUDI")
+
+      case AlterTableSetLocationCommand(tableName, _, _) if (checkHudiTable(tableName)) =>
+        throw new AnalysisException("AlterTableSetLocationCommand is not currently supported in HUDI")
+
+      case DeleteFromTable(target, Some(condition)) if (hasNullAwarePredicateWithNot(condition) && checkHudiTable(target)) =>
+        throw new AnalysisException("Null-aware predicate sub-queries are not currently supported for DELETE")
+
+      case HudiMergeIntoTable(target, _, _, _, noMatchedClauses, _) =>
+        noMatchedClauses.map { m =>
+          m.resolvedActions.foreach { action =>
+            if (action.targetColNameParts.size > 1) {
+              throw new AnalysisException(s"cannot insert nested values: ${action.targetColNameParts.mkString("<")}")

Review comment:
   
   Consider: create table nestedTable (keyid int, a struct<...>) using hudi. Now a is a nested field.
   Then consider:
   "merge into nestedTable as t using sourceTable as s " +
     "on t.keyid = s.keyid when not matched then insert (t.keyid, **t.a.score**) values (s.keyid, s.score)"
   
   We cannot insert t.keyid = s.keyid, t.a.score = s.score into nestedTable directly, since we only set a partial field (**t.a.score**) of the nested field **t.a**; the other partial field (**t.a.number**) is unknown.
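   A rough sketch of the guard being discussed (hypothetical, simplified names rather than the PR's actual classes): an insert action whose target column path has more than one name part, such as t.a.score, is rejected, because the remaining members of the struct (t.a.number here) would be left undefined.
   
   ```scala
   // Simplified illustration of the nested-column check described above.
   // ResolvedAction is a stand-in for the PR's resolved merge action type.
   case class ResolvedAction(targetColNameParts: Seq[String])
   
   def checkInsertActions(actions: Seq[ResolvedAction]): Unit =
     actions.foreach { action =>
       if (action.targetColNameParts.size > 1) {
         throw new IllegalArgumentException(
           s"cannot insert nested values: ${action.targetColNameParts.mkString(".")}")
       }
     }
   
   checkInsertActions(Seq(ResolvedAction(Seq("keyid"))))          // top-level column: accepted
   // checkInsertActions(Seq(ResolvedAction(Seq("a", "score")))) // nested column: rejected
   ```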
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-17 Thread GitBox


xiarixiaoyao commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r615217323



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/test/scala/org/apache/spark/sql/hudi/TestDelete.scala
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hadoop.fs.Path
+
+class TestDelete extends TestHoodieSqlBase {
+
+  test("Test Delete Table") {
+    withTempDir { tmp =>
+      val tablePath = new Path(tmp.toString, "deleteTable").toUri.toString
+      spark.sql("set spark.hoodie.shuffle.parallelism=4")
+      spark.sql(
+        s"""create table deleteTable (
+           |keyid int,
+           |name string,
+           |price double,
+           |col1 long,
+           |p string,
+           |p1 string,
+           |p2 string) using hudi
+           |partitioned by (p,p1,p2)
+           |options('hoodie.datasource.write.table.type'='MERGE_ON_READ',
+           |'hoodie.datasource.write.precombine.field'='col1',

Review comment:
   agree




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-17 Thread GitBox


xiarixiaoyao commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r615217279



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/spark/sql/hudi/execution/MergeIntoHudiTable.scala
##
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.execution
+
+import org.apache.hudi.DataSourceWriteOptions.{DEFAULT_PAYLOAD_OPT_VAL, PAYLOAD_CLASS_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, WriteOperationType}
+import org.apache.hudi.execution.HudiSQLUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.expressions.codegen.GeneratePredicate
+import org.apache.spark.sql.catalyst.merge._
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, BasePredicate, Expression, Literal, PredicateHelper, UnsafeProjection}
+import org.apache.spark.sql.functions.{col, lit, when}
+import org.apache.spark.sql.catalyst.merge.{HudiMergeClause, HudiMergeInsertClause}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.types.{StringType, StructType}
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+  * merge command, execute merge operation for hudi
+  */
+case class MergeIntoHudiTable(
+    target: LogicalPlan,
+    source: LogicalPlan,
+    joinCondition: Expression,
+    matchedClauses: Seq[HudiMergeClause],
+    noMatchedClause: Seq[HudiMergeInsertClause],
+    finalSchema: StructType,
+    trimmedSchema: StructType) extends RunnableCommand with Logging with PredicateHelper {
+
+  var tableMeta: Map[String, String] = null
+
+  lazy val isRecordKeyJoin = {
+    val recordKeyFields = tableMeta.get(RECORDKEY_FIELD_OPT_KEY).get.split(",").map(_.trim).filter(!_.isEmpty)
+    val intersect = joinCondition.references.intersect(target.outputSet).toSeq
+    if (recordKeyFields.size == 1 && intersect.size == 1 && intersect(0).name.equalsIgnoreCase(recordKeyFields(0))) {
+      true
+    } else {
+      false
+    }
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    tableMeta = HudiSQLUtils.getHoodiePropsFromRelation(target, sparkSession)
+    val enableHive = "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)

Review comment:
   agree




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-17 Thread GitBox


xiarixiaoyao commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r615217239



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/spark/sql/hudi/execution/CreateHudiTableAsSelectCommand.scala
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.execution
+
+import java.util.Properties
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.config.HoodieWriteConfig.BULKINSERT_PARALLELISM
+import org.apache.hudi.execution.HudiSQLUtils
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.internal.StaticSQLConf
+
+import scala.collection.JavaConverters._
+
+/**
+ * Command for CTAS (create table as select).
+ */
+case class CreateHudiTableAsSelectCommand(
+    table: CatalogTable,
+    mode: SaveMode,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String]) extends RunnableCommand {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    assert(table.tableType != CatalogTableType.VIEW)
+    assert(table.provider.isDefined)
+    val sessionState = sparkSession.sessionState
+    var fs: FileSystem = null
+    val conf = sessionState.newHadoopConf()
+
+    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = table.identifier.copy(database = Some(db))
+    val tableName = tableIdentWithDB.unquotedString
+    val enableHive = "hive" == sparkSession.sessionState.conf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION)
+    val path = HudiSQLUtils.getTablePath(sparkSession, table)
+
+    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+      assert(mode != SaveMode.Overwrite, s"Expect the table $tableName has been dropped when the save mode is Overwrite")
+      if (mode == SaveMode.ErrorIfExists) {
+        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
+      }
+      if (mode == SaveMode.Ignore) {
+        // scalastyle:off
+        return Seq.empty
+        // scalastyle:on
+      }
+      // append to the existing table
+      saveDataIntoHudiTable(sparkSession, table, path, table.storage.properties, enableHive)
+    } else {
+      val properties = table.storage.properties
+      assert(table.schema.isEmpty)
+      sparkSession.sessionState.catalog.validateTableLocation(table)
+      // create the table
+      if (!enableHive) {
+        val newTable = if (!HudiSQLUtils.tableExists(path, conf)) {
+          table.copy(schema = query.schema)
+        } else {
+          table
+        }
+        CreateHudiTableCommand(newTable, true).run(sparkSession)
+      }
+      saveDataIntoHudiTable(sparkSession, table, path, properties, enableHive, "bulk_insert")
+    }
+
+    // save necessary parameters into hoodie.properties
+    val newProperties = new Properties()
+    newProperties.putAll(table.storage.properties.asJava)
+    // add the table partition columns
+    newProperties.put(PARTITIONPATH_FIELD_OPT_KEY, table.partitionColumnNames.mkString(","))
+    val metaPath = new Path(path, HoodieTableMetaClient.METAFOLDER_NAME)
+    val propertyPath = new Path(metaPath, HoodieTableConfig.HOODIE_PROPERTIES_FILE)
+    CreateHudiTableCommand.saveNeededPropertiesIntoHoodie(propertyPath.getFileSystem(conf), propertyPath, newProperties)
+    Seq.empty[Row]
+  }
+
+  private def saveDataIntoHudiTable(
+      session: SparkSession,
+      table: CatalogTable,
+      tablePath: String,
+      tableOptions: Map[String, String],
+      enableHive: Boolean,
+      operationType: String = "upsert"): Unit = {
+    val mode = if (operationType.equals("upsert")) {
+      "append"
+    } else {
+      "overwrite"
+    }
+    val newTableOptions = if (enableHive) {
+      // add hive sync properties
+  val 

[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-17 Thread GitBox


xiarixiaoyao commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r615216595



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, DefaultSource, HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName
+  val OVERWRITEWITHLATESTVROAYLOAD = classOf[OverwriteWithLatestAvroPayload].getName
+  val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
+  val MERGE_MARKER = "_hoodie_merge_marker"
+
+  private val log = LogManager.getLogger(getClass)
+
+  private val tableConfigCache = CacheBuilder
+.newBuilder()
+.maximumSize(1000)
+.build(new CacheLoader[String, Properties] {
+  override def load(k: String): Properties = {
+try {
+  HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration)
+    .setBasePath(k).build().getTableConfig.getProperties
+} catch {
+  // we catch expected error here
+  case e: HoodieIOException =>
+log.error(e.getMessage)
+new Properties()
+  case t: Throwable =>
+throw t
+}
+  }
+})
+
+  def getPropertiesFromTableConfigCache(path: String): Properties = {
+if (path.isEmpty) {
+  throw new HoodieIOException("unexpected empty hoodie table basePath")
+}
+tableConfigCache.get(path)
+  }
+
+  private def 

[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-17 Thread GitBox


xiarixiaoyao commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r615216569



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, DefaultSource, HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName
+  val OVERWRITEWITHLATESTVROAYLOAD = classOf[OverwriteWithLatestAvroPayload].getName
+  val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
+  val MERGE_MARKER = "_hoodie_merge_marker"
+
+  private val log = LogManager.getLogger(getClass)
+
+  private val tableConfigCache = CacheBuilder
+.newBuilder()
+.maximumSize(1000)
+.build(new CacheLoader[String, Properties] {
+  override def load(k: String): Properties = {
+try {
+  HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration)
+    .setBasePath(k).build().getTableConfig.getProperties
+} catch {
+  // we catch expected error here
+  case e: HoodieIOException =>
+log.error(e.getMessage)
+new Properties()
+  case t: Throwable =>
+throw t
+}
+  }
+})
+
+  def getPropertiesFromTableConfigCache(path: String): Properties = {
+if (path.isEmpty) {
+  throw new HoodieIOException("unexpected empty hoodie table basePath")
+}
+tableConfigCache.get(path)
+  }
+
+  private def 

[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-17 Thread GitBox


xiarixiaoyao commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r615216468



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, DefaultSource, HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName
+  val OVERWRITEWITHLATESTVROAYLOAD = classOf[OverwriteWithLatestAvroPayload].getName
+  val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
+  val MERGE_MARKER = "_hoodie_merge_marker"
+
+  private val log = LogManager.getLogger(getClass)
+
+  private val tableConfigCache = CacheBuilder
+.newBuilder()
+.maximumSize(1000)
+.build(new CacheLoader[String, Properties] {
+  override def load(k: String): Properties = {
+try {
+  HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration)
+    .setBasePath(k).build().getTableConfig.getProperties
+} catch {
+  // we catch expected error here
+  case e: HoodieIOException =>
+log.error(e.getMessage)
+new Properties()
+  case t: Throwable =>
+throw t
+}
+  }
+})
+
+  def getPropertiesFromTableConfigCache(path: String): Properties = {
+if (path.isEmpty) {
+  throw new HoodieIOException("unexpected empty hoodie table basePath")
+}
+tableConfigCache.get(path)
+  }
+
+  private def 

[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-17 Thread GitBox


xiarixiaoyao commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r615216402



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, DefaultSource, HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName
+  val OVERWRITEWITHLATESTVROAYLOAD = classOf[OverwriteWithLatestAvroPayload].getName
+  val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
+  val MERGE_MARKER = "_hoodie_merge_marker"
+
+  private val log = LogManager.getLogger(getClass)
+
+  private val tableConfigCache = CacheBuilder
+.newBuilder()
+.maximumSize(1000)
+.build(new CacheLoader[String, Properties] {
+  override def load(k: String): Properties = {
+try {
+  HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration)
+    .setBasePath(k).build().getTableConfig.getProperties
+} catch {
+  // we catch expected error here
+  case e: HoodieIOException =>
+log.error(e.getMessage)
+new Properties()
+  case t: Throwable =>
+throw t
+}
+  }
+})
+
+  def getPropertiesFromTableConfigCache(path: String): Properties = {
+if (path.isEmpty) {
+  throw new HoodieIOException("unexpected empty hoodie table basePath")
+}
+tableConfigCache.get(path)
+  }
+
+  private def 

[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-17 Thread GitBox


xiarixiaoyao commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r615215652



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, DefaultSource, HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName
+  val OVERWRITEWITHLATESTVROAYLOAD = classOf[OverwriteWithLatestAvroPayload].getName
+  val OVERWRITENONDEFAULTSWITHLATESTAVROPAYLOAD = classOf[OverwriteNonDefaultsWithLatestAvroPayload].getName
+  val MERGE_MARKER = "_hoodie_merge_marker"
+
+  private val log = LogManager.getLogger(getClass)
+
+  private val tableConfigCache = CacheBuilder
+.newBuilder()
+.maximumSize(1000)
+.build(new CacheLoader[String, Properties] {
+  override def load(k: String): Properties = {
+try {
+  HoodieTableMetaClient.builder().setConf(SparkSession.active.sparkContext.hadoopConfiguration)
+    .setBasePath(k).build().getTableConfig.getProperties
+} catch {
+  // we catch expected error here
+  case e: HoodieIOException =>
+log.error(e.getMessage)
+new Properties()
+  case t: Throwable =>
+throw t
+}
+  }
+})
+
+  def getPropertiesFromTableConfigCache(path: String): Properties = {
+if (path.isEmpty) {
+  throw new HoodieIOException("unexpected empty hoodie table basePath")
+}
+tableConfigCache.get(path)
+  }
+
+  private def 

[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-17 Thread GitBox


xiarixiaoyao commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r615215418



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/org/apache/hudi/execution/HudiSQLUtils.scala
##
@@ -0,0 +1,553 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution
+
+import java.io.File
+import java.util
+import java.util.{Locale, Properties}
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.HoodieSparkSqlWriter.{TableInstantInfo, toProperties}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceUtils, DataSourceWriteOptions, DefaultSource, HoodieBootstrapPartition, HoodieBootstrapRelation, HoodieSparkSqlWriter, HoodieSparkUtils, HoodieWriterUtils, MergeOnReadSnapshotRelation}
+import org.apache.hudi.common.model.{OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.avro.HoodieAvroUtils
+import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
+import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.common.model._
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
+import org.apache.hudi.config.HoodieWriteConfig._
+import org.apache.hudi.exception.{HoodieException, HoodieIOException}
+import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions
+import org.apache.hudi.payload.AWSDmsAvroPayload
+import org.apache.hudi.spark3.internal.HoodieDataSourceInternalTable
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+
+
+/**
+  * hudi IUD utils
+  */
+object HudiSQLUtils {
+
+  val AWSDMSAVROPAYLOAD = classOf[AWSDmsAvroPayload].getName

Review comment:
   AWSDmsAvroPayload uses the 'Op' field to mark whether the current row is in the deleted state, so I think that if AWSDmsAvroPayload is specified we should respect its logic. In the merge statement, when we encounter AWSDmsAvroPayload we ignore all merge conditions and just merge the rows into the hudi table directly. These are just my personal views; if they are not suitable, I will update my code.
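   A hedged illustration of that behaviour (simplified, not the actual AWSDmsAvroPayload implementation): DMS-style change records carry an 'Op' field, and a value of 'D' marks the row as deleted upstream, so a merge path that recognises this payload can route such rows to a delete instead of evaluating the usual matched/not-matched clauses.
   
   ```scala
   import org.apache.avro.generic.GenericRecord
   
   // Sketch: decide whether a DMS-style change record represents a delete by
   // inspecting its "Op" field ("D" = delete), as described in the comment above.
   def isDmsDelete(record: GenericRecord): Boolean =
     Option(record.get("Op")).map(_.toString.trim).contains("D")
   ```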




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-10 Thread GitBox


xiarixiaoyao commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r611126561



##
File path: 
hudi-spark-datasource/hudi-spark3-extensions_2.12/src/main/scala/io/hudi/sql/HudiSpark3SessionExtension.scala
##
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.hudi.sql
+
+import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.catalyst.analysis.{HudiAnalysis, HudiOperationsCheck, PostprocessHudiTable, ProcessHudiMerge}
+import org.apache.spark.sql.catalyst.optimizer.{OptimizerconditionHudi, RewriteHudiUID}
+
+/**
+  * An extension for Spark SQL that activates the Hudi SQL parser to support the Hudi SQL grammar.
+  * Example of creating a 'SparkSession' with the Hudi SQL grammar:
+  * {{{
+  *   import org.apache.spark.sql.SparkSession
+  *
+  *   val spark = SparkSession
+  *   .builder()
+  *   .appName("...")
+  *   .master("...")
+  *   .config("spark.sql.extensions", "io.hudi.sql.HudiSpark3SessionExtension")
+  * }}}
+  *
+  */
+class HudiSpark3SessionExtension extends (SparkSessionExtensions => Unit) {

Review comment:
   @garyli1019 thanks for your review, I will update.
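   For reference, a minimal sketch of enabling the extension when building a session (the class name comes from the scaladoc above; the app name, master, and the commented-out query are placeholders):
   
   ```scala
   import org.apache.spark.sql.SparkSession
   
   // Build a session with the Hudi Spark 3 SQL extension enabled, then issue Hudi
   // SQL statements (MERGE INTO, DELETE, CTAS, ...) through spark.sql(...).
   val spark = SparkSession
     .builder()
     .appName("hudi-sql-example")
     .master("local[*]")
     .config("spark.sql.extensions", "io.hudi.sql.HudiSpark3SessionExtension")
     .getOrCreate()
   
   // spark.sql("merge into targetTable t using sourceTable s on t.keyid = s.keyid ...")
   ```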




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-10 Thread GitBox


xiarixiaoyao commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r611020232



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -302,6 +302,10 @@ case class HoodieFileIndex(
   PartitionRowPath(partitionRow, partitionPath)
 }
 
+if (partitionRowPaths.isEmpty) {
+  partitionRowPaths = Seq(PartitionRowPath(InternalRow.empty, "")).toBuffer
+}
+

Review comment:
   @leesf thanks for your review. I will raise another PR to solve this problem.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] xiarixiaoyao commented on a change in pull request #2761: [HUDI-1676] Support SQL with spark3

2021-04-02 Thread GitBox


xiarixiaoyao commented on a change in pull request #2761:
URL: https://github.com/apache/hudi/pull/2761#discussion_r606622137



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
##
@@ -302,6 +302,10 @@ case class HoodieFileIndex(
   PartitionRowPath(partitionRow, partitionPath)
 }
 
+if (partitionRowPaths.isEmpty) {
+  partitionRowPaths = Seq(PartitionRowPath(InternalRow.empty, "")).toBuffer
+}
+

Review comment:
   A simple fix for the bug introduced by HUDI-1591.
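   A generic sketch of the fallback shown in the diff above (illustrative stand-in types, not Hudi's actual ones): when partition discovery yields nothing, a single entry with an empty partition path is substituted so the table's base path is still listed rather than the query silently returning no files.
   
   ```scala
   import org.apache.spark.sql.catalyst.InternalRow
   
   // Stand-in for Hudi's PartitionRowPath, only to keep the sketch self-contained.
   case class PartitionRowPath(values: InternalRow, partitionPath: String)
   
   def withEmptyPartitionFallback(discovered: Seq[PartitionRowPath]): Seq[PartitionRowPath] =
     if (discovered.isEmpty) Seq(PartitionRowPath(InternalRow.empty, "")) else discovered
   ```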




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



