[GitHub] [hudi] nsivabalan commented on a change in pull request #3328: [HUDI-2208] Support Bulk Insert For Spark Sql

2021-08-08 Thread GitBox


nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r684894126



##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
##
@@ -378,6 +379,41 @@ class TestInsertTable extends TestHoodieSqlBase {
   Seq(1, "a1", 10.0, "2021-07-18"),
   Seq(2, "a2", 10.0, "2021-07-18")
 )
+
+// Test bulk insert for multi-level partition
+val tableMultiPartition = generateTableName
+spark.sql(
+  s"""
+ |create table $tableMultiPartition (
+ |  id int,
+ |  name string,
+ |  price double,
+ |  dt string,
+ |  hh string
+ |) using hudi
+ | options (
+ |  type = '$tableType'
+ | )
+ | partitioned by (dt, hh)
+ | location '${tmp.getCanonicalPath}/$tableMultiPartition'
+   """.stripMargin)
+
+// Enable the bulk insert
+spark.sql("set hoodie.sql.bulk.insert.enable = true")
+spark.sql(s"insert into $tableMultiPartition values(1, 'a1', 10, 
'2021-07-18', '12')")
+
+checkAnswer(s"select id, name, price, dt, hh from 
$tableMultiPartition")(

Review comment:
   Let's verify the meta fields as well, as suggested in the other patch.
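   A minimal sketch of what such a check could look like (my illustration, not code from this patch), assuming the standard Hudi meta column names and the `spark` session and `$tableMultiPartition` table from the test above:
   ```scala
   // Hedged sketch: assert the Hudi meta fields are populated after the bulk insert.
   // _hoodie_commit_time, _hoodie_record_key and _hoodie_partition_path are standard
   // Hudi meta columns; exact key/path values are deliberately not asserted here.
   val metaFields = spark.sql(
     s"""select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path
        |from $tableMultiPartition""".stripMargin)
   assert(metaFields.count() > 0)
   assert(metaFields.filter(
     "_hoodie_commit_time is null or _hoodie_record_key is null or _hoodie_partition_path is null"
   ).count() == 0)
   ```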




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [hudi] nsivabalan commented on a change in pull request #3328: [HUDI-2208] Support Bulk Insert For Spark Sql

2021-08-08 Thread GitBox


nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r684803078



##
File path: 
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
##
@@ -303,5 +304,184 @@ class TestInsertTable extends TestHoodieSqlBase {
   "assertion failed: Required select columns count: 4, Current select 
columns(including static partition column)" +
 " count: 3,columns: (1,a1,10)"
 )
+spark.sql("set hoodie.sql.bulk.insert.enable = true")
+spark.sql("set hoodie.sql.insert.mode= strict")
+
+val tableName2 = generateTableName

Review comment:
   Can we also enhance the test with both types of partitions (single-level and multi-level)?








[GitHub] [hudi] nsivabalan commented on a change in pull request #3328: [HUDI-2208] Support Bulk Insert For Spark Sql

2021-08-05 Thread GitBox


nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r683843426



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
   .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
   .toBoolean
 
-val operation = if (isOverwrite) {
-  if (table.partitionColumnNames.nonEmpty) {
-INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-  } else {
-INSERT_OPERATION_OPT_VAL
+val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+  DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+val isPartitionedTable = table.partitionColumnNames.nonEmpty
+val isPrimaryKeyTable = primaryColumns.nonEmpty
+val operation =
+  (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+case (true, true, _, _) =>
+  throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+case (_, true, true, _) if isPartitionedTable =>
+  throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+case (_, true, _, true) =>
+  throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+// if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
+// insert overwrite partition
+case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
+// insert overwrite table
+case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+// if the table has primaryKey and the dropDuplicate has disable, use the upsert operation
+case (true, false, false, false) => UPSERT_OPERATION_OPT_VAL
+// if enableBulkInsert is true and the table is non-primaryKeyed, use the bulk insert operation
+case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+// for the rest case, use the insert operation
+case (_, _, _, _) => INSERT_OPERATION_OPT_VAL

Review comment:
   I went through every case here and have 2 suggestions; the rest of the cases look good. You don't need to consider my proposal above, but I would like you to consider the feedback below.
   1.
   ```
   case (true, true, _, _) if !isNonStrictMode => throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in strict mode.")
   ```
   Can we enable preCombine here and proceed with the BULK_INSERT operation? Within Hudi, we can do the preCombine/dedup. As we agreed on using bulk_insert as the default for CTAS, this will be a very common use case (a sketch of this relaxation follows after this comment).
   
   2.
   ```
   case (_, true, true, _) if isPartitionedTable =>
 throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
   ```
   Since we agreed on enabling bulk_insert as the default for CTAS, this will be a very common use case as well. Can you help me understand why we fail this call? Why can't we let it proceed? This is basically CTAS for a partitioned table.
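   A minimal sketch of what suggestion 1 could look like (my illustration only, not the PR's code), assuming an `isNonStrictMode` flag is available and that Hudi's combine-before-insert config is what drives the dedup:
   ```scala
   // Sketch: only reject bulk insert for primary-key tables in strict mode; otherwise
   // proceed with bulk_insert and ask Hudi to preCombine/dedup via combine-before-insert.
   def resolvePrimaryKeyBulkInsert(isNonStrictMode: Boolean): (String, Map[String, String]) =
     if (isNonStrictMode) {
       ("bulk_insert", Map("hoodie.combine.before.insert" -> "true"))
     } else {
       throw new IllegalArgumentException(
         "Table with primaryKey can not use bulk insert in strict mode.")
     }
   ```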
   
   

##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##
@@ -159,7 +159,10 @@ object HoodieSparkSqlWriter {
 
   // Convert to RDD[HoodieRecord]
   val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
-  val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean || operation.equals(WriteOperationType.UPSERT);
+  val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
+operation.equals(WriteOperationType.UPSERT) ||
+parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),

Review comment:
   my bad. I get it now. 
   If InsertDropDups is set, we automatically set combine.before.insert. but if 
a user has set just "combine.before.insert", we need to do PreCombine here. 
   But I am not sure why this wasn't reported by anyone until now. 








[GitHub] [hudi] nsivabalan commented on a change in pull request #3328: [HUDI-2208] Support Bulk Insert For Spark Sql

2021-08-03 Thread GitBox


nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r682137257



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
   .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
   .toBoolean
 
-val operation = if (isOverwrite) {
-  if (table.partitionColumnNames.nonEmpty) {
-INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-  } else {
-INSERT_OPERATION_OPT_VAL
+val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+  DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+val isPartitionedTable = table.partitionColumnNames.nonEmpty
+val isPrimaryKeyTable = primaryColumns.nonEmpty
+val operation =
+  (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+case (true, true, _, _) =>
+  throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+case (_, true, true, _) if isPartitionedTable =>
+  throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+case (_, true, _, true) =>
+  throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+// if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
+// insert overwrite partition
+case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
+// insert overwrite table
+case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+// if the table has primaryKey and the dropDuplicate has disable, use the upsert operation
+case (true, false, false, false) => UPSERT_OPERATION_OPT_VAL
+// if enableBulkInsert is true and the table is non-primaryKeyed, use the bulk insert operation
+case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+// for the rest case, use the insert operation
+case (_, _, _, _) => INSERT_OPERATION_OPT_VAL

Review comment:
   Actually, I came across [INSERT OVERWRITE DIRECTORY](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-overwrite-directory.html), which can be mapped to insert_overwrite.
   
   Here is a suggestion w/o using any additional configs: 
   CTAS -> bulk_insert 
   Insert into -> insert
   INSERT OVERWRITE -> insert overwrite table
   INSERT OVERWRITE DIRECTORY -> insert overwrite (partitions)
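   As an illustration of that mapping (my sketch, not part of the PR), with hypothetical statement names and the operation values spelled out as plain strings:
   ```scala
   // Hedged sketch of the suggested statement-to-operation mapping, with no extra configs.
   sealed trait SqlWriteStatement
   case object Ctas extends SqlWriteStatement
   case object InsertInto extends SqlWriteStatement
   case object InsertOverwrite extends SqlWriteStatement
   case object InsertOverwriteDirectory extends SqlWriteStatement
   
   def suggestedOperation(stmt: SqlWriteStatement): String = stmt match {
     case Ctas                     => "bulk_insert"
     case InsertInto               => "insert"
     case InsertOverwrite          => "insert_overwrite_table"
     case InsertOverwriteDirectory => "insert_overwrite" // overwrite only the touched partitions
   }
   ```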
   
   
   
   








[GitHub] [hudi] nsivabalan commented on a change in pull request #3328: [HUDI-2208] Support Bulk Insert For Spark Sql

2021-08-03 Thread GitBox


nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r682101452



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
   .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
   .toBoolean
 
-val operation = if (isOverwrite) {
-  if (table.partitionColumnNames.nonEmpty) {
-INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-  } else {
-INSERT_OPERATION_OPT_VAL
+val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+  DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+val isPartitionedTable = table.partitionColumnNames.nonEmpty
+val isPrimaryKeyTable = primaryColumns.nonEmpty
+val operation =
+  (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+case (true, true, _, _) =>
+  throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+case (_, true, true, _) if isPartitionedTable =>
+  throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+case (_, true, _, true) =>
+  throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+// if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
+// insert overwrite partition
+case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
+// insert overwrite table
+case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+// if the table has primaryKey and the dropDuplicate has disable, use the upsert operation
+case (true, false, false, false) => UPSERT_OPERATION_OPT_VAL
+// if enableBulkInsert is true and the table is non-primaryKeyed, use the bulk insert operation
+case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+// for the rest case, use the insert operation
+case (_, _, _, _) => INSERT_OPERATION_OPT_VAL

Review comment:
   @vinothchandar: do check this out before reviewing the other feedback.








[GitHub] [hudi] nsivabalan commented on a change in pull request #3328: [HUDI-2208] Support Bulk Insert For Spark Sql

2021-08-02 Thread GitBox


nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r680197209



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
   .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
   .toBoolean
 
-val operation = if (isOverwrite) {
-  if (table.partitionColumnNames.nonEmpty) {
-INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-  } else {
-INSERT_OPERATION_OPT_VAL
+val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+  DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+val isPartitionedTable = table.partitionColumnNames.nonEmpty
+val isPrimaryKeyTable = primaryColumns.nonEmpty
+val operation =
+  (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+case (true, true, _, _) =>
+  throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")

Review comment:
   Anyways, we can call out that it is the user's responsibility to ensure uniqueness. Also, IIUC, Hudi can handle duplicates: in case of updates, both records will be updated. But bulk_insert is very performant compared to a regular insert, especially with the row writer, so we should not keep it too restrictive to use. I know from the community messages that a lot of users leverage bulk_insert. I would vote to relax this constraint.








[GitHub] [hudi] nsivabalan commented on a change in pull request #3328: [HUDI-2208] Support Bulk Insert For Spark Sql

2021-07-30 Thread GitBox


nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r680212711



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
   .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
   .toBoolean
 
-val operation = if (isOverwrite) {
-  if (table.partitionColumnNames.nonEmpty) {
-INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-  } else {
-INSERT_OPERATION_OPT_VAL
+val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+  DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+val isPartitionedTable = table.partitionColumnNames.nonEmpty
+val isPrimaryKeyTable = primaryColumns.nonEmpty
+val operation =
+  (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+case (true, true, _, _) =>
+  throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+case (_, true, true, _) if isPartitionedTable =>
+  throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+case (_, true, _, true) =>
+  throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+// if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
+// insert overwrite partition
+case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
+// insert overwrite table
+case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+// if the table has primaryKey and the dropDuplicate has disable, use the upsert operation
+case (true, false, false, false) => UPSERT_OPERATION_OPT_VAL
+// if enableBulkInsert is true and the table is non-primaryKeyed, use the bulk insert operation
+case (false, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+// for the rest case, use the insert operation
+case (_, _, _, _) => INSERT_OPERATION_OPT_VAL

Review comment:
   Here is my thought on choosing the right operation. Having too many case statements might complicate things and is error-prone too. As I mentioned earlier, we should try to do any valid conversions in HoodieSparkSqlWriter; only what is applicable just to SQL DML should be kept here.
   Anyways, here is one simplified approach, ignoring the primary-key vs. non-primary-key table distinction for now; we can come back to that later once we have consensus on this.
   
   We need just two configs:
   hoodie.sql.enable.bulk_insert (default false)
   hoodie.sql.overwrite.entire.table (default true)
   
   From the SQL syntax, there are two commands allowed, "INSERT INTO" and "INSERT OVERWRITE", and these need to map to 4 operations on the Hudi end (insert, bulk_insert, insert_overwrite and insert_overwrite_table):
   
   "INSERT" with no other configs set -> insert operation
   "INSERT" with enable bulk insert set -> bulk_insert
   "INSERT OVERWRITE" with no other configs set -> insert_overwrite_table operation
   "INSERT OVERWRITE" with hoodie.sql.overwrite.entire.table = false -> insert_overwrite operation
   "INSERT OVERWRITE" with enable bulk_insert set -> bulk_insert; pass the right save mode to HoodieSparkSqlWriter
   "INSERT OVERWRITE" with enable bulk_insert set and hoodie.sql.overwrite.entire.table = false -> bulk_insert; pass the right save mode to HoodieSparkSqlWriter
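   A minimal sketch of that decision table (my illustration only; the two config names are the ones proposed above and do not exist in Hudi yet):
   ```scala
   // Hedged sketch of the two-config proposal, returning the Hudi operation value.
   // enableBulkInsert    <- hoodie.sql.enable.bulk_insert (proposed, default false)
   // overwriteWholeTable <- hoodie.sql.overwrite.entire.table (proposed, default true)
   def suggestedOperation(isInsertOverwrite: Boolean,
                          enableBulkInsert: Boolean,
                          overwriteWholeTable: Boolean): String =
     (isInsertOverwrite, enableBulkInsert, overwriteWholeTable) match {
       case (false, false, _)    => "insert"
       case (false, true, _)     => "bulk_insert"
       // for INSERT OVERWRITE + bulk insert, the overwrite semantics come from the save mode
       case (true, true, _)      => "bulk_insert"
       case (true, false, true)  => "insert_overwrite_table"
       case (true, false, false) => "insert_overwrite"
     }
   ```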
   
   
   








[GitHub] [hudi] nsivabalan commented on a change in pull request #3328: [HUDI-2208] Support Bulk Insert For Spark Sql

2021-07-30 Thread GitBox


nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r680186620



##
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
##
@@ -248,6 +248,14 @@ object DataSourceWriteOptions {
 .withDocumentation("When set to true, will perform write operations 
directly using the spark native " +
   "`Row` representation, avoiding any additional conversion costs.")
 
+  /**
+   * Enable the bulk insert for sql insert statement.
+   */
+  val SQL_ENABLE_BULK_INSERT:ConfigProperty[String] = ConfigProperty

Review comment:
   @vinothchandar: In SQL, we don't have two separate commands like INSERT INTO and BULK_INSERT INTO, so I guess we are going this route. But by default CTAS chooses the INSERT operation. I am thinking users may not use bulk_insert at all, since they have to set the property explicitly. Any thoughts?
   There are two things to discuss:
   1. Which operation to use with CTAS.
   2. Which operation to use with INSERT INTO.
   The state as of now is "insert", and the user has to explicitly set the operation type to bulk_insert before calling either of these commands.
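   For context, the current usage would look roughly like this (my sketch; the session property name is taken from the tests in this PR, and the table name is illustrative):
   ```scala
   // Hedged sketch: bulk insert is only used when the property is set explicitly first.
   spark.sql("set hoodie.sql.bulk.insert.enable = true")
   spark.sql(s"insert into $tableName values(1, 'a1', 10, '2021-07-18')")
   // Without the SET above, the same INSERT INTO (and CTAS) falls back to the insert operation.
   ```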

##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
   .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
   .toBoolean
 
-val operation = if (isOverwrite) {
-  if (table.partitionColumnNames.nonEmpty) {
-INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-  } else {
-INSERT_OPERATION_OPT_VAL
+val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+  DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+val isPartitionedTable = table.partitionColumnNames.nonEmpty
+val isPrimaryKeyTable = primaryColumns.nonEmpty
+val operation =
+  (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+case (true, true, _, _) =>
+  throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+case (_, true, true, _) if isPartitionedTable =>
+  throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+case (_, true, _, true) =>
+  throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+// if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
+// insert overwrite partition
+case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
+// insert overwrite table
+case (_, _, true, _) if !isPartitionedTable => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL

Review comment:
   HoodieSparkSqlWriter will handle this save mode. 

##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
   .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
   .toBoolean
 
-val operation = if (isOverwrite) {
-  if (table.partitionColumnNames.nonEmpty) {
-INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-  } else {
-INSERT_OPERATION_OPT_VAL
+val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+  DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+val isPartitionedTable = table.partitionColumnNames.nonEmpty
+val isPrimaryKeyTable = primaryColumns.nonEmpty
+val operation =
+  (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+case (true, true, _, _) =>
+  throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+case (_, true, true, _) if isPartitionedTable =>
+  throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+case (_, true, _, true) =>
+  throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+// if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL
+// insert overwrite partition
+case (_, _, true, _) if isPartitionedTable => INSERT_OVERWRITE_OPERATION_OPT_VAL
+// insert overwrite table
+case (_, _, true, _) if !isPartitionedTable =>
[GitHub] [hudi] nsivabalan commented on a change in pull request #3328: [HUDI-2208] Support Bulk Insert For Spark Sql

2021-07-29 Thread GitBox


nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r679153080



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
   .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
   .toBoolean
 
-val operation = if (isOverwrite) {
-  if (table.partitionColumnNames.nonEmpty) {
-INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-  } else {
-INSERT_OPERATION_OPT_VAL
+val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+  DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+val isPartitionedTable = table.partitionColumnNames.nonEmpty
+val isPrimaryKeyTable = primaryColumns.nonEmpty
+val operation =
+  (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+case (true, true, _, _) =>
+  throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+case (_, true, true, _) if isPartitionedTable =>
+  throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+case (_, true, _, true) =>
+  throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+// if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL

Review comment:
   Also, if there are any valid optimizations, then probably we should move them to HoodieSparkSqlWriter so that both the Spark datasource path and SQL DML benefit :)








[GitHub] [hudi] nsivabalan commented on a change in pull request #3328: [HUDI-2208] Support Bulk Insert For Spark Sql

2021-07-27 Thread GitBox


nsivabalan commented on a change in pull request #3328:
URL: https://github.com/apache/hudi/pull/3328#discussion_r677405475



##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
   .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
   .toBoolean
 
-val operation = if (isOverwrite) {
-  if (table.partitionColumnNames.nonEmpty) {
-INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-  } else {
-INSERT_OPERATION_OPT_VAL
+val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+  DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+val isPartitionedTable = table.partitionColumnNames.nonEmpty
+val isPrimaryKeyTable = primaryColumns.nonEmpty
+val operation =
+  (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+case (true, true, _, _) =>
+  throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")

Review comment:
   May I know why we have this constraint?

##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##
@@ -159,7 +159,10 @@ object HoodieSparkSqlWriter {
 
   // Convert to RDD[HoodieRecord]
   val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, schema, structName, nameSpace)
-  val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean || operation.equals(WriteOperationType.UPSERT);
+  val shouldCombine = parameters(INSERT_DROP_DUPS_OPT_KEY.key()).toBoolean ||
+operation.equals(WriteOperationType.UPSERT) ||
+parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT_PROP.key(),

Review comment:
   Sorry, I don't get you. Precombine is just one field as is, right? Not sure what you mean by "not compute the preCombine field value"; can you throw some more light on it, please?
   In general, for inserts we don't do any preCombine. But if this config (COMBINE_BEFORE_INSERT_PROP) is enabled, we need to do the preCombine.

##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##
@@ -243,6 +256,8 @@ object InsertIntoHoodieTableCommand {
 RECORDKEY_FIELD_OPT_KEY.key -> primaryColumns.mkString(","),
 PARTITIONPATH_FIELD_OPT_KEY.key -> partitionFields,
 PAYLOAD_CLASS_OPT_KEY.key -> payloadClassName,
+ENABLE_ROW_WRITER_OPT_KEY.key -> enableBulkInsert.toString,

Review comment:
   You can add one, but make the default true.

##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##
@@ -209,19 +209,32 @@ object InsertIntoHoodieTableCommand {
   .getOrElse(INSERT_DROP_DUPS_OPT_KEY.defaultValue)
   .toBoolean
 
-val operation = if (isOverwrite) {
-  if (table.partitionColumnNames.nonEmpty) {
-INSERT_OVERWRITE_OPERATION_OPT_VAL  // overwrite partition
-  } else {
-INSERT_OPERATION_OPT_VAL
+val enableBulkInsert = parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+  DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
+val isPartitionedTable = table.partitionColumnNames.nonEmpty
+val isPrimaryKeyTable = primaryColumns.nonEmpty
+val operation =
+  (isPrimaryKeyTable, enableBulkInsert, isOverwrite, dropDuplicate) match {
+case (true, true, _, _) =>
+  throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert.")
+case (_, true, true, _) if isPartitionedTable =>
+  throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
+case (_, true, _, true) =>
+  throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
+s" Please disable $INSERT_DROP_DUPS_OPT_KEY and try again.")
+// if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
+case (_, true, true, _) if !isPartitionedTable => BULK_INSERT_OPERATION_OPT_VAL

Review comment:
   I am just trying to understand the SQL DML here. We already handle save modes within HoodieSparkSqlWriter, so I am trying to understand what is required in addition to that, and to avoid duplication if possible. I mean, for some of the cases listed here, it's just about the Overwrite mode.

##
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##
@@ -159,7 +159,10 @@ object HoodieSparkSqlWriter {
 
   // Convert to RDD[HoodieRecord]
   val genericRecords: RDD[GenericRecord] =