[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-06-07 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r647096744



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##
@@ -189,8 +189,12 @@ protected boolean isUpdateRecord(HoodieRecord 
hoodieRecord) {
   private Option getIndexedRecord(HoodieRecord hoodieRecord) 
{
 Option> recordMetadata = 
hoodieRecord.getData().getMetadata();
 try {
-  Option avroRecord = 
hoodieRecord.getData().getInsertValue(writerSchema);
+  Option avroRecord = 
hoodieRecord.getData().getInsertValue(inputSchema,
+  config.getProps());
   if (avroRecord.isPresent()) {
+if (avroRecord.get().equals(IGNORE_RECORD)) {

Review comment:
   Yes, I will add a comment here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-06-07 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r647059021



##
File path: pom.xml
##
@@ -533,6 +535,12 @@
 ${spark.version}
 provided
   
+  

Review comment:
   The `hudi-spark` bundle needs the `spark-hive` dependency to create Hudi
tables.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-06-07 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r646435588



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##
@@ -60,6 +61,8 @@
   public static final String HOODIE_TABLE_VERSION_PROP_NAME = 
"hoodie.table.version";
   public static final String HOODIE_TABLE_PRECOMBINE_FIELD = 
"hoodie.table.precombine.field";
   public static final String HOODIE_TABLE_PARTITION_COLUMNS = 
"hoodie.table.partition.columns";
+  public static final String HOODIE_TABLE_RECORDKEY_FIELDS = 
"hoodie.table.recordkey.fields";
+  public static final String HOODIE_TABLE_CREATE_SCHEMA = 
"hoodie.table.create.schema";

Review comment:
   Yes, I have checked the `.hoodie.properties` file; the `Properties.load`
method takes care of the encoding of the schema string.
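
The round trip described above can be sketched with plain `java.util.Properties`. This is a minimal illustration, not Hudi code: the class name `SchemaPropertyRoundTrip` and the sample schema string are assumptions made for the example, and only the property key `hoodie.table.create.schema` comes from the diff. `store` escapes characters such as `:`, `=` and backslashes when writing, and `load` reverses that escaping when reading.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Properties;

public class SchemaPropertyRoundTrip {

  // Serializes one key/value pair to .properties text; store() escapes special characters.
  static String store(String key, String value) {
    try {
      Properties props = new Properties();
      props.setProperty(key, value);
      StringWriter out = new StringWriter();
      props.store(out, null);
      return out.toString();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Parses .properties text; load() undoes the escaping applied by store().
  static String load(String text, String key) {
    try {
      Properties props = new Properties();
      props.load(new StringReader(text));
      return props.getProperty(key);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // Hypothetical schema string, used only to exercise the escaping.
    String schema = "{\"type\":\"record\",\"name\":\"t\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}";
    String text = store("hoodie.table.create.schema", schema);
    String restored = load(text, "hoodie.table.create.schema");
    System.out.println(schema.equals(restored));
  }
}
```

The sketch only covers the in-memory case; it says nothing about how Hudi itself writes the file.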








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-06-06 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r646241526



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/SerDeUtils.scala
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import java.io.ByteArrayOutputStream
+
+import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.KryoSerializer
+
+
+object SerDeUtils {

Review comment:
   Yes, all the SQL test cases for merge-into cover this class.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-06-06 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r646241416



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlUtils.scala
##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import scala.collection.JavaConverters._
+import java.net.URI
+import java.util.Locale
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.expressions.{And, Cast, Expression, 
Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{DataType, NullType, StringType, 
StructField, StructType}
+
+import scala.collection.immutable.Map
+
+object HoodieSqlUtils extends SparkAdapterSupport {

Review comment:
   All the SQL test cases cover this class.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-06-06 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r646241242



##
File path: hudi-spark-datasource/hudi-spark2/pom.xml
##
@@ -29,6 +29,7 @@
 
   
 ${project.parent.parent.basedir}
+${scala11.version}

Review comment:
   Yes, I will remove this.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-06-06 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r646235274



##
File path: packaging/hudi-spark-bundle/pom.xml
##
@@ -66,10 +66,9 @@
   org.apache.hudi:hudi-common
   org.apache.hudi:hudi-client-common
   org.apache.hudi:hudi-spark-client
-  org.apache.hudi:hudi-spark-common
+  
org.apache.hudi:hudi-spark-common_${scala.binary.version}
   
org.apache.hudi:hudi-spark_${scala.binary.version}
-  
org.apache.hudi:hudi-spark2_${scala.binary.version}
-  org.apache.hudi:hudi-spark3_2.12
+  
org.apache.hudi:${hudi.spark.module}_${scala.binary.version}

Review comment:
   Currently this works well for both spark2 and spark3, as long as we compile
only the matching Spark module into the bundle jar. If we compile both spark2
and spark3 into the bundle jar as we did before, it leads to class conflicts.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-06-06 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r646233700



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##
@@ -60,6 +61,8 @@
   public static final String HOODIE_TABLE_VERSION_PROP_NAME = 
"hoodie.table.version";
   public static final String HOODIE_TABLE_PRECOMBINE_FIELD = 
"hoodie.table.precombine.field";
   public static final String HOODIE_TABLE_PARTITION_COLUMNS = 
"hoodie.table.partition.columns";
+  public static final String HOODIE_TABLE_RECORDKEY_FIELDS = 
"hoodie.table.recordkey.fields";

Review comment:
   Yes, makes sense to me. I will address this in another PR.

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##
@@ -592,6 +592,8 @@ public static PropertyBuilder withPropertyBuilder() {
 
 private HoodieTableType tableType;
 private String tableName;
+private String tableSchema;

Review comment:
   Makes sense.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-06-06 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r646232723



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##
@@ -189,8 +189,12 @@ protected boolean isUpdateRecord(HoodieRecord 
hoodieRecord) {
   private Option getIndexedRecord(HoodieRecord hoodieRecord) 
{
 Option> recordMetadata = 
hoodieRecord.getData().getMetadata();
 try {
-  Option avroRecord = 
hoodieRecord.getData().getInsertValue(writerSchema);
+  Option avroRecord = 
hoodieRecord.getData().getInsertValue(inputSchema,
+  config.getProps());
   if (avroRecord.isPresent()) {
+if (avroRecord.get().equals(IGNORE_RECORD)) {

Review comment:
   IMO, `IGNORE_RECORD` is currently always used within a single JVM. It is
used in the `HoodieXXHandler` classes, so we do not need to consider the
shuffle case.
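
Why the single-JVM point matters can be shown with a small stand-alone sketch. It assumes a marker object that relies on `Object`'s identity-based `equals`; the `Marker` class and `IGNORE` constant are hypothetical stand-ins, since the real `IGNORE_RECORD` definition is not shown in this thread. Within one JVM the comparison succeeds, while a copy produced by Java serialization (as could happen across a shuffle) no longer compares equal.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SentinelIdentityDemo {

  // Hypothetical marker type; it does not override equals(), so equality is reference identity.
  static final class Marker implements Serializable {
    private static final long serialVersionUID = 1L;
  }

  static final Marker IGNORE = new Marker();

  // Mirrors the style of check in the diff: compare a record against the sentinel.
  static boolean isIgnored(Object record) {
    return IGNORE.equals(record);
  }

  // Serializes and deserializes a value, yielding a distinct instance.
  static Object roundTrip(Object value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
               new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(isIgnored(IGNORE));            // same instance, same JVM
    System.out.println(isIgnored(roundTrip(IGNORE))); // deserialized copy
  }
}
```

If such a marker ever had to cross a serialization boundary, overriding `equals` or adding a `readResolve` method would be one way to keep the check valid; whether that applies to Hudi is not stated in the thread.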








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-05-05 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r627059147



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload
+import org.apache.hudi.common.table.HoodieTableConfig
+
+
+/**
+ * The HoodieOptionConfig defines some short name for the hoodie
+ * option key and value.
+ */
+object HoodieOptionConfig {
+
+  /**
+   * The short name for the value of COW_TABLE_TYPE_OPT_VAL.
+   */
+  val SQL_VALUE_TABLE_TYPE_COW = "cow"
+
+  /**
+   * The short name for the value of MOR_TABLE_TYPE_OPT_VAL.
+   */
+  val SQL_VALUE_TABLE_TYPE_MOR = "mor"
+
+
+  val SQL_KEY_TABLE_PRIMARY_KEY: HoodieOption[String] = buildConf()
+.withSqlKey("primaryKey")
+.withHoodieKey(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY)
+.withTableConfigKey(HoodieTableConfig.HOODIE_TABLE_RECORDKEY_FIELDS)
+.build()
+
+  val SQL_KEY_TABLE_TYPE: HoodieOption[String] = buildConf()
+.withSqlKey("type")
+.withHoodieKey(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY)
+.withTableConfigKey(HoodieTableConfig.HOODIE_TABLE_TYPE_PROP_NAME)
+.defaultValue(SQL_VALUE_TABLE_TYPE_COW)
+.build()
+
+  val SQL_KEY_VERSION_COLUMN: HoodieOption[String] = buildConf()
+.withSqlKey("preCombineField")
+.withHoodieKey(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY)
+.withTableConfigKey(HoodieTableConfig.HOODIE_TABLE_PRECOMBINE_FIELD)
+.build()
+
+  val SQL_PAYLOAD_CLASS: HoodieOption[String] = buildConf()
+.withSqlKey("payloadClass")
+.withHoodieKey(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY)
+.withTableConfigKey(HoodieTableConfig.HOODIE_PAYLOAD_CLASS_PROP_NAME)
+.defaultValue(classOf[DefaultHoodieRecordPayload].getName)
+.build()
+
+  /**
+   * The mapping of the sql short name key to the hoodie's config key.
+   */
+  private lazy val keyMapping: Map[String, String] = {
+HoodieOptionConfig.getClass.getDeclaredFields
+.filter(f => f.getType == classOf[HoodieOption[_]])
+.map(f => {f.setAccessible(true); 
f.get(HoodieOptionConfig).asInstanceOf[HoodieOption[_]]})

Review comment:
   Yeah, I agree that we should avoid reflection as much as possible. But here
reflection makes adding an option config very simple and natural: just add
the config definition, without any other registration code.
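
The trade-off can be illustrated with a Java analogue of the pattern in the diff. This is a sketch under assumptions: `Option`, `Options` and the `example.*` keys are hypothetical and are not the Hudi classes or config keys. Every static field of the option type is discovered by reflection, so adding a new option requires only a new field declaration.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

public class OptionRegistryDemo {

  // Hypothetical option type pairing a short SQL key with a full config key.
  static final class Option {
    final String sqlKey;
    final String fullKey;

    Option(String sqlKey, String fullKey) {
      this.sqlKey = sqlKey;
      this.fullKey = fullKey;
    }
  }

  // Hypothetical holder; adding a new Option field here needs no extra registration.
  static final class Options {
    static final Option PRIMARY_KEY = new Option("primaryKey", "example.recordkey.field");
    static final Option TABLE_TYPE = new Option("type", "example.table.type");
    static final String NOT_AN_OPTION = "ignored by the scan";
  }

  // Collects sqlKey -> fullKey for every static field whose declared type is Option.
  static Map<String, String> keyMapping(Class<?> holder) {
    Map<String, String> mapping = new TreeMap<>();
    for (Field field : holder.getDeclaredFields()) {
      if (Modifier.isStatic(field.getModifiers()) && field.getType() == Option.class) {
        try {
          field.setAccessible(true);
          Option option = (Option) field.get(null);
          mapping.put(option.sqlKey, option.fullKey);
        } catch (IllegalAccessException e) {
          throw new IllegalStateException(e);
        }
      }
    }
    return mapping;
  }

  public static void main(String[] args) {
    System.out.println(keyMapping(Options.class));
  }
}
```

The non-reflective alternative would be an explicit list of options, which is easier to follow statically but has to be updated whenever an option is added.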








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-05-05 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r627057178



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.{DataSourceWriteOptions, SparkSqlAdapterSupport}
+import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, 
OPERATION_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, 
SubqueryAlias}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.hudi.HoodieOptionConfig
+import org.apache.spark.sql.hudi.HoodieSqlUtils._
+
+case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends 
RunnableCommand
+  with SparkSqlAdapterSupport {
+
+  private val table = deleteTable.table
+
+  private val tableId = table match {
+case SubqueryAlias(name, _) => sparkSqlAdapter.toTableIdentify(name)
+case _ => throw new IllegalArgumentException(s"Illegal table: $table")
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+logInfo(s"start execute delete command for $tableId")
+
+// Remove meta fields from the data frame
+var df = removeMetaFields(Dataset.ofRows(sparkSession, table))
+if (deleteTable.condition.isDefined) {
+  df = df.filter(Column(deleteTable.condition.get))
+}

Review comment:
   Yes, the scan happens via the `Hudi datasource`, because the table is
created as a Spark data source table.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-05-05 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r627056678



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
##
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload
+import org.apache.hudi.common.table.HoodieTableConfig
+
+
+/**
+ * The HoodieOptionConfig defines some short name for the hoodie
+ * option key and value.
+ */
+object HoodieOptionConfig {
+
+  /**
+   * The short name for the value of COW_TABLE_TYPE_OPT_VAL.
+   */
+  val SQL_VALUE_TABLE_TYPE_COW = "cow"
+
+  /**
+   * The short name for the value of MOR_TABLE_TYPE_OPT_VAL.
+   */
+  val SQL_VALUE_TABLE_TYPE_MOR = "mor"
+
+
+  val SQL_KEY_TABLE_PRIMARY_KEY: HoodieOption[String] = buildConf()
+.withSqlKey("primaryKey")

Review comment:
   This is a short name for `RECORDKEY_FIELD_OPT_KEY`. Do you mean appending a
`hudi` prefix to it? I think the `using hudi` clause already indicates
that this is a Hudi table option, so we do not need to add a prefix. It is
also a little cleaner to write.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-05-05 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r627055477



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.{DataSourceWriteOptions, SparkSqlAdapterSupport}
+import org.apache.hudi.DataSourceWriteOptions.{KEYGENERATOR_CLASS_OPT_KEY, 
OPERATION_OPT_KEY, PARTITIONPATH_FIELD_OPT_KEY, RECORDKEY_FIELD_OPT_KEY}
+import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, 
SubqueryAlias}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.hudi.HoodieOptionConfig
+import org.apache.spark.sql.hudi.HoodieSqlUtils._
+
+case class DeleteHoodieTableCommand(deleteTable: DeleteFromTable) extends 
RunnableCommand
+  with SparkSqlAdapterSupport {
+
+  private val table = deleteTable.table
+
+  private val tableId = table match {
+case SubqueryAlias(name, _) => sparkSqlAdapter.toTableIdentify(name)
+case _ => throw new IllegalArgumentException(s"Illegal table: $table")
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+logInfo(s"start execute delete command for $tableId")
+
+// Remove meta fields from the data frame
+var df = removeMetaFields(Dataset.ofRows(sparkSession, table))
+if (deleteTable.condition.isDefined) {
+  df = df.filter(Column(deleteTable.condition.get))
+}
+val config = buildHoodieConfig(sparkSession)
+df.write
+  .format("hudi")
+  .mode(SaveMode.Append)
+  .options(config)
+  .save()
+sparkSession.catalog.refreshTable(tableId.unquotedString)
+logInfo(s"Finish execute delete command for $tableId")
+Seq.empty[Row]
+  }
+
+  private def buildHoodieConfig(sparkSession: SparkSession): Map[String, 
String] = {
+val targetTable = sparkSession.sessionState.catalog
+  .getTableMetadata(tableId)
+val path = getTableLocation(targetTable, sparkSession)
+  .getOrElse(s"missing location for $tableId")
+
+val primaryColumns = 
HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
+
+assert(primaryColumns.nonEmpty,
+  s"There are no primary key in table $tableId, cannot execute delete 
operator")
+
+withSparkConf(sparkSession, targetTable.storage.properties) {
+  Map(
+"path" -> path.toString,
+RECORDKEY_FIELD_OPT_KEY -> primaryColumns.mkString(","),
+KEYGENERATOR_CLASS_OPT_KEY -> 
classOf[ComplexKeyGenerator].getCanonicalName,

Review comment:
   1. Yes, you are right. We can remove the duplicate
`RECORDKEY_FIELD_OPT_KEY` here, because the primaryColumns always come from
the catalog.
   2. I think the `ComplexKeyGenerator` can cover the case of
`SimpleKeyGenerator`; `simple` is just a subset of `complex`.
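
The "subset" argument can be sketched as follows. The helpers `simpleKey` and `complexKey` are hypothetical and do not call Hudi; the `field:value` comma-joined format is an assumption about how a multi-field generator might render keys and should be checked against the Hudi source. The sketch shows that a multi-field generator accepts a single-field list, and also that the resulting key string need not match what a single-field generator would produce.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class KeyFormatDemo {

  // Hypothetical single-field key: just the field's value.
  static String simpleKey(Map<String, String> row, String field) {
    return row.get(field);
  }

  // Hypothetical multi-field key: "field:value" pairs joined by commas (assumed format).
  static String complexKey(Map<String, String> row, List<String> fields) {
    StringJoiner joiner = new StringJoiner(",");
    for (String field : fields) {
      joiner.add(field + ":" + row.get(field));
    }
    return joiner.toString();
  }

  public static void main(String[] args) {
    Map<String, String> row = Collections.singletonMap("id", "1");
    System.out.println(simpleKey(row, "id"));
    System.out.println(complexKey(row, Arrays.asList("id")));
  }
}
```

Under this assumed format, switching an existing table from one generator to the other would change the stored key strings, which is worth verifying before treating the two as interchangeable.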

##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.{DataSourceWriteOptions, SparkSqlAdapterSupport}
+import org.apache.hud

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-05-05 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r627051926



##
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SparkSqlAdapter.scala
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.client.utils.SparkRowSerDe
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+  * An interface to adapter the difference between spark2 and spark3
+  * in some spark sql related class.
+  */
+trait SparkSqlAdapter extends Serializable {

Review comment:
   Makes sense to me.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-05-05 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r627051773



##
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/SparkSqlAdapter.scala
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.client.utils.SparkRowSerDe
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+  * An interface to adapter the difference between spark2 and spark3
+  * in some spark sql related class.
+  */
+trait SparkSqlAdapter extends Serializable {
+
+  def createSparkRowSerDe(encoder: ExpressionEncoder[Row]): SparkRowSerDe

Review comment:
   ok, I will do it later.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-28 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r622719034



##
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/UuidKeyGenerator.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+/**
+ * A KeyGenerator which use the uuid as the record key.
+ */
+public class UuidKeyGenerator extends BuiltinKeyGenerator {

Review comment:
   That's great, I will try it in HUDI-1840.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-28 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r622716410



##
File path: pom.xml
##
@@ -112,6 +112,7 @@
 3.0.0
 
 3
+hudi-spark2

Review comment:
   ok

##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, 
StringType, StructField}
+
+class TestCreateTable extends TestHoodieSqlBase {
+
+  test("Test Create Managed Hoodie Table") {
+val tableName = generateTableName
+// Create a managed table
+spark.sql(
+  s"""
+ | create table $tableName (

Review comment:
   +1 for this.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-26 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r619739911



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
##
@@ -42,6 +43,8 @@
 public abstract class HoodieLazyInsertIterable
 extends LazyIterableIterator, List> {
 
+  private static final Properties EMPTY_PROPERTIES = new Properties();

Review comment:
   `HoodieRecordPayload#getInsertValue` requires a `Properties` parameter, 
so we must pass a properties object here. Moving `EMPTY_PROPERTIES` to 
`CollectionUtils` makes sense to me!
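
   A minimal sketch of the idea, not the actual Hudi code: the `Payload` 
interface and the `EmptyPropertiesSketch` class are hypothetical stand-ins, 
and schemas are plain strings rather than Avro schemas. It only shows how a 
shared empty `Properties` constant lets callers without configuration 
satisfy a signature that requires one.

```java
import java.util.Optional;
import java.util.Properties;

// Simplified stand-ins for illustration; these are not the real Hudi types.
final class EmptyPropertiesSketch {

  // Hypothetical shared constant, analogous to keeping EMPTY_PROPERTIES in a utility class.
  static final Properties EMPTY_PROPERTIES = new Properties();

  // Stand-in for a payload whose insert value may depend on writer properties.
  interface Payload {
    Optional<String> getInsertValue(String schema, Properties props);
  }

  // Callers with no configuration at hand still satisfy the two-argument signature.
  static Optional<String> insertValue(Payload payload, String schema) {
    return payload.getInsertValue(schema, EMPTY_PROPERTIES);
  }

  public static void main(String[] args) {
    Payload payload = (schema, props) ->
        Optional.of(schema + " with " + props.size() + " properties");
    System.out.println(insertValue(payload, "id:int").get());
  }
}
```

   One caveat with this design: `java.util.Properties` is mutable, so a 
shared constant like this should be treated as read-only by every caller.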

##
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/UuidKeyGenerator.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+/**
+ * A KeyGenerator which use the uuid as the record key.
+ */
+public class UuidKeyGenerator extends BuiltinKeyGenerator {

Review comment:
   Yes, it makes sense to me! I prefer to do this in a separate PR because 
it needs some work to pass the commit time. I have filed a JIRA at 
[1840](https://issues.apache.org/jira/browse/HUDI-1840).

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##
@@ -59,6 +60,8 @@
   public static final String HOODIE_TABLE_VERSION_PROP_NAME = 
"hoodie.table.version";
   public static final String HOODIE_TABLE_PRECOMBINE_FIELD = 
"hoodie.table.precombine.field";
   public static final String HOODIE_TABLE_PARTITION_COLUMNS = 
"hoodie.table.partition.columns";
+  public static final String HOODIE_TABLE_ROWKEY_FIELDS = 
"hoodie.table.rowkey.fields";
+  public static final String HOODIE_TABLE_SCHEMA = "hoodie.table.schema";

Review comment:
   Makes sense to me!

##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, 
StringType, StructField}
+
+class TestCreateTable extends TestHoodieSqlBase {
+
+  test("Test Create Managed Hoodie Table") {
+val tableName = generateTableName
+// Create a managed table
+

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r619787637



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##
@@ -591,6 +591,8 @@ public static PropertyBuilder withPropertyBuilder() {
 
 private HoodieTableType tableType;
 private String tableName;
+private String tableSchema;

Review comment:
   A JIRA has been created for this at 
[1842](https://issues.apache.org/jira/browse/HUDI-1842).

##
File path: 
hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/MultiPartKeysValueExtractor.java
##
@@ -31,6 +32,11 @@
 
   @Override
   public List extractPartitionValuesInPath(String partitionPath) {
+// If the partitionPath is empty string( which means none-partition 
table), the partition values

Review comment:
   Hi @vinothchandar, I have submitted a separate PR for this at 
[2876](https://github.com/apache/hudi/pull/2876). After 2876 has been merged, we 
can move this code here.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-24 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r619757590



##
File path: pom.xml
##
@@ -112,6 +112,7 @@
 3.0.0
 
 3
+hudi-spark2

Review comment:
   As I have described above, this change should be included in this PR.

##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, 
StringType, StructField}
+
+class TestCreateTable extends TestHoodieSqlBase {
+
+  test("Test Create Managed Hoodie Table") {
+val tableName = generateTableName
+// Create a managed table
+spark.sql(
+  s"""
+ | create table $tableName (
+ |  id int,
+ |  name string,
+ |  price double,
+ |  ts long
+ | ) using hudi
+ | options (
+ |   primaryKey = 'id',
+ |   versionColumn = 'ts'

Review comment:
   > can we stick close to the parameter names used in the datasource 
write? We need not use the entire property name, but can we use same terms like 
precombineColumn=ts instead?
   
   Agree with this now.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-24 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r619743008



##
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/UuidKeyGenerator.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+/**
+ * A KeyGenerator which use the uuid as the record key.
+ */
+public class UuidKeyGenerator extends BuiltinKeyGenerator {

Review comment:
   Yes, it makes sense to me! I prefer to do this in a separate PR because 
it needs some work to pass the commit time. I have filed a JIRA at 
[1840](https://issues.apache.org/jira/browse/HUDI-1840).

##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
##
@@ -59,6 +60,8 @@
   public static final String HOODIE_TABLE_VERSION_PROP_NAME = 
"hoodie.table.version";
   public static final String HOODIE_TABLE_PRECOMBINE_FIELD = 
"hoodie.table.precombine.field";
   public static final String HOODIE_TABLE_PARTITION_COLUMNS = 
"hoodie.table.partition.columns";
+  public static final String HOODIE_TABLE_ROWKEY_FIELDS = 
"hoodie.table.rowkey.fields";
+  public static final String HOODIE_TABLE_SCHEMA = "hoodie.table.schema";

Review comment:
   Makes sense to me!

##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, 
StringType, StructField}
+
+class TestCreateTable extends TestHoodieSqlBase {
+
+  test("Test Create Managed Hoodie Table") {
+val tableName = generateTableName
+// Create a managed table
+spark.sql(
+  s"""
+ | create table $tableName (
+ |  id int,
+ |  name string,
+ |  price double,
+ |  ts long
+ | ) using hudi
+ | options (
+ |   primaryKey = 'id',
+ |   versionColumn = 'ts'

Review comment:
   yes

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##
@@ -192,7 +193,7 @@ protected void initializeIncomingRecordsMap() {
   long memoryForMerge = 
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps());
   LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
   this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, 
config.getSpillableMapBasePath(),
-  new DefaultSizeEstimator(), new 
HoodieRecordSizeEstimator(writerSchema));
+  new DefaultSizeEstimator(), new 
HoodieRecordSizeEstimator(inputSchema));

Review comment:
   > On the inputSchema, is this always equal to the table's schema?
   
   The `write

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-24 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r619739911



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
##
@@ -42,6 +43,8 @@
 public abstract class HoodieLazyInsertIterable
 extends LazyIterableIterator, List> {
 
+  private static final Properties EMPTY_PROPERTIES = new Properties();

Review comment:
   `HoodieRecordPayload#getInsertValue` requires a `Properties` parameter, 
so we must pass a properties object here. Moving `EMPTY_PROPERTIES` to 
`CollectionUtils` makes sense to me!









[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-23 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r601152600



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##
@@ -617,6 +619,16 @@ public PropertyBuilder setTableName(String tableName) {
   return this;
 }
 
+public PropertyBuilder setTableSchema(String tableSchema) {

Review comment:
   The `tableSchema` saved to `hoodie.properties` is the initial schema from 
when the table is created with the DDL. After that, the schema can still 
change with later commits.
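
   As a rough illustration only (this is not how `HoodieTableMetaClient` 
is implemented), persisting the DDL schema under the `hoodie.table.schema` 
key could look like the sketch below. The class name, the in-memory 
round trip, and the `id:int,name:string` schema encoding are assumptions 
made for the example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: stores the schema given at table creation time in a
// properties document and reads it back.
final class TableSchemaPropertySketch {

  static final String HOODIE_TABLE_SCHEMA = "hoodie.table.schema";

  // Serialize a properties document holding the initial table schema.
  static String store(String schema) {
    Properties props = new Properties();
    props.setProperty(HOODIE_TABLE_SCHEMA, schema);
    StringWriter out = new StringWriter();
    try {
      props.store(out, "table properties (sketch)");
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toString();
  }

  // Read the initial table schema back from the serialized properties.
  static String load(String serialized) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(serialized));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props.getProperty(HOODIE_TABLE_SCHEMA);
  }

  public static void main(String[] args) {
    String serialized = store("id:int,name:string");
    System.out.println(load(serialized));
  }
}
```

   Because later commits can evolve the schema, a value stored this way 
should be read as the schema at creation time, not necessarily the current 
one.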








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r618354781



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -174,7 +174,8 @@ public boolean commitStats(String instantTime, 
List stats, Opti
  String commitActionType, Map> partitionToReplaceFileIds) {
 // Create a Hoodie table which encapsulated the commits and files visible
 HoodieTable table = createTable(config, hadoopConf);
-HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, 
partitionToReplaceFileIds, extraMetadata, operationType, config.getSchema(), 
commitActionType);
+HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, 
partitionToReplaceFileIds,
+extraMetadata, operationType, config.getWriteSchema(), 
commitActionType);

Review comment:
   Hi @vinothchandar, as I described below, the `writeSchema` is the same 
as the table schema, so there are no negative effects on the hive sync. I have 
run this case in our production environment and it works well.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r618342707



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##
@@ -192,7 +193,7 @@ protected void initializeIncomingRecordsMap() {
   long memoryForMerge = 
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps());
   LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
   this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, 
config.getSpillableMapBasePath(),
-  new DefaultSizeEstimator(), new 
HoodieRecordSizeEstimator(writerSchema));
+  new DefaultSizeEstimator(), new 
HoodieRecordSizeEstimator(inputSchema));

Review comment:
   Hi @vinothchandar, the `inputSchema` is the schema of the input DataFrame, 
while the `writeSchema` is the schema written to the table. They are always the 
same except in the case of `MergeInto`.
   
   For `MergeInto`, the `inputSchema` may differ from the `writeSchema`, 
e.g.
   
   > create table h0(id int, name string) using hudi;
   > merge into h0 using (select 1 as id, 'a1' as name, 1 as flag ) s0
   > on h0.id = s0.id
   > when matched and flag = 1 then update set id = s0.id, name = s0.name
   
   In this case, the `inputSchema` is `id:int, name:string, flag:int`, which is 
the schema of `s0`, but the `writeSchema` is
   `id:int, name:string`; the field `flag` is used only for the condition test. So 
they are different.
   
   To solve this problem, we introduce `inputSchema` to distinguish it from 
`writeSchema`. We use `hoodie.write.schema` to specify the `writeSchema` 
when the two need to differ. If `hoodie.write.schema` is not set, the 
two schemas are the same.
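
   The fallback rule above can be sketched as follows. This is a simplified 
model under stated assumptions, not the PR's implementation: schemas are 
ordered field-name to type maps rather than Avro schemas, the 
`name:type,...` encoding is made up for the example, and 
`WriteSchemaSketch` with its methods is hypothetical. Only the 
`hoodie.write.schema` key comes from the discussion.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Illustrative only: models the inputSchema / writeSchema distinction with plain maps.
final class WriteSchemaSketch {

  static final String WRITE_SCHEMA = "hoodie.write.schema";

  // Parse the hypothetical "id:int,name:string" encoding into an ordered map.
  static Map<String, String> parse(String encoded) {
    Map<String, String> fields = new LinkedHashMap<>();
    for (String field : encoded.split(",")) {
      String[] parts = field.trim().split(":");
      fields.put(parts[0].trim(), parts[1].trim());
    }
    return fields;
  }

  // Use the configured write schema if present; otherwise fall back to the input schema.
  static Map<String, String> resolveWriteSchema(Map<String, String> inputSchema, Properties props) {
    String configured = props.getProperty(WRITE_SCHEMA);
    return configured == null ? inputSchema : parse(configured);
  }

  public static void main(String[] args) {
    // Schema of the MergeInto source s0, including the condition-only field `flag`.
    Map<String, String> inputSchema = parse("id:int,name:string,flag:int");

    Properties mergeIntoProps = new Properties();
    mergeIntoProps.setProperty(WRITE_SCHEMA, "id:int,name:string");

    // MergeInto case: the write schema omits `flag`.
    System.out.println(resolveWriteSchema(inputSchema, mergeIntoProps));
    // Default case: no override, so both schemas are the same.
    System.out.println(resolveWriteSchema(inputSchema, new Properties()));
  }
}
```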








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r618357837



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
##
@@ -154,9 +157,9 @@ public void write() {
 final String key = keyIterator.next();
 HoodieRecord record = recordMap.get(key);
 if (useWriterSchema) {
-  write(record, 
record.getData().getInsertValue(writerSchemaWithMetafields));
+  write(record, 
record.getData().getInsertValue(inputSchemaWithMetaFields, config.getProps()));

Review comment:
   Please take a look at my explanation below; I hope it makes this 
clear. Thanks.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r618359769



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##
@@ -54,9 +53,18 @@
 public abstract class HoodieWriteHandle extends HoodieIOHandle {
 
   private static final Logger LOG = 
LogManager.getLogger(HoodieWriteHandle.class);
+  /**
+   * The input schema of the incoming dataframe.
+   */
+  protected final Schema inputSchema;

Review comment:
   Yeah, I have described this above; I hope that makes it clear. Thanks~

##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##
@@ -162,6 +162,8 @@
   "hoodie.write.meta.key.prefixes";
   public static final String DEFAULT_WRITE_META_KEY_PREFIXES = "";
 
+  public static final String WRITE_SCHEMA = "hoodie.write.schema";

Review comment:
   ok








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r618346466



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##
@@ -54,9 +53,30 @@
 public abstract class HoodieWriteHandle extends HoodieIOHandle {
 
   private static final Logger LOG = 
LogManager.getLogger(HoodieWriteHandle.class);
+  /**
+   * A special record returned by HoodieRecordPayload which will should be 
skip by the write

Review comment:
   ok, I will add more doc here.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r618354781



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##
@@ -174,7 +174,8 @@ public boolean commitStats(String instantTime, 
List stats, Opti
  String commitActionType, Map> partitionToReplaceFileIds) {
 // Create a Hoodie table which encapsulated the commits and files visible
 HoodieTable table = createTable(config, hadoopConf);
-HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, 
partitionToReplaceFileIds, extraMetadata, operationType, config.getSchema(), 
commitActionType);
+HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, 
partitionToReplaceFileIds,
+extraMetadata, operationType, config.getWriteSchema(), 
commitActionType);

Review comment:
   Hi @vinothchandar, as I described above, the `writeSchema` is the same 
as the table schema, so there are no negative effects on the hive sync. I have 
run this case in our production environment and it works well.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r618345744



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##
@@ -278,11 +279,17 @@ public void write(GenericRecord oldRecord) {
   HoodieRecord hoodieRecord = new 
HoodieRecord<>(keyToNewRecords.get(key));
   try {
 Option combinedAvroRecord =
-hoodieRecord.getData().combineAndGetUpdateValue(oldRecord, 
useWriterSchema ? writerSchemaWithMetafields : writerSchema,
+hoodieRecord.getData().combineAndGetUpdateValue(oldRecord,
+  useWriterSchema ? inputSchemaWithMetaFields : inputSchema,

Review comment:
   Yeah, the `HoodiePayload` uses the `inputSchema` to parse the incoming 
record and compute the result record. The result record will have the full 
table schema.
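
   A rough sketch of that flow, assuming the `MergeInto` example from this 
thread, where the source carries an extra `flag` column: records are plain 
maps, and `MergePayloadSketch.combine` is a hypothetical helper, not a 
Hudi API. Returning `Optional.empty()` here only means "no update" within 
the sketch; it is not how Hudi represents skipped records.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Illustrative only: reads the incoming record through the wider input schema
// and emits a record that carries just the table's fields.
final class MergePayloadSketch {

  static Optional<Map<String, Object>> combine(Map<String, Object> incoming, List<String> tableFields) {
    // `flag` exists only in the input schema and is used for the condition test.
    if (!Integer.valueOf(1).equals(incoming.get("flag"))) {
      return Optional.empty(); // condition not met: no update in this sketch
    }
    // Project the incoming record onto the table schema.
    Map<String, Object> result = new LinkedHashMap<>();
    for (String field : tableFields) {
      result.put(field, incoming.get(field));
    }
    return Optional.of(result);
  }

  public static void main(String[] args) {
    Map<String, Object> incoming = new LinkedHashMap<>();
    incoming.put("id", 1);
    incoming.put("name", "a1");
    incoming.put("flag", 1);

    // The result holds only the table fields; `flag` is dropped.
    System.out.println(combine(incoming, Arrays.asList("id", "name")));
  }
}
```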








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r618328717



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
##
@@ -63,14 +62,14 @@
 
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable hoodieTable,
 String partitionPath, String fileId, 
TaskContextSupplier taskContextSupplier) {
-this(config, instantTime, hoodieTable, partitionPath, fileId, 
getWriterSchemaIncludingAndExcludingMetadataPair(config),
+this(config, instantTime, hoodieTable, partitionPath, fileId, 
Option.empty(),
 taskContextSupplier);
   }
 
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable hoodieTable,
-String partitionPath, String fileId, Pair writerSchemaIncludingAndExcludingMetadataPair,
+String partitionPath, String fileId, 
Option specifySchema,

Review comment:
   Yes, thanks for correcting my spelling. The `specifiedSchema` is 
used by `HoodieBootstrapHandle` to pass its `writeSchema`. For other 
handles, it is always `None`.
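
   The optional-schema pattern described here might be sketched like this. 
The class and method names are hypothetical, schemas are plain strings, and 
`java.util.Optional` stands in for Hudi's own `Option` type.

```java
import java.util.Optional;

// Illustrative only: a handle takes an optional, caller-specified schema and
// otherwise falls back to the schema from the write config.
final class SpecifiedSchemaSketch {

  static String resolve(Optional<String> specifiedSchema, String configuredWriteSchema) {
    return specifiedSchema.orElse(configuredWriteSchema);
  }

  public static void main(String[] args) {
    // A bootstrap-style caller passes its own schema explicitly.
    System.out.println(resolve(Optional.of("bootstrap-schema"), "configured-schema"));
    // Other callers pass nothing and get the configured schema.
    System.out.println(resolve(Optional.empty(), "configured-schema"));
  }
}
```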








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r618342707



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##
@@ -192,7 +193,7 @@ protected void initializeIncomingRecordsMap() {
   long memoryForMerge = 
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config.getProps());
   LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
   this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, 
config.getSpillableMapBasePath(),
-  new DefaultSizeEstimator(), new 
HoodieRecordSizeEstimator(writerSchema));
+  new DefaultSizeEstimator(), new 
HoodieRecordSizeEstimator(inputSchema));

Review comment:
   Hi @vinothchandar, the `inputSchema` is the schema of the input DataFrame, 
while the `writeSchema` is the schema written to the table. They are always the 
same, except in the `MergeInto` case.
   
   For `MergeInto`, the `inputSchema` may differ from the `writeSchema`, e.g.
   
   > create table h0(id int, name string) using hudi;
   > merge into h0 using (select 1 as id, 'a1' as name, 1 as flag ) s0
   > on h0.id = s0.id
   > when matched and flag = 1 then update set id = s0.id, name = s0.name
   
   In this case, the `inputSchema` is `id: int, name: string, flag: int`, which 
is the schema of `s0`, but the `writeSchema` is `id: int, name: string`. The 
field `flag` is only used to evaluate the condition, so the two schemas differ.
   
   To solve this problem, we introduce `inputSchema` to distinguish it from 
`writeSchema`. We use `hoodie.write.schema` to specify the `writeSchema` when 
the two need to differ. If `hoodie.write.schema` is not set, the two schemas 
are the same.
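   
   A minimal sketch of the distinction, in plain Java rather than the actual 
Hudi or Avro classes. The `SchemaProjection` class, its two methods and the 
map-based record are hypothetical names used only for illustration. Storing the 
write schema as a comma-separated field list is also an assumption made to keep 
the sketch self-contained; only the `hoodie.write.schema` key and the 
`id`/`name`/`flag` fields come from the example above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration: a record is a field-name -> value map, and a
// "schema" is just an ordered list of field names.
class SchemaProjection {

  // Returns the write-side field list: the one given under the
  // "hoodie.write.schema" key if present, otherwise the input field list.
  // Assumption: the property holds comma-separated field names.
  static List<String> resolveWriteFields(List<String> inputFields, Properties props) {
    String specified = props.getProperty("hoodie.write.schema");
    if (specified == null || specified.isEmpty()) {
      return inputFields;
    }
    return Arrays.asList(specified.split(","));
  }

  // Keeps only the fields that belong to the write schema, in schema order.
  static Map<String, Object> project(Map<String, Object> inputRecord, List<String> writeFields) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (String field : writeFields) {
      out.put(field, inputRecord.get(field));
    }
    return out;
  }
}
```

   With input fields `id, name, flag` and a write schema of `id,name`, the 
projected record no longer carries `flag`, although `flag` was still available 
on the input side for the condition test.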








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r618328717



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
##
@@ -63,14 +62,14 @@
 
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable hoodieTable,
 String partitionPath, String fileId, 
TaskContextSupplier taskContextSupplier) {
-this(config, instantTime, hoodieTable, partitionPath, fileId, 
getWriterSchemaIncludingAndExcludingMetadataPair(config),
+this(config, instantTime, hoodieTable, partitionPath, fileId, 
Option.empty(),
 taskContextSupplier);
   }
 
   public HoodieCreateHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable hoodieTable,
-String partitionPath, String fileId, Pair writerSchemaIncludingAndExcludingMetadataPair,
+String partitionPath, String fileId, 
Option specifySchema,

Review comment:
   Yes, thanks for correcting the spelling. The `specifiedSchema` is used by 
`HoodieBootstrapHandle` to pass in its `writeSchema`, as in the following code:
   
   >  public HoodieBootstrapHandle(HoodieWriteConfig config, String commitTime, 
HoodieTable hoodieTable,
 String partitionPath, String fileId, TaskContextSupplier 
taskContextSupplier) {
   super(config, commitTime, hoodieTable, partitionPath, fileId,
   Option.of(HoodieAvroUtils.RECORD_KEY_SCHEMA), taskContextSupplier);
 }
   
   For the other handles, it is always empty (`Option.empty()`).
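   
   The optional-argument pattern can be sketched as follows. This is only an 
illustration: it uses `java.util.Optional` and plain strings in place of Hudi's 
`Option` and Avro `Schema`, the `WriteSchemaResolver` name is hypothetical, and 
the fallback to a schema derived from the write config is an assumption based 
on the constructor call that the diff removes.

```java
import java.util.Optional;

// Hypothetical sketch using java.util.Optional and strings in place of
// Hudi's Option and Avro Schema types.
class WriteSchemaResolver {

  // Uses the explicitly specified schema when present, otherwise falls back
  // to the schema derived from the write config (assumed behavior).
  static String resolve(Optional<String> specifiedSchema, String schemaFromConfig) {
    return specifiedSchema.orElse(schemaFromConfig);
  }
}
```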








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r618324402



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##
@@ -189,8 +189,12 @@ protected boolean isUpdateRecord(HoodieRecord 
hoodieRecord) {
   private Option getIndexedRecord(HoodieRecord hoodieRecord) 
{
 Option> recordMetadata = 
hoodieRecord.getData().getMetadata();
 try {
-  Option avroRecord = 
hoodieRecord.getData().getInsertValue(writerSchema);
+  Option avroRecord = 
hoodieRecord.getData().getInsertValue(inputSchema,
+  config.getProps());
   if (avroRecord.isPresent()) {
+if (avroRecord.get() == HoodieMergeHandle.IGNORE_RECORD) {

Review comment:
   1. Currently `IGNORE_RECORD` is already located in `HoodieWriteHandle`, so I 
will remove the unnecessary `HoodieMergeHandle` class-name qualifier.
   
   2. Yes, it is safer to use the `.equals()` method.
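   
   A small sketch of why the two comparisons can disagree. `SentinelRecord` is 
a hypothetical stand-in, not the Hudi `IGNORE_RECORD` type, and the sketch 
assumes the sentinel type defines value equality; whether the real record type 
does so is not stated in this thread.

```java
import java.util.Objects;

// Hypothetical stand-in for a sentinel record; not the Hudi IGNORE_RECORD type.
final class SentinelRecord {
  static final SentinelRecord IGNORE = new SentinelRecord("ignore");

  private final String tag;

  SentinelRecord(String tag) {
    this.tag = tag;
  }

  // Simulates a copy produced by serialization or a defensive clone.
  SentinelRecord copy() {
    return new SentinelRecord(tag);
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof SentinelRecord && tag.equals(((SentinelRecord) other).tag);
  }

  @Override
  public int hashCode() {
    return Objects.hash(tag);
  }

  // Reference comparison: true only for the very same instance.
  static boolean isIgnoredByReference(SentinelRecord r) {
    return r == IGNORE;
  }

  // Value comparison: also true for an equal copy.
  static boolean isIgnoredByEquals(SentinelRecord r) {
    return IGNORE.equals(r);
  }
}
```

   For `SentinelRecord.IGNORE.copy()`, the reference check returns false while 
the `.equals()` check returns true, which is the case the `.equals()` form 
guards against.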








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r618322543



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
##
@@ -96,9 +97,14 @@ public HoodieInsertValueGenResult(T record, Schema schema) {
* Transformer function to help transform a HoodieRecord. This transformer 
is used by BufferedIterator to offload some
* expensive operations of transformation to the reader thread.
*/
+  static  Function, 
HoodieInsertValueGenResult> getTransformFunction(
+  Schema schema, HoodieWriteConfig config) {
+return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, 
schema, config.getProps());

Review comment:
   Yeah, `config.getProps()` just returns a reference to `props`, with no 
new allocation.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-22 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r618317906



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
##
@@ -96,9 +97,14 @@ public HoodieInsertValueGenResult(T record, Schema schema) {
* Transformer function to help transform a HoodieRecord. This transformer 
is used by BufferedIterator to offload some
* expensive operations of transformation to the reader thread.
*/
+  static  Function, 
HoodieInsertValueGenResult> getTransformFunction(
+  Schema schema, HoodieWriteConfig config) {
+return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, 
schema, config.getProps());
+  }
+
   static  Function, 
HoodieInsertValueGenResult> getTransformFunction(
   Schema schema) {
-return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, 
schema);
+return hoodieRecord -> new HoodieInsertValueGenResult(hoodieRecord, 
schema, new Properties());

Review comment:
   Good suggestion!








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-18 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r615508923



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##
@@ -444,7 +448,9 @@ private void writeToBuffer(HoodieRecord record) {
 }
 Option indexedRecord = getIndexedRecord(record);
 if (indexedRecord.isPresent()) {
-  recordList.add(indexedRecord.get());
+  if (indexedRecord.get() != IGNORE_RECORD) { // Skip the Ignore Record.

Review comment:
   Fixed~








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-18 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r615508220



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
##
@@ -444,7 +448,9 @@ private void writeToBuffer(HoodieRecord record) {
 }
 Option indexedRecord = getIndexedRecord(record);
 if (indexedRecord.isPresent()) {
-  recordList.add(indexedRecord.get());
+  if (indexedRecord.get() != IGNORE_RECORD) { // Skip the Ignore Record.

Review comment:
   > @pengzhiwei2018 one more question, will we introduce Catalog to manage 
table operations in further?
   
   Yes, I agree with introducing a Catalog to manage table operations for 
Spark 3 in the future.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-18 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r615507564



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
##
@@ -97,4 +86,22 @@ public DefaultHoodieRecordPayload(Option 
record) {
 }
 return metadata.isEmpty() ? Option.empty() : Option.of(metadata);
   }
+
+  protected boolean needUpdatePersistedRecord(IndexedRecord currentValue,
+  IndexedRecord incomingRecord, 
Properties properties) {
+/*
+ * Combining strategy here returns currentValue on disk if incoming record 
is older.
+ * The incoming record can be either a delete (sent as an upsert with 
_hoodie_is_deleted set to true)
+ * or an insert/update record. In any case, if it is older than the record 
in disk, the currentValue
+ * in disk is returned (to be rewritten with new commit time).
+ *
+ * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation 
type do not hit this code path

Review comment:
   Yes, it is used by `HoodieMergeHandle`. Here I just moved the original 
code into `needUpdatePersistedRecord`, so that it can be used by subclasses of 
`DefaultHoodieRecordPayload`, e.g. `ExpressionPayload`. It is just a code 
refactoring.
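   
   The shape of the refactoring can be sketched like this. The classes below 
are hypothetical and use `long` ordering values in place of Avro records; they 
are not `DefaultHoodieRecordPayload` or `ExpressionPayload`. Treating equal 
ordering values as an update is an assumption of the sketch.

```java
// Hypothetical sketch with long ordering values in place of Avro records.
class OrderingPayload {

  // True when the incoming record should replace the persisted one, i.e. when
  // it is not older. Treating ties as updates is an assumption of this sketch.
  protected boolean needUpdatePersistedRecord(long currentOrderingVal, long incomingOrderingVal) {
    return incomingOrderingVal >= currentOrderingVal;
  }

  // Returns the ordering value of the record that survives the merge.
  long combine(long currentOrderingVal, long incomingOrderingVal) {
    return needUpdatePersistedRecord(currentOrderingVal, incomingOrderingVal)
        ? incomingOrderingVal
        : currentOrderingVal;
  }
}

// A subclass can reuse the shared check and add its own condition on top.
class ConditionalPayload extends OrderingPayload {
  private final boolean conditionMatched;

  ConditionalPayload(boolean conditionMatched) {
    this.conditionMatched = conditionMatched;
  }

  @Override
  long combine(long currentOrderingVal, long incomingOrderingVal) {
    if (!conditionMatched) {
      return currentOrderingVal; // condition not met: keep the persisted record
    }
    return super.combine(currentOrderingVal, incomingOrderingVal);
  }
}
```

   Extracting the check into one protected method means the subclass does not 
have to duplicate the "is the incoming record older?" logic.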








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-16 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r614929242



##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDelete.scala
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+class TestDelete extends TestHoodieSqlBase {
+
+  test("Test Delete Table") {
+withTempDir { tmp =>
+  Seq("cow", "mor").foreach {tableType =>
+val tableName = generateTableName
+// create table
+spark.sql(
+  s"""
+ |create table $tableName (
+ |  id int,
+ |  name string,
+ |  price double,
+ |  ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}/$tableName'
+ | options (
+ |  type = '$tableType',
+ |  primaryKey = 'id',
+ |  versionColumn = 'ts'
+ | )
+   """.stripMargin)
+// insert data to table
+spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+checkAnswer(s"select id, name, price, ts from $tableName")(
+  Seq(1, "a1", 10.0, 1000)
+)
+
+// delete table

Review comment:
   Fixed








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-16 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r614680980



##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, 
StringType, StructField}
+
+class TestCreateTable extends TestHoodieSqlBase {
+
+  test("Test Create Managed Hoodie Table") {
+val tableName = generateTableName
+// Create a managed table
+spark.sql(
+  s"""
+ | create table $tableName (
+ |  id int,
+ |  name string,
+ |  price double,
+ |  ts long
+ | ) using hudi
+ | options (
+ |   primaryKey = 'id',
+ |   versionColumn = 'ts'

Review comment:
   There is a check for the `versionColumn` at table creation in 
`CreateHoodieTableCommand#validateTable`, so the `versionColumn` must be a 
field defined in the table columns. I will also add a test that uses another 
column name as the `versionColumn`.
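   
   A rough sketch of the kind of check described above. It is not the actual 
`CreateHoodieTableCommand#validateTable` code; the `TableOptionValidator` class 
and its method are hypothetical, and only the `versionColumn` option name is 
taken from the test in the diff.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not the actual CreateHoodieTableCommand#validateTable.
class TableOptionValidator {

  // Throws if the "versionColumn" option names a column the table does not define.
  static void validateVersionColumn(List<String> tableColumns, Map<String, String> options) {
    String versionColumn = options.get("versionColumn");
    if (versionColumn != null && !tableColumns.contains(versionColumn)) {
      throw new IllegalArgumentException(
          "versionColumn '" + versionColumn + "' is not a column of the table: " + tableColumns);
    }
  }
}
```

   With columns `id, name, price, ts`, the option `versionColumn = 'ts'` passes 
and a name that is not among the columns is rejected.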








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-16 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r614680980



##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, 
StringType, StructField}
+
+class TestCreateTable extends TestHoodieSqlBase {
+
+  test("Test Create Managed Hoodie Table") {
+val tableName = generateTableName
+// Create a managed table
+spark.sql(
+  s"""
+ | create table $tableName (
+ |  id int,
+ |  name string,
+ |  price double,
+ |  ts long
+ | ) using hudi
+ | options (
+ |   primaryKey = 'id',
+ |   versionColumn = 'ts'

Review comment:
   There is a check for the `versionColumn` at table creation in 
`CreateHoodieTableCommand#validateTable`, so the `versionColumn` must be a 
field defined in the table columns. I think we should not define a default 
value for the `versionColumn`.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-16 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r614669121



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.DataWritingCommand
+
+/**
+  * Command for create table as query statement.
+  */
+case class CreateHoodieTableAsSelectCommand(
+   table: CatalogTable,
+   mode: SaveMode,
+   query: LogicalPlan) extends DataWritingCommand {
+
+  override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
+assert(table.tableType != CatalogTableType.VIEW)
+assert(table.provider.isDefined)
+
+val sessionState = sparkSession.sessionState
+val db = 
table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+val tableIdentWithDB = table.identifier.copy(database = Some(db))
+val tableName = tableIdentWithDB.unquotedString
+
+if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+  assert(mode != SaveMode.Overwrite,
+s"Expect the table $tableName has been dropped when the save mode is 
Overwrite")
+
+  if (mode == SaveMode.ErrorIfExists) {
+throw new RuntimeException(s"Table $tableName already exists. You need 
to drop it first.")
+  }
+  if (mode == SaveMode.Ignore) {
+// Since the table already exists and the save mode is Ignore, we will 
just return.
+// scalastyle:off
+return Seq.empty
+// scalastyle:on

Review comment:
   The Scala style check fails on the `return` statement, so we have to 
turn it off here.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-16 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r614665177



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
##
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.analysis
+
+import org.apache.hudi.SparkSqlAdapterSupport
+
+import scala.collection.JavaConverters._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedStar
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, Literal, 
NamedExpression}
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, 
DeleteFromTable, InsertAction, LogicalPlan, MergeIntoTable, Project, 
UpdateAction, UpdateTable}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command.CreateDataSourceTableCommand
+import org.apache.spark.sql.execution.datasources.{CreateTable, 
LogicalRelation}
+import org.apache.spark.sql.hudi.HoodieSqlUtils._
+import org.apache.spark.sql.hudi.command.{CreateHoodieTableAsSelectCommand, 
CreateHoodieTableCommand, DeleteHoodieTableCommand, 
InsertIntoHoodieTableCommand, MergeIntoHoodieTableCommand, 
UpdateHoodieTableCommand}
+import org.apache.spark.sql.types.StringType
+
+object HoodieAnalysis {
+  def customResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
+Seq(
+  session => HoodieResolveReferences(session),
+  session => HoodieAnalysis(session)
+)
+
+  def customPostHocResolutionRules(): Seq[SparkSession => Rule[LogicalPlan]] =
+Seq(
+  session => HoodiePostAnalysisRule(session)
+)
+}
+
+/**
+  * Rule for convert the logical plan to command.
+  * @param sparkSession
+  */
+case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan]
+  with SparkSqlAdapterSupport {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+plan match {
+  // Convert to MergeIntoHoodieTableCommand
+  case m @ MergeIntoTable(target, _, _, _, _)
+if m.resolved && isHoodieTable(target, sparkSession) =>
+  MergeIntoHoodieTableCommand(m)
+
+  // Convert to UpdateHoodieTableCommand
+  case u @ UpdateTable(table, _, _)
+if u.resolved && isHoodieTable(table, sparkSession) =>
+  UpdateHoodieTableCommand(u)
+
+  // Convert to DeleteHoodieTableCommand
+  case d @ DeleteFromTable(table, _)
+if d.resolved && isHoodieTable(table, sparkSession) =>
+  DeleteHoodieTableCommand(d)
+
+  // Convert to InsertIntoHoodieTableCommand
+  case l if sparkSqlAdapter.isInsertInto(l) =>
+val (table, partition, query, overwrite, _) = 
sparkSqlAdapter.getInsertIntoChildren(l).get
+table match {
+  case relation: LogicalRelation if isHoodieTable(relation, 
sparkSession) =>
+new InsertIntoHoodieTableCommand(relation, query, partition, 
overwrite)
+  case _ =>
+l
+}
+  // Convert to CreateHoodieTableAsSelectCommand
+  case CreateTable(table, mode, Some(query))
+if query.resolved && isHoodieTable(table) =>
+  CreateHoodieTableAsSelectCommand(table, mode, query)
+  case _=> plan
+}
+  }
+}
+
+/**
+  * Rule for resolve hoodie's extended syntax or rewrite some logical plan.
+  * @param sparkSession
+  */
+case class HoodieResolveReferences(sparkSession: SparkSession) extends 
Rule[LogicalPlan]
+  with SparkSqlAdapterSupport {
+  private lazy val analyzer = sparkSession.sessionState.analyzer
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+plan match {
+  // Resolve merge into
+  case MergeIntoTable(target, source, mergeCondition, matchedActions, 
notMatchedActions)
+if isHoodieTable(target, sparkSession) && target.resolved && 
source.resolved =>
+
+def isEmptyAssign

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-16 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r614664550



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/SparkSqlAdapterSupport.scala
##
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.hudi.SparkSqlAdapter
+
+trait SparkSqlAdapterSupport {
+
+  lazy val sparkSqlAdapter: SparkSqlAdapter = {
+val adapterClass = if (SPARK_VERSION.startsWith("2.")) {
+  "org.apache.spark.sql.adapter.Spark2SqlAdapter"

Review comment:
   Yes. Because the `hudi-spark` project includes only one of `hudi-spark2` or 
`hudi-spark3`, we cannot have the Spark 2 and Spark 3 dependencies in the 
project at the same time.
   So `Spark2SqlAdapter` and `Spark3SqlAdapter` are never both available at 
compile time. Here we use the class name as a string instead of referencing the 
class, to avoid a compile error.
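   
   The load-by-name approach can be sketched in plain Java as follows. All 
names here (`AdapterLoader`, `SqlAdapter`, `Spark2Adapter`, `Spark3Adapter`) 
are placeholders, not the Hudi classes. Both adapters are defined in the same 
file only so that the sketch runs on its own; in the setup described above, 
only one of them would be on the classpath.

```java
// Hypothetical sketch; the adapter names below are placeholders, not Hudi classes.
class AdapterLoader {

  interface SqlAdapter {
    String name();
  }

  // In a real multi-module build only one of these would be on the classpath.
  public static class Spark2Adapter implements SqlAdapter {
    public String name() {
      return "spark2";
    }
  }

  public static class Spark3Adapter implements SqlAdapter {
    public String name() {
      return "spark3";
    }
  }

  // Picks the adapter's binary class name from the version string. Only a
  // string is built here, so no adapter class is referenced directly.
  static String adapterClassName(String sparkVersion) {
    String simpleName = sparkVersion.startsWith("2.") ? "Spark2Adapter" : "Spark3Adapter";
    return AdapterLoader.class.getName() + "$" + simpleName;
  }

  // Loads and instantiates the adapter reflectively by its binary name.
  static SqlAdapter load(String sparkVersion) {
    try {
      Class<?> clazz = Class.forName(adapterClassName(sparkVersion));
      return (SqlAdapter) clazz.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot load adapter for Spark " + sparkVersion, e);
    }
  }
}
```

   The trade-off is that a wrong or missing class name shows up only at run 
time, as an exception from `load`, rather than as a compile error.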








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-16 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r614658039



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePayloadProps.java
##
@@ -40,4 +40,12 @@
*/
   public static final String PAYLOAD_EVENT_TIME_FIELD_PROP = 
"hoodie.payload.event.time.field";
   public static String DEFAULT_PAYLOAD_EVENT_TIME_FIELD_VAL = "ts";
+
+  public static final String PAYLOAD_DELETE_CONDITION = 
"hoodie.payload.delete.condition";

Review comment:
   ok




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-16 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r614649396



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
##
@@ -97,4 +86,20 @@ public DefaultHoodieRecordPayload(Option 
record) {
 }
 return metadata.isEmpty() ? Option.empty() : Option.of(metadata);
   }
+
+  protected boolean noNeedUpdatePersistedRecord(IndexedRecord currentValue,

Review comment:
   Yeah, `needUpdatePersistedRecord` looks more readable.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-06 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r607469389



##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/MergeIntoTest.scala
##
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.spark.sql.Row
+
+class MergeIntoTest extends HoodieBaseSqlTest {
+
+  test("Test MergeInto") {
+withTempDir { tmp =>
+  val tableName = generateTableName
+  // Create table
+  spark.sql(
+s"""
+   |create table $tableName (
+   |  id int,
+   |  name string,
+   |  price double,
+   |  ts long
+   |) using hudi
+   | location '${tmp.getCanonicalPath}'
+   | options (
+   |  primaryKey ='id',
+   |  versionColumn = 'ts'
+   | )
+   """.stripMargin)
+
+  // First merge (insert a new record)
+  spark.sql(
+s"""
+   | merge into $tableName
+   | using (
+   |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+   | ) s0
+   | on s0.id = $tableName.id
+   | when matched then update set
+   | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+   | when not matched then insert *
+   """.stripMargin)
+  val queryResult1 = spark.sql(s"select id, name, price, ts from 
$tableName").collect()
+  assertResult(Array(Row(1, "a1", 10.0, 1000)))(queryResult1)
+
+  // second merge (update the record)
+  spark.sql(
+s"""
+   | merge into $tableName
+   | using (
+   |  select 1 as id, 'a1' as name, 10 as price, 1001 as ts
+   | ) s0
+   | on s0.id = $tableName.id
+   | when matched then update set
+   | id = s0.id, name = s0.name, price = s0.price + $tableName.price, 
ts = s0.ts
+   | when not matched then insert *
+   """.stripMargin)
+  val queryResult2 = spark.sql(s"select id, name, price, ts from 
$tableName").collect()
+  assertResult(Array(Row(1, "a1", 20.0, 1001)))(queryResult2)
+
+  // the third time merge (update & insert the record)
+  spark.sql(
+s"""
+   | merge into $tableName
+   | using (
+   |  select * from (
+   |  select 1 as id, 'a1' as name, 10 as price, 1002 as ts
+   |  union all
+   |  select 2 as id, 'a2' as name, 12 as price, 1001 as ts
+   |  )
+   | ) s0
+   | on s0.id = $tableName.id
+   | when matched then update set
+   | id = s0.id, name = s0.name, price = s0.price + $tableName.price, 
ts = s0.ts
+   | when not matched and id % 2 = 0 then insert *
+   """.stripMargin)
+  val queryResult3 = spark.sql(s"select id, name, price, ts from 
$tableName").collect()
+  assertResult(Array(Row(1, "a1", 30.0, 1002), Row(2, "a2", 12.0, 
1001)))(queryResult3)
+
+  // the fourth merge (delete the record)
+  spark.sql(
+s"""
+   | merge into $tableName
+   | using (
+   |  select 1 as id, 'a1' as name, 12 as price, 1003 as ts
+   | ) s0
+   | on s0.id = $tableName.id
+   | when matched and id != 1 then update set
+   |id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+   | when matched and id = 1 then delete

Review comment:
   The match-delete condition can reference other columns too. For example, 
the condition `when matched and ts = 1002` works. I will add a test case for this.
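
   As a rough sketch of what such a test statement could look like, the helper 
below only builds the SQL text, modelled on the fourth merge in the quoted 
test. The object and method names are hypothetical, and running the statement 
would still need a Spark session with Hudi SQL support.

```scala
object MergeDeleteSketch {
  // Build a merge statement whose delete clause uses an arbitrary condition,
  // e.g. "ts = 1002" instead of "id = 1".
  def mergeDeleteSql(tableName: String, deleteCondition: String): String =
    s"""
       |merge into $tableName
       |using (
       |  select 1 as id, 'a1' as name, 12 as price, 1003 as ts
       |) s0
       |on s0.id = $tableName.id
       |when matched and $deleteCondition then delete""".stripMargin.trim
}
```

   In the test this string would be passed to `spark.sql(...)`, followed by a 
query asserting that the matching row is gone.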








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-06 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r607943535



##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/MergeIntoTest.scala
##
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.spark.sql.Row
+
+class MergeIntoTest extends HoodieBaseSqlTest {
+
+  test("Test MergeInto") {
+withTempDir { tmp =>
+  val tableName = generateTableName
+  // Create table
+  spark.sql(
+s"""
+   |create table $tableName (
+   |  id int,
+   |  name string,
+   |  price double,
+   |  ts long
+   |) using hudi
+   | location '${tmp.getCanonicalPath}'
+   | options (
+   |  primaryKey ='id',
+   |  versionColumn = 'ts'
+   | )
+   """.stripMargin)
+
+  // First merge (insert a new record)
+  spark.sql(
+s"""
+   | merge into $tableName
+   | using (
+   |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+   | ) s0
+   | on s0.id = $tableName.id

Review comment:
   Yeah, I have tested this case. Currently it is not supported when the two 
tables have different row key names. I will fix this later. Thanks for 
reminding me about this, @kwondw.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-06 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r607463859



##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/MergeIntoTest.scala
##
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.spark.sql.Row
+
+class MergeIntoTest extends HoodieBaseSqlTest {
+
+  test("Test MergeInto") {
+withTempDir { tmp =>
+  val tableName = generateTableName
+  // Create table
+  spark.sql(
+s"""
+   |create table $tableName (
+   |  id int,
+   |  name string,
+   |  price double,
+   |  ts long
+   |) using hudi
+   | location '${tmp.getCanonicalPath}'
+   | options (
+   |  primaryKey ='id',
+   |  versionColumn = 'ts'
+   | )
+   """.stripMargin)
+
+  // First merge (insert a new record)
+  spark.sql(
+s"""
+   | merge into $tableName
+   | using (
+   |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+   | ) s0
+   | on s0.id = $tableName.id

Review comment:
   No, there is no such limitation on the table column names. I will add a 
test case for it.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-05 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r607469389



##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/MergeIntoTest.scala
##
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.spark.sql.Row
+
+class MergeIntoTest extends HoodieBaseSqlTest {
+
+  test("Test MergeInto") {
+withTempDir { tmp =>
+  val tableName = generateTableName
+  // Create table
+  spark.sql(
+s"""
+   |create table $tableName (
+   |  id int,
+   |  name string,
+   |  price double,
+   |  ts long
+   |) using hudi
+   | location '${tmp.getCanonicalPath}'
+   | options (
+   |  primaryKey ='id',
+   |  versionColumn = 'ts'
+   | )
+   """.stripMargin)
+
+  // First merge (insert a new record)
+  spark.sql(
+s"""
+   | merge into $tableName
+   | using (
+   |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+   | ) s0
+   | on s0.id = $tableName.id
+   | when matched then update set
+   | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+   | when not matched then insert *
+   """.stripMargin)
+  val queryResult1 = spark.sql(s"select id, name, price, ts from 
$tableName").collect()
+  assertResult(Array(Row(1, "a1", 10.0, 1000)))(queryResult1)
+
+  // second merge (update the record)
+  spark.sql(
+s"""
+   | merge into $tableName
+   | using (
+   |  select 1 as id, 'a1' as name, 10 as price, 1001 as ts
+   | ) s0
+   | on s0.id = $tableName.id
+   | when matched then update set
+   | id = s0.id, name = s0.name, price = s0.price + $tableName.price, 
ts = s0.ts
+   | when not matched then insert *
+   """.stripMargin)
+  val queryResult2 = spark.sql(s"select id, name, price, ts from 
$tableName").collect()
+  assertResult(Array(Row(1, "a1", 20.0, 1001)))(queryResult2)
+
+  // the third time merge (update & insert the record)
+  spark.sql(
+s"""
+   | merge into $tableName
+   | using (
+   |  select * from (
+   |  select 1 as id, 'a1' as name, 10 as price, 1002 as ts
+   |  union all
+   |  select 2 as id, 'a2' as name, 12 as price, 1001 as ts
+   |  )
+   | ) s0
+   | on s0.id = $tableName.id
+   | when matched then update set
+   | id = s0.id, name = s0.name, price = s0.price + $tableName.price, 
ts = s0.ts
+   | when not matched and id % 2 = 0 then insert *
+   """.stripMargin)
+  val queryResult3 = spark.sql(s"select id, name, price, ts from 
$tableName").collect()
+  assertResult(Array(Row(1, "a1", 30.0, 1002), Row(2, "a2", 12.0, 
1001)))(queryResult3)
+
+  // the fourth merge (delete the record)
+  spark.sql(
+s"""
+   | merge into $tableName
+   | using (
+   |  select 1 as id, 'a1' as name, 12 as price, 1003 as ts
+   | ) s0
+   | on s0.id = $tableName.id
+   | when matched and id != 1 then update set
+   |id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+   | when matched and id = 1 then delete

Review comment:
   No, the match-delete condition can reference other columns too. The 
condition `when matched and ts = 1002` works. I will add a test case 
for this.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-05 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r607468023



##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/MergeIntoTest.scala
##
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.spark.sql.Row
+
+class MergeIntoTest extends HoodieBaseSqlTest {
+
+  test("Test MergeInto") {
+withTempDir { tmp =>
+  val tableName = generateTableName
+  // Create table
+  spark.sql(
+s"""
+   |create table $tableName (
+   |  id int,
+   |  name string,
+   |  price double,
+   |  ts long
+   |) using hudi
+   | location '${tmp.getCanonicalPath}'
+   | options (
+   |  primaryKey ='id',
+   |  versionColumn = 'ts'
+   | )
+   """.stripMargin)
+
+  // First merge (insert a new record)
+  spark.sql(
+s"""
+   | merge into $tableName
+   | using (
+   |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+   | ) s0
+   | on s0.id = $tableName.id
+   | when matched then update set
+   | id = s0.id, name = s0.name, price = s0.price, ts = s0.ts
+   | when not matched then insert *
+   """.stripMargin)
+  val queryResult1 = spark.sql(s"select id, name, price, ts from 
$tableName").collect()
+  assertResult(Array(Row(1, "a1", 10.0, 1000)))(queryResult1)
+
+  // second merge (update the record)
+  spark.sql(
+s"""
+   | merge into $tableName

Review comment:
   No, aliases are already supported in this PR. See the last test case in 
`TestMergeInto`.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-05 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r607467341



##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/CreateTableTest.scala
##
@@ -19,17 +19,44 @@ package org.apache.spark.sql.hudi
 
 import scala.collection.JavaConverters._
 import org.apache.hudi.common.model.HoodieRecord
-import org.apache.hudi.exception.HoodieDuplicateKeyException
-import org.apache.hudi.hadoop.HoodieParquetInputFormat
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, 
StringType, StructField}
 
 class CreateTableTest extends HoodieBaseSqlTest {
 
-  test("Test Create Hoodie Table") {
+  test("Test Create Managed Hoodie Table") {
+val tableName = generateTableName
+// Create a managed table
+spark.sql(
+  s"""
+ | create table $tableName (
+ |  id int,
+ |  name string,
+ |  price double,
+ |  ts long
+ | ) using hudi
+ | options (
+ |   primaryKey = 'id',

Review comment:
   Thanks for reminding me about this, @kwondw. I will add validation for 
the primaryKey.
   As for the choice between `OPTIONS` and `TBLPROPERTIES`, I think `OPTIONS` 
is more widely used for DataSource tables.
   It is also easier to write.
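
   To make the planned check concrete, here is one possible shape for the 
primaryKey validation. This is only a sketch under assumptions: the object and 
method names are hypothetical, and treating a missing `primaryKey` option as 
"no key declared" is an arbitrary choice for the example, not confirmed 
behaviour.

```scala
object PrimaryKeyValidationSketch {
  // Check that every column named in the `primaryKey` option exists in the
  // table schema. Returns the parsed key columns, or an error message.
  def validate(columns: Seq[String], options: Map[String, String]): Either[String, List[String]] =
    options.get("primaryKey") match {
      // Arbitrary choice for this sketch: no option means no declared key.
      case None => Right(Nil)
      case Some(raw) =>
        val keys = raw.split(",").map(_.trim).filter(_.nonEmpty).toList
        val unknown = keys.filter(k => !columns.contains(k))
        if (keys.isEmpty) Left("primaryKey is empty")
        else if (unknown.nonEmpty) Left(s"primaryKey column(s) not in schema: ${unknown.mkString(", ")}")
        else Right(keys)
    }
}
```

   Returning `Either` keeps the check side-effect free. The real command could 
instead throw an exception at table creation time.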








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-05 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r607463859



##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/MergeIntoTest.scala
##
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.spark.sql.Row
+
+class MergeIntoTest extends HoodieBaseSqlTest {
+
+  test("Test MergeInto") {
+withTempDir { tmp =>
+  val tableName = generateTableName
+  // Create table
+  spark.sql(
+s"""
+   |create table $tableName (
+   |  id int,
+   |  name string,
+   |  price double,
+   |  ts long
+   |) using hudi
+   | location '${tmp.getCanonicalPath}'
+   | options (
+   |  primaryKey ='id',
+   |  versionColumn = 'ts'
+   | )
+   """.stripMargin)
+
+  // First merge (insert a new record)
+  spark.sql(
+s"""
+   | merge into $tableName
+   | using (
+   |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+   | ) s0
+   | on s0.id = $tableName.id

Review comment:
   No, there is no such limitation on the table column names. I will add a 
test case for it.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-05 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r607463859



##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/MergeIntoTest.scala
##
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.spark.sql.Row
+
+class MergeIntoTest extends HoodieBaseSqlTest {
+
+  test("Test MergeInto") {
+withTempDir { tmp =>
+  val tableName = generateTableName
+  // Create table
+  spark.sql(
+s"""
+   |create table $tableName (
+   |  id int,
+   |  name string,
+   |  price double,
+   |  ts long
+   |) using hudi
+   | location '${tmp.getCanonicalPath}'
+   | options (
+   |  primaryKey ='id',
+   |  versionColumn = 'ts'
+   | )
+   """.stripMargin)
+
+  // First merge (insert a new record)
+  spark.sql(
+s"""
+   | merge into $tableName
+   | using (
+   |  select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+   | ) s0
+   | on s0.id = $tableName.id

Review comment:
   No, there is no such limit on the table column names. I will add a test 
case for it.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-05 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r607462898



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import java.util.Properties
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, IndexedRecord}
+import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecord}
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.exception.HoodieDuplicateKeyException
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.keygen.{ComplexKeyGenerator, UuidKeyGenerator}
+import org.apache.hudi.{HoodieSparkSqlWriter, HoodieWriterUtils}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SaveMode, 
SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import 
org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, 
LogicalRelation}
+import org.apache.spark.sql.hudi.HoodieOptionConfig
+import org.apache.spark.sql.hudi.HoodieSqlUtils._
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{StringType, StructType}
+
+/**
+  * Command for insert into hoodie table.
+  */
+class InsertIntoHoodieTableCommand(
+logicalRelation: LogicalRelation,
+query: LogicalPlan,
+partition: Map[String, Option[String]],
+overwrite: Boolean)
+  extends InsertIntoDataSourceCommand(logicalRelation, query, overwrite) {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+assert(logicalRelation.catalogTable.isDefined, "Missing catalog table")
+
+val table = logicalRelation.catalogTable.get
+InsertIntoHoodieTableCommand.run(sparkSession, table, query, partition, 
overwrite)
+Seq.empty[Row]
+  }
+}
+
+object InsertIntoHoodieTableCommand {
+  /**
+* Run the insert query. We support both dynamic partition insert and 
static partition insert.
+* @param sparkSession The spark session.
+* @param table The insert table.
+* @param query The insert query.
+* @param insertPartitions The specified insert partition map.
+* e.g. "insert into h(dt = '2021') select id, name 
from src"
+* "dt" is the key in the map and "2021" is the 
partition value. If the
+* partition value has not specified(in the case of 
dynamic partition)
+* , it is None in the map.
+* @param overwrite Whether to overwrite the table.
+*/
+  def run(sparkSession: SparkSession, table: CatalogTable, query: LogicalPlan,
+  insertPartitions: Map[String, Option[String]],
+  overwrite: Boolean): Unit = {
+
+val config = if (table.schema.fields.nonEmpty) { // for insert into
+  buildHoodieInsertConfig(table, sparkSession, insertPartitions)
+} else { // It is CTAS if the table schema is empty, we use the schema 
from the query.
+  buildHoodieInsertConfig(table, sparkSession, insertPartitions, 
Some(query.schema))
+}
+
+val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
+val parameters = HoodieWriterUtils.parametersWithWriteDefaults(config)
+val queryData = Dataset.ofRows(sparkSession, query)
+val conf = sparkSession.sessionState.conf
+val alignedQuery = alignOutputFields(queryData, table, insertPartitions, 
conf)
+HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, parameters, 
alignedQuery)
+sparkSession.catalog.refreshTable(table.identifier.unquotedString)
+  }
+
+  /**
+* Aligned the type and name of query's output fields with the result 
table's fields.
+* @param query The insert query which to aligned.
+* @param table The result table.
+* @param insertPartitions The insert pa

[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-05 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r607443714



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.DataSourceWriteOptions
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
+import org.apache.hudi.hive.MultiPartKeysValueExtractor
+import org.apache.hudi.keygen.ComplexKeyGenerator
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, SubqueryAlias, 
UpdateTable}
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.hudi.HoodieOptionConfig
+import org.apache.spark.sql.hudi.HoodieSqlUtils._
+import org.apache.spark.sql.types.StructField
+
+import scala.collection.JavaConverters._
+
+case class UpdateHoodieTableCommand(updateTable: UpdateTable) extends 
RunnableCommand {
+
+  private val table = updateTable.table
+  private val tableAlias = table match {
+case SubqueryAlias(name, _) => name
+case _ => throw new IllegalArgumentException(s"Illegal table: $table")
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+logInfo(s"start execute update command for $tableAlias")
+def cast(exp:Expression, field: StructField): Expression = {
+  castIfNeeded(exp, field.dataType, sparkSession.sqlContext.conf)
+}
+val name2UpdateValue = updateTable.assignments.map {
+  case Assignment(attr: AttributeReference, value) =>
+attr.name -> value
+}.toMap
+
+val updateExpressions = table.output
+  .map(attr => name2UpdateValue.getOrElse(attr.name, attr))
+  .filter { // filter the meta columns
+case attr: AttributeReference =>
+  !HoodieRecord.HOODIE_META_COLUMNS.asScala.toSet.contains(attr.name)
+case _=> true
+  }
+
+val projects = 
updateExpressions.zip(removeMetaFields(table.schema).fields).map {
+  case (attr: AttributeReference, field) =>
+Column(cast(attr, field))
+  case (exp, field) =>
+Column(Alias(cast(exp, field), field.name)())
+}
+
+var df = Dataset.ofRows(sparkSession, table)
+if (updateTable.condition.isDefined) {
+  df = df.filter(Column(updateTable.condition.get))
+}
+df = df.select(projects: _*)
+val config = buildHoodieConfig(sparkSession)
+df.write
+  .format("hudi")
+  .mode(SaveMode.Append)
+  .options(config)
+  .save()
+table.refresh()
+logInfo(s"finish execute update command for $tableAlias")
+Seq.empty[Row]
+  }
+
+  private def buildHoodieConfig(sparkSession: SparkSession): Map[String, String] = {
+val targetTable = sparkSession.sessionState.catalog
+  .getTableMetadata(TableIdentifier(tableAlias.identifier, tableAlias.database))
+val path = getTableLocation(targetTable, sparkSession)
+  .getOrElse(s"missing location for $tableAlias")
+
+val primaryColumns = HoodieOptionConfig.getPrimaryColumns(targetTable.storage.properties)
+
+assert(primaryColumns.nonEmpty,
+  s"There are no primary key in table $tableAlias, cannot execute update operator")
+withSparkConf(sparkSession, targetTable.storage.properties) {
+  Map(
+"path" -> removeStarFromPath(path.toString),
+RECORDKEY_FIELD_OPT_KEY -> primaryColumns.mkString(","),
+KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getCanonicalName,

Review comment:
   `ComplexKeyGenerator` is the default key generator for SQL, because SQL 
defines the primary keys in the DDL. 
   Users can also specify a custom key generator when creating the table, like this:
   create table h0 (id int, name string) using hudi options(
 hoodie.datasource.write.keygenerator.class = 'xxx.MyKeyGenerator'
   )
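
   To make the role of the key generator concrete, here is a minimal Java sketch of how a record key could be composed from the primary key columns declared in the DDL. `CompositeKeySketch` and `buildRecordKey` are hypothetical names used only for illustration, and the comma-joined `field:value` layout is an assumption of this sketch rather than a statement of Hudi's exact key format.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration only; this is not Hudi's ComplexKeyGenerator.
final class CompositeKeySketch {

  // Joins each primary key column and its value as "field:value", separated by commas.
  static String buildRecordKey(List<String> primaryColumns, Map<String, Object> row) {
    if (primaryColumns.isEmpty()) {
      throw new IllegalArgumentException("At least one primary key column is required");
    }
    return primaryColumns.stream()
        .map(col -> col + ":" + row.get(col))
        .collect(Collectors.joining(","));
  }

  public static void main(String[] args) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("id", 1);
    row.put("name", "a1");
    // Both columns are treated as the primary key in this example.
    System.out.println(buildRecordKey(Arrays.asList("id", "name"), row));
  }
}
```

   A custom generator configured through the table options would replace only the key-building step; the rest of the write path would stay the same.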
   





[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-04-05 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r607441239



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala
##
@@ -56,13 +68,46 @@ case class CreateHoodieTableCommand(table: CatalogTable, ignoreIfExists: Boolean
 return Seq.empty[Row]
 // scalastyle:on
   } else {
-throw new IllegalArgumentException(s"Table ${table.identifier.unquotedString} already exists.")
+throw new IllegalArgumentException(s"Table $tableName already exists.")
   }
 }
-// Add the meta fields to the schema,
-val newSchema = addMetaFields(table.schema)
+
 var path = getTableLocation(table, sparkSession)
   .getOrElse(s"Missing path for table ${table.identifier}")
+val conf = sparkSession.sessionState.newHadoopConf()
+val isTableExists = tableExists(path, conf)
+// Get the schema & table options
+val (newSchema, tableOptions) = if (table.tableType == CatalogTableType.EXTERNAL &&
+  isTableExists) {
+  // If this is an external table & the table has already exists in the location,
+  // load the schema from the table meta.
+  assert(table.schema.isEmpty,
+s"Should not specified table schema for an exists hoodie external " +
+  s"table: ${table.identifier.unquotedString}")
+  // Get Schema from the external table
+  val metaClient = HoodieTableMetaClient.builder()
+.setBasePath(path)
+.setConf(conf)
+.build()
+  val schemaResolver = new TableSchemaResolver(metaClient)
+  val avroSchema = schemaResolver.getTableAvroSchema(true)
+  val tableSchema = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+  // Get options from the external table
+  val options = HoodieOptionConfig.mappingTableConfigToSqlOption(
+metaClient.getTableConfig.getProps.asScala.toMap)
+  (tableSchema, options)
+} else {

Review comment:
   Yes, we should support both managed and unmanaged (external) tables for 
Hudi, just like other Spark datasource tables. I think this complies with the 
SQL specification.
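
   The branch in the diff above can be sketched in Java as follows. This is a simplified illustration, not the PR's code: `TableSchemaChoiceSketch` and `resolveColumns` are hypothetical names, plain column-name lists stand in for the real schema types, and the fallback to the DDL columns for every other case is an assumption, since the `else` branch is cut off in the quoted hunk.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; column names stand in for a real schema type.
final class TableSchemaChoiceSketch {

  static List<String> resolveColumns(boolean external, boolean existsAtLocation,
                                     List<String> ddlColumns, List<String> storedColumns) {
    if (external && existsAtLocation) {
      // Mirrors the assertion in the diff: no schema may be given in the DDL here.
      if (!ddlColumns.isEmpty()) {
        throw new IllegalArgumentException(
            "Should not specify a schema for an existing external table");
      }
      // The schema comes from the metadata already stored with the table.
      return storedColumns;
    }
    // Assumption: every other case uses the columns declared in the DDL.
    return ddlColumns;
  }

  public static void main(String[] args) {
    List<String> stored = Arrays.asList("id", "name");
    System.out.println(resolveColumns(true, true, Collections.<String>emptyList(), stored));
    System.out.println(resolveColumns(false, false, Arrays.asList("id", "price"), stored));
  }
}
```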








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r601159174



##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieBaseSqlTest.scala
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import java.io.File
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.util.Utils
+import org.scalactic.source
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Tag}
+
+class HoodieBaseSqlTest extends FunSuite with BeforeAndAfterAll {

Review comment:
   Yes, good suggestions!








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r601154948



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##
@@ -617,6 +619,16 @@ public PropertyBuilder setTableName(String tableName) {
   return this;
 }
 
+public PropertyBuilder setTableSchema(String tableSchema) {
+  this.tableSchema = tableSchema;
+  return this;
+}
+
+public PropertyBuilder setRowKeyFields(String rowKeyFields) {

Review comment:
   Hi @vinothchandar, I am afraid we cannot do this for SQL. The common 
way to specify the primary key in SQL is by the row key fields, just as most 
databases do. I think we should not expose the generator class as the way for 
users to specify the primary key in SQL.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r601160434



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/HoodieSparkSessionExtension.scala
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.spark.SPARK_VERSION
+import org.apache.spark.sql.SparkSessionExtensions
+import org.apache.spark.sql.hudi.analysis.HoodieAnalysis
+import org.apache.spark.sql.hudi.parser.HoodieSqlParser
+
+/**
+  * The Hoodie SparkSessionExtension for extending the syntax and add the rules.
+  */
+class HoodieSparkSessionExtension extends (SparkSessionExtensions => Unit) {
+  override def apply(extensions: SparkSessionExtensions): Unit = {
+if (SPARK_VERSION.startsWith("2.")) {

Review comment:
   Hi @vinothchandar, yes, currently we only add the parser for Spark 2, 
because Spark 3 already supports the merge/delete syntax.
   Hi @xiarixiaoyao, thanks for your suggestion. I will review this for Spark 3.
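
   The version gate discussed here amounts to a prefix check on the Spark version string, as in the `SPARK_VERSION.startsWith("2.")` line of the diff. A minimal Java sketch, where `ParserGateSketch` and `needsCustomParser` are hypothetical names and the version strings are examples only:

```java
// Hypothetical illustration only; the real check lives in the session extension.
final class ParserGateSketch {

  // Returns true when the extended SQL parser would need to be injected.
  static boolean needsCustomParser(String sparkVersion) {
    // Only the 2.x line lacks the merge/delete syntax, per the comment above.
    return sparkVersion.startsWith("2.");
  }

  public static void main(String[] args) {
    System.out.println(needsCustomParser("2.4.4"));
    System.out.println(needsCustomParser("3.0.1"));
  }
}
```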








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r601158842



##
File path: hudi-spark-datasource/hudi-spark2/src/main/antlr4/imports/SqlBase.g4
##
@@ -0,0 +1,1099 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * This file is an adaptation of Presto's presto-parser/src/main/antlr4/com/facebook/presto/sql/parser/SqlBase.g4 grammar.

Review comment:
   Hi @vinothchandar, currently the SQL parser is only for Spark 2, so I 
added it to the `hudi-spark2` project only. For Spark 3, as you mentioned, 
the delete/merge syntax is already supported, so we do not need to extend the 
SQL parser.
   
   








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r601154948



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##
@@ -617,6 +619,16 @@ public PropertyBuilder setTableName(String tableName) {
   return this;
 }
 
+public PropertyBuilder setTableSchema(String tableSchema) {
+  this.tableSchema = tableSchema;
+  return this;
+}
+
+public PropertyBuilder setRowKeyFields(String rowKeyFields) {

Review comment:
   Hi @vinothchandar, I am afraid we cannot do this for SQL. The common 
way to specify the primary key in SQL is by the row key fields. I think we 
should not expose the generator class as the way for users to specify the 
primary key in SQL.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r601149509



##
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/UuidKeyGenerator.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+/**
+ * A KeyGenerator which use the uuid as the record key.
+ */
+public class UuidKeyGenerator extends BuiltinKeyGenerator {

Review comment:
   This is used for `InsertInto` on a Hudi table without a primary key. 
Currently, we must return a `RecordKey` for each record when writing to Hudi. 
If the user has not specified a primary key in the `DDL`, I provide a default 
`UuidKeyGenerator`, which generates a UUID for the record.
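
   The fallback described here can be sketched in Java with the standard `java.util.UUID` class. `UuidKeySketch` and `recordKey` are hypothetical names for illustration; in the PR the choice is made by selecting a key generator class, not by a per-record check as in this simplified sketch.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical illustration only; this is not the PR's UuidKeyGenerator.
final class UuidKeySketch {

  // Uses the primary key value when one exists, otherwise a random UUID.
  static String recordKey(Optional<String> primaryKeyValue) {
    return primaryKeyValue.orElseGet(() -> UUID.randomUUID().toString());
  }

  public static void main(String[] args) {
    // A table with a primary key keeps its own key.
    System.out.println(recordKey(Optional.of("id:1")));
    // A table without one gets a generated key, different on every call.
    System.out.println(recordKey(Optional.<String>empty()));
  }
}
```

   Because a random UUID is different on every call, a record written this way cannot be matched by key later, so this only suits insert-only use.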








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r601152600



##
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##
@@ -617,6 +619,16 @@ public PropertyBuilder setTableName(String tableName) {
   return this;
 }
 
+public PropertyBuilder setTableSchema(String tableSchema) {

Review comment:
   The `tableSchema` saved to `hoodie.properties` is the initial schema 
from when the table is created with the DDL. After that, the schema can still 
change with later commits; the updated schema is currently stored in the 
commit files.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r601149509



##
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/UuidKeyGenerator.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.keygen;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+/**
+ * A KeyGenerator which use the uuid as the record key.
+ */
+public class UuidKeyGenerator extends BuiltinKeyGenerator {

Review comment:
   This is used for `InsertInto` on a Hudi table without a primary key. 
Currently, we must return a `RecordKey` for each record when writing to Hudi. 
If the user has not specified a primary key in the `DDL`, I provide a default 
`UuidKeyGenerator`, which generates a UUID for the record.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r601147731



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
##
@@ -54,9 +53,18 @@
 public abstract class HoodieWriteHandle extends HoodieIOHandle {
 
   private static final Logger LOG = LogManager.getLogger(HoodieWriteHandle.class);
+  /**
+   * The input schema of the incoming dataframe.
+   */
+  protected final Schema inputSchema;

Review comment:
   For the case of MergeInto with a custom `HoodiePayload`, the input 
schema may differ from the write schema because there is transformation logic 
in the `payload`. So I distinguish between the input and write schemas.








[GitHub] [hudi] pengzhiwei2018 commented on a change in pull request #2645: [HUDI-1659] Basic Implementation Of Spark Sql Support

2021-03-25 Thread GitBox


pengzhiwei2018 commented on a change in pull request #2645:
URL: https://github.com/apache/hudi/pull/2645#discussion_r601145258



##
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
##
@@ -394,4 +405,36 @@ public IOType getIOType() {
   public HoodieBaseFile baseFileForMerge() {
 return baseFileToMerge;
   }
+
+  /**
+   * A special record returned by {@link HoodieRecordPayload}, which means
+   * {@link HoodieMergeHandle} should just skip this record.
+   */
+  private static class IgnoreRecord implements GenericRecord {

Review comment:
   For the `MergeInto` statement, if a record does not match the 
conditions, we should filter it out in the `ExpressionPayload`, e.g.
   
   Merge Into d0
   using ( select 1 as id, 'a1' as name ) s0
   on d0.id = s0.id
   when matched and s0.id % 2 = 0 then update set *
   
   The input `(1, 'a1')` will be filtered out by the match condition 
`id % 2 = 0`. In our implementation, we push all the conditions and update 
expressions into the `ExpressionPayload`, so the payload must be able to 
filter records. Currently, returning `Option.empty` means "DELETE" for the 
record, so I added a special record, `IgnoreRecord`, to represent a filtered 
record.
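
   The three outcomes described above (write, delete, skip) can be sketched in Java with a sentinel object. This is a simplified illustration under stated assumptions: `IgnoreSentinelSketch`, `Action`, `evaluate` and `decide` are hypothetical names, `java.util.Optional` stands in for Hudi's `Option`, and a plain `Object` stands in for the `IgnoreRecord` class.

```java
import java.util.Optional;

// Hypothetical illustration only; not the PR's ExpressionPayload or merge handle.
final class IgnoreSentinelSketch {

  // Sentinel compared by identity, so it can never collide with real data.
  static final Object IGNORE_RECORD = new Object();

  enum Action { WRITE, DELETE, SKIP }

  // Stand-in for the payload: "when matched and id % 2 = 0 then update set *".
  static Optional<Object> evaluate(int id, String name) {
    if (id % 2 == 0) {
      return Optional.<Object>of(name); // condition holds: emit the updated value
    }
    return Optional.of(IGNORE_RECORD);  // condition fails: ask the writer to skip
  }

  // Stand-in for the writer: an empty result means delete, the sentinel means skip.
  static Action decide(Optional<Object> result) {
    if (!result.isPresent()) {
      return Action.DELETE;
    }
    return result.get() == IGNORE_RECORD ? Action.SKIP : Action.WRITE;
  }

  public static void main(String[] args) {
    System.out.println(decide(evaluate(1, "a1")));
    System.out.println(decide(evaluate(2, "a2")));
    System.out.println(decide(Optional.empty()));
  }
}
```

   Without the sentinel, the payload would have only two signals, a value or an empty result, and a filtered record would be indistinguishable from a delete.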
   



