Vinoth Govindarajan created HUDI-2357:
-----------------------------------------

             Summary: MERGE INTO doesn't work for tables created using CTAS
                 Key: HUDI-2357
                 URL: https://issues.apache.org/jira/browse/HUDI-2357
             Project: Apache Hudi
          Issue Type: Sub-task
          Components: Spark Integration
            Reporter: Vinoth Govindarajan
            Assignee: pengzhiwei
             Fix For: 0.9.0


The MERGE INTO command doesn't resolve the correct primaryKey for tables created using CTAS, whereas it works for tables created using a plain CREATE TABLE command.

I guess we are hitting this issue because the keygenerator class is set to SqlKeyGenerator for tables created using CTAS (see the hoodie.properties dumps below).
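As a quick check (a diagnostic sketch, assuming the Spark SQL CLI and the two tables defined below), the catalog properties of the plain table and the CTAS table can be compared; based on the error below, the CTAS table appears to carry no primaryKey:
{code:java}
-- Compare what Spark has recorded for each table. primaryKey shows up for
-- h5 (plain CREATE TABLE) but appears to be missing for the CTAS-created
-- table, which is what the merge error below complains about.
show tblproperties h5;
show tblproperties h4;
{code}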
This works:
{code:java}
create table h5 (id bigint, name string, ts bigint) using hudi
options (type = "cow", primaryKey = "id", preCombineField = "ts");

merge into h5 as t0
using (
    select 5 as s_id, 'vinoth' as s_name, current_timestamp() as s_ts
) t1
on t1.s_id = t0.id
when matched then update set * 
when not matched then insert *;
{code}
hoodie.properties for the working use-case:
{code:java}
➜  analytics.db git:(apache_hudi_support) cat h5/.hoodie/hoodie.properties
#Properties saved on Wed Aug 25 04:10:33 UTC 2021
#Wed Aug 25 04:10:33 UTC 2021
hoodie.table.name=h5
hoodie.table.recordkey.fields=id
hoodie.table.type=COPY_ON_WRITE
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=
hoodie.archivelog.folder=archived
hoodie.table.create.schema={"type"\:"record","name"\:"topLevelRecord","fields"\:[{"name"\:"_hoodie_commit_time","type"\:["string","null"]},{"name"\:"_hoodie_commit_seqno","type"\:["string","null"]},{"name"\:"_hoodie_record_key","type"\:["string","null"]},{"name"\:"_hoodie_partition_path","type"\:["string","null"]},{"name"\:"_hoodie_file_name","type"\:["string","null"]},{"name"\:"id","type"\:["long","null"]},{"name"\:"name","type"\:["string","null"]},{"name"\:"ts","type"\:["long","null"]}]}
hoodie.timeline.layout.version=1
hoodie.table.version=1{code}

Whereas this doesn't work:
{code:java}
create table h4
using hudi
options (type = "cow", primaryKey = "id", preCombineField = "ts")
as select 5 as id, cast(rand() as string) as name, current_timestamp() as ts;

merge into h3 as t0
using (
    select '5' as s_id, 'vinoth' as s_name, current_timestamp() as s_ts
) t1
on t1.s_id = t0.id
when matched then update set *
when not matched then insert *;
{code}
The MERGE fails with:
{code:java}
544702 [main] ERROR org.apache.spark.sql.hive.thriftserver.SparkSQLDriver - Failed in [merge into analytics.h3 as t0 using (select '5' as s_id, 'vinoth' as s_name, current_timestamp() as s_ts) t1 on t1.s_id = t0.id when matched then update set * when not matched then insert *]
java.lang.IllegalArgumentException: Merge Key[id] is not Equal to the defined primary key[] in table h3
    at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.buildMergeIntoConfig(MergeIntoHoodieTableCommand.scala:425)
    at org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.run(MergeIntoHoodieTableCommand.scala:147)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
    at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
    at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
    at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:282)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{code}
hoodie.properties for the non-working use-case (CTAS table):
{code:java}
➜  analytics.db git:(apache_hudi_support) cat h4/.hoodie/hoodie.properties
#Properties saved on Wed Aug 25 04:09:37 UTC 2021
#Wed Aug 25 04:09:37 UTC 2021
hoodie.table.name=h4
hoodie.table.recordkey.fields=id
hoodie.table.type=COPY_ON_WRITE
hoodie.table.precombine.field=ts
hoodie.table.partition.fields=
hoodie.archivelog.folder=archived
hoodie.populate.meta.fields=true
hoodie.table.keygenerator.class=org.apache.spark.sql.hudi.command.SqlKeyGenerator
hoodie.table.base.file.format=PARQUET
hoodie.timeline.layout.version=1
hoodie.table.version=1{code}
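Comparing the two dumps, the CTAS table (h4) carries three entries that the plain CREATE TABLE table (h5) does not, including the SqlKeyGenerator mentioned above:
{code:java}
hoodie.populate.meta.fields=true
hoodie.table.keygenerator.class=org.apache.spark.sql.hudi.command.SqlKeyGenerator
hoodie.table.base.file.format=PARQUET
{code}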
This is a blocker for the dbt integration (HUDI-2319): https://issues.apache.org/jira/browse/HUDI-2319

Please try to fix it as part of the 0.9.0 release so that the dbt integration can be unblocked.
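In the meantime, a possible interim workaround (untested, and assuming MERGE INTO resolves primaryKey from the catalog table properties, which the empty primary key[] in the error message suggests) might be to re-declare the key on the CTAS table with standard Spark SQL:
{code:java}
-- Hypothetical, unverified workaround: re-apply the primaryKey/preCombineField
-- table properties on the CTAS table so MERGE INTO can resolve them.
alter table h4 set tblproperties (primaryKey = 'id', preCombineField = 'ts');

-- Then retry the merge against the repaired table:
merge into h4 as t0
using (select 5 as s_id, 'vinoth' as s_name, current_timestamp() as s_ts) t1
on t1.s_id = t0.id
when matched then update set *
when not matched then insert *;
{code}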

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
