[GitHub] [hudi] nsivabalan commented on a diff in pull request #8697: [HUDI-5514] Improving usability/performance with out of box default for append only use-cases

2023-08-03 Thread via GitHub


nsivabalan commented on code in PR #8697:
URL: https://github.com/apache/hudi/pull/8697#discussion_r1283304161


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##
@@ -429,6 +416,40 @@ object HoodieSparkSqlWriter {
 }
   }
 
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : Map[String, String]): WriteOperationType = {
+    var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up
+    // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true
+    // Auto-correct the operation to "insert" if OPERATION is set to "upsert" wrongly
+    // or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()).
+    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
+      operation == WriteOperationType.UPSERT) {
+
+      log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+        s"when $INSERT_DROP_DUPS is set to be true, " +
+        s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
+
+      operation = WriteOperationType.INSERT
+      operation
+    } else {
+      // if no record key, no preCombine, we should treat it as append only workload
+      // and make bulk_insert as operation type.
+      if (!paramsWithoutDefaults.containsKey(DataSourceWriteOptions.RECORDKEY_FIELD.key())
+        && !paramsWithoutDefaults.containsKey(DataSourceWriteOptions.PRECOMBINE_FIELD.key())
+        && !paramsWithoutDefaults.containsKey(OPERATION.key())) {
+        log.warn(s"Choosing BULK_INSERT as the operation type since auto record key generation is applicable")
+        operation = WriteOperationType.BULK_INSERT
+      }
+      // if no record key is set, will switch the default operation to INSERT (auto record key gen)
+      else if (!hoodieConfig.contains(DataSourceWriteOptions.RECORDKEY_FIELD.key())
+        && !paramsWithoutDefaults.containsKey(OPERATION.key())) {
+        log.warn(s"Choosing INSERT as the operation type since auto record key generation is applicable")
+        operation = WriteOperationType.INSERT

Review Comment:
   hey @xushiyan: I do hear your point. Here we are mainly simplifying things for a user who comes from writing to a parquet table and is moving to a hudi table; that's why we are looking at the mandatory fields like record key and precombine. Can we go ahead and land this for 0.14.0?
   We can continue our discussion on how to simplify further, because very likely if someone is setting file sizing, they know hudi to some extent and are not just trying to replace df.write.parquet with df.write.hudi. We can jot down a few more use-cases and come up with a holistic plan.
   





[GitHub] [hudi] nsivabalan commented on a diff in pull request #8697: [HUDI-5514] Improving usability/performance with out of box default for append only use-cases

2023-08-01 Thread via GitHub


nsivabalan commented on code in PR #8697:
URL: https://github.com/apache/hudi/pull/8697#discussion_r1281307491


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##
@@ -429,6 +416,40 @@ object HoodieSparkSqlWriter {
 }
   }
 
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : Map[String, String]): WriteOperationType = {
+    var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up
+    // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true
+    // Auto-correct the operation to "insert" if OPERATION is set to "upsert" wrongly
+    // or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()).
+    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
+      operation == WriteOperationType.UPSERT) {
+
+      log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+        s"when $INSERT_DROP_DUPS is set to be true, " +
+        s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
+
+      operation = WriteOperationType.INSERT
+      operation
+    } else {
+      // if no record key, no preCombine, we should treat it as append only workload
+      // and make bulk_insert as operation type.
+      if (!paramsWithoutDefaults.containsKey(DataSourceWriteOptions.RECORDKEY_FIELD.key())
+        && !paramsWithoutDefaults.containsKey(DataSourceWriteOptions.PRECOMBINE_FIELD.key())
+        && !paramsWithoutDefaults.containsKey(OPERATION.key())) {
+        log.warn(s"Choosing BULK_INSERT as the operation type since auto record key generation is applicable")
+        operation = WriteOperationType.BULK_INSERT
+      }
+      // if no record key is set, will switch the default operation to INSERT (auto record key gen)
+      else if (!hoodieConfig.contains(DataSourceWriteOptions.RECORDKEY_FIELD.key())
+        && !paramsWithoutDefaults.containsKey(OPERATION.key())) {
+        log.warn(s"Choosing INSERT as the operation type since auto record key generation is applicable")
+        operation = WriteOperationType.INSERT

Review Comment:
   Nope, we don't enforce it. We are in fact adding upsert and delete support for pkless tables here https://github.com/apache/hudi/pull/9261, for the case where the incoming df has meta fields.
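
   A hedged sketch of the kind of flow that PR is aiming at, for context only: a pkless table is read back, and the resulting frame still carries Hudi meta columns such as _hoodie_record_key that can identify records for a later upsert. The paths, app name and data columns below are hypothetical, and the exact semantics belong to #9261, not this diff.

   import org.apache.spark.sql.{SaveMode, SparkSession}
   import org.apache.spark.sql.functions.lit

   val spark = SparkSession.builder().appName("pkless-upsert-sketch").getOrCreate()
   import spark.implicits._

   val basePath = "/tmp/hudi_pkless"   // hypothetical table path

   // Reading the pkless table back returns the Hudi meta columns
   // (e.g. _hoodie_record_key) alongside the data columns.
   val updated = spark.read.format("hudi").load(basePath)
     .filter($"status" === "pending")            // hypothetical data column
     .withColumn("status", lit("processed"))

   // Writing the frame back as an upsert: an incoming df that carries meta
   // fields is the scenario the pkless upsert/delete support targets.
   updated.write.format("hudi")
     .option("hoodie.datasource.write.operation", "upsert")
     .mode(SaveMode.Append)
     .save(basePath)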






[GitHub] [hudi] nsivabalan commented on a diff in pull request #8697: [HUDI-5514] Improving usability/performance with out of box default for append only use-cases

2023-07-20 Thread via GitHub


nsivabalan commented on code in PR #8697:
URL: https://github.com/apache/hudi/pull/8697#discussion_r1270185720


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##
@@ -402,6 +389,40 @@ object HoodieSparkSqlWriter {
 }
   }
 
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : Map[String, String]): WriteOperationType = {
+    var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up
+    // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true
+    // Auto-correct the operation to "insert" if OPERATION is set to "upsert" wrongly
+    // or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()).
+    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
+      operation == WriteOperationType.UPSERT) {
+
+      log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+        s"when $INSERT_DROP_DUPS is set to be true, " +
+        s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
+
+      operation = WriteOperationType.INSERT
+    }
+
+    // if no record key, no preCombine and no explicit partition path is set, we should treat it as append only workload

Review Comment:
   Yeah, had discussions with other hudi engineers as well.
   Here is the agreement:
   when none of record key, operation and preCombine is set, we will make bulk_insert the operation type. In other cases (at least in spark-ds), upsert will be the operation type.
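
   A compact restatement of that agreement as a sketch, with plain booleans standing in for the real config checks in deduceOperation; this illustrates the rule stated above and is not the PR code itself.

   import org.apache.hudi.common.model.WriteOperationType

   // Agreed default: bulk_insert only when none of record key, preCombine and
   // operation are set; otherwise upsert remains the spark-ds default.
   def deducedDefault(recordKeySet: Boolean, preCombineSet: Boolean, operationSet: Boolean): WriteOperationType =
     if (!recordKeySet && !preCombineSet && !operationSet) WriteOperationType.BULK_INSERT
     else WriteOperationType.UPSERT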






[GitHub] [hudi] nsivabalan commented on a diff in pull request #8697: [HUDI-5514] Improving usability/performance with out of box default for append only use-cases

2023-07-09 Thread via GitHub


nsivabalan commented on code in PR #8697:
URL: https://github.com/apache/hudi/pull/8697#discussion_r1257563336


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##
@@ -142,24 +144,27 @@ trait ProvidesHoodieConfig extends Logging {
     //   we'd prefer that value over auto-deduced operation. Otherwise, we deduce target operation type
     val operationOverride = combinedOpts.get(DataSourceWriteOptions.OPERATION.key)
     val operation = operationOverride.getOrElse {
-      (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate, isNonStrictMode, isPartitionedTable) match {
-        case (true, _, _, _, false, _) =>
+      (enableBulkInsert, isOverwritePartition, isOverwriteTable, dropDuplicate, isNonStrictMode, isPartitionedTable,
+        autoGenerateRecordKeys) match {
+        case (true, _, _, _, false, _, _) =>
           throw new IllegalArgumentException(s"Table with primaryKey can not use bulk insert in ${insertMode.value()} mode.")
-        case (true, true, _, _, _, true) =>
+        case (true, true, _, _, _, true, _) =>
           throw new IllegalArgumentException(s"Insert Overwrite Partition can not use bulk insert.")
-        case (true, _, _, true, _, _) =>
+        case (true, _, _, true, _, _, _) =>
           throw new IllegalArgumentException(s"Bulk insert cannot support drop duplication." +
             s" Please disable $INSERT_DROP_DUPS and try again.")
         // if enableBulkInsert is true, use bulk insert for the insert overwrite non-partitioned table.
-        case (true, false, true, _, _, false) => BULK_INSERT_OPERATION_OPT_VAL
+        case (true, false, true, _, _, false, _) => BULK_INSERT_OPERATION_OPT_VAL
         // insert overwrite table
-        case (false, false, true, _, _, _) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
+        case (false, false, true, _, _, _, _) => INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL
         // insert overwrite partition
-        case (_, true, false, _, _, true) => INSERT_OVERWRITE_OPERATION_OPT_VAL
+        case (_, true, false, _, _, true, _) => INSERT_OVERWRITE_OPERATION_OPT_VAL
         // disable dropDuplicate, and provide preCombineKey, use the upsert operation for strict and upsert mode.
-        case (false, false, false, false, false, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
+        case (false, false, false, false, false, _, _) if hasPrecombineColumn => UPSERT_OPERATION_OPT_VAL
         // if table is pk table and has enableBulkInsert use bulk insert for non-strict mode.
-        case (true, _, _, _, true, _) => BULK_INSERT_OPERATION_OPT_VAL
+        case (true, _, _, _, true, _, _) => BULK_INSERT_OPERATION_OPT_VAL
+        // if auto record key generation is enabled, use bulk_insert
+        case (_, _, _, _, _, true, true) => BULK_INSERT_OPERATION_OPT_VAL

Review Comment:
   fixed it. 



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##
@@ -402,6 +389,40 @@ object HoodieSparkSqlWriter {
 }
   }
 
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : Map[String, String]): WriteOperationType = {
+    var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up

Review Comment:
   Nope, I just copied the comments over; not really sure.



##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##
@@ -402,6 +389,40 @@ object HoodieSparkSqlWriter {
 }
   }
 
+  def deduceOperation(hoodieConfig: HoodieConfig, paramsWithoutDefaults : Map[String, String]): WriteOperationType = {
+    var operation = WriteOperationType.fromValue(hoodieConfig.getString(OPERATION))
+    // TODO clean up
+    // It does not make sense to allow upsert() operation if INSERT_DROP_DUPS is true
+    // Auto-correct the operation to "insert" if OPERATION is set to "upsert" wrongly
+    // or not set (in which case it will be set as "upsert" by parametersWithWriteDefaults()).
+    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS) &&
+      operation == WriteOperationType.UPSERT) {
+
+      log.warn(s"$UPSERT_OPERATION_OPT_VAL is not applicable " +
+        s"when $INSERT_DROP_DUPS is set to be true, " +
+        s"overriding the $OPERATION to be $INSERT_OPERATION_OPT_VAL")
+
+      operation = WriteOperationType.INSERT
+    }
+
+    // if no record key, no preCombine and no explicit partition path is set, we should treat it as append only workload

Review Comment:
   We are being conservative here. The major motivation is to target users coming from a parquet table. They would be writing to the parquet table as df.write.format("parquet").save(basePath).
   When they want to use hudi, the table name is mandatory, but everything else is optional.
   
   So, all they need to do is,
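
   A minimal sketch of what that likely amounts to, assuming only the mandatory table name is supplied; the table name, input path and base path below are hypothetical, while hoodie.table.name is the standard Hudi option key and the deduced operation follows the defaults discussed above.

   import org.apache.spark.sql.{SaveMode, SparkSession}

   val spark = SparkSession.builder().appName("parquet-to-hudi-sketch").getOrCreate()
   val basePath = "/tmp/hudi_events"              // hypothetical target path
   val df = spark.read.json("/tmp/input_events")  // hypothetical source data

   // What the user did before: a plain parquet write.
   // df.write.format("parquet").save(basePath)

   // Switching to hudi: only the table name is added. With no record key,
   // preCombine or operation configured, the writer deduces bulk_insert and
   // auto-generates record keys per the defaults introduced in this PR.
   df.write.format("hudi")
     .option("hoodie.table.name", "events")
     .mode(SaveMode.Append)
     .save(basePath)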