[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!
chandu-1101 commented on issue #9141: URL: https://github.com/apache/hudi/issues/9141#issuecomment-1643139279

The fix is to change

```
"addressLines": [null],
```

to

```
"addressLines": [""],
```

in the source JSON.

Code to reproduce the issue:

```
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.commons.lang3.ClassUtils.getCanonicalName
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieFileFormat, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import java.util
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, hash, lit}
import org.apache.hudi.QuickstartUtils._

// json1 holds the JSON record shown below
val df1 = spark.read.json(Seq(json1).toDS)

df1.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
  .option(HoodieWriteConfig.TABLE_NAME, "GE11")
  .mode(SaveMode.Overwrite)
  .save("s3://bucket/snapshots-hudi/ge11-drop/snapshot1")
```

The record that doesn't work:

```
{
  "_id": { "oid": "1" },
  "cdc_pk": "45",
  "addressLogs": [{
    "createdDate": "2021-09-06T17:17:41.576Z",
    "fieldId": "eb4b6bd9-1fc0-4d38-b2d4-4cba87bb65a4",
    "isDerived": false,
    "location": "hyderabad (HC) PL",
    "original": { "location": "hyderabad (HC) PL" },
    "source": "p2",
    "standardized": false,
    "updatedDate": "2023-06-29T20:44:26.788Z"
  }, {
    "addressLines": [null],
    "city": "hyderabad",
    "country": "Srilanka",
    "createdDate": "2023-06-29T20:44:26.788Z",
    "fieldId": "1beefa35-7d08-4ca7-9fe1-88e59abb4c89",
    "isDerived": false,
    "location": "hyderabad, Srilanka",
    "original": {
      "city": "hyderabad",
      "country": "Srilanka",
      "location": "hyderabad, Srilanka"
    },
    "residentialType": "HOME",
    "source": "p2",
    "standardized": false,
    "updatedDate": "2023-06-29T20:44:26.788Z"
  }, {
    "addressLines": ["raddison,Business Park,cly B-"],
    "city": "hyderabad",
    "country": "Srilanka",
    "createdDate": "2023-06-29T20:44:26.788Z",
    "fieldId": "42720793-1920-4a35-9e3e-23f91e00341e",
    "isDerived": false,
    "location": "hyderabad, TN, Srilanka",
    "original": {
      "city": "hyderabad",
      "country": "Srilanka",
      "location": "hyderabad, TN, Srilanka",
      "state": "TN"
    },
    "residentialType": "WORK",
    "source": "p2",
    "standardized": false,
    "state": "TN",
    "updatedDate": "2023-06-29T20:44:26.788Z",
    "zipCode": "02-583"
  }, {
    "addressLines": [null],
    "city": "hyderabad",
    "country": "Srilanka",
    "createdDate": "2023-06-28T19:48:31.948Z",
    "fieldId": "7b56cdae-fbbc-4dd4-996e-6214b590db4a",
    "isDerived": false,
    "location": "hyderabad, Srilanka",
    "original": {
      "city": "hyderabad",
      "country": "Srilanka",
      "location": "hyderabad, Srilanka"
    },
    "residentialType": "HOME",
    "source": "p2",
    "standardized": false,
    "updatedDate": "2023-06-28T19:48:31.948Z"
  }, {
    "addressLines": ["raddison,Business Park,cly B-"],
    "city": "hyderabad",
    "country": "Srilanka",
    "createdDate": "2023-06-28T19:48:31.948Z",
    "fieldId": "27e67381-c688-4879-a0a4-319ae051dca8",
    "isDerived"
```
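If editing the source JSON is not practical, the same fix can also be applied on the DataFrame before writing. This is only a minimal sketch (not from the original comment), shown for a hypothetical top-level `addressLines: array<string>` column; the nested `addressLogs` case needs the recursive approach sketched further down this thread.

```
import org.apache.spark.sql.functions.expr

// Replace null elements inside an array<string> column with "" before writing to Hudi.
val cleanedDf = df1.withColumn(
  "addressLines",
  expr("transform(addressLines, x -> coalesce(x, ''))")
)
```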
[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!
chandu-1101 commented on issue #9141: URL: https://github.com/apache/hudi/issues/9141#issuecomment-1641941681

Hi, I found the root cause. My data has nested JSON arrays containing null values, and when that is the case the write to Hudi fails. So, given that I have to merge some 500 collections, I now have to write code that recursively traverses every cell of the DataFrame and removes the nulls or substitutes empty/default values, which is painful.
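A minimal sketch of such a recursive pass (an editor's illustration, not code from this issue, assuming the Spark 3.x Column API): walk the DataFrame schema and rebuild every `array<string>` so that null elements become empty strings, recursing through structs and arrays of structs such as `addressLogs`. One caveat: rebuilding a struct this way turns a null struct element into a struct of null fields. Paste the definitions as one block, since the two helpers call each other.

```
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{coalesce, col, lit, struct, transform}
import org.apache.spark.sql.types._

// Rebuild a column so that null elements of any nested array<string> become "".
def fillNullArrayElements(c: Column, dt: DataType): Column = dt match {
  case ArrayType(StringType, _)     => transform(c, x => coalesce(x, lit(""))) // [null] -> [""]
  case ArrayType(st: StructType, _) => transform(c, x => rebuildStruct(x, st)) // e.g. addressLogs
  case st: StructType               => rebuildStruct(c, st)
  case _                            => c
}

// Recreate a struct field by field, applying the fix to each nested field.
def rebuildStruct(c: Column, st: StructType): Column =
  struct(st.fields.map { f =>
    fillNullArrayElements(c.getField(f.name), f.dataType).as(f.name)
  }: _*)

// Apply the fix to every top-level column of a DataFrame.
def fillNulls(df: DataFrame): DataFrame =
  df.select(df.schema.fields.map { f =>
    fillNullArrayElements(col(f.name), f.dataType).as(f.name)
  }: _*)
```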
[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!
chandu-1101 commented on issue #9141: URL: https://github.com/apache/hudi/issues/9141#issuecomment-1639955458

To eliminate the column case-sensitivity issue I renamed all columns to static strings with indexes, except for `_id.oid`, `cdc_pk`, and `addressLogs`. I also printed the schema of `addressLogs` and made sure none of the columns repeat. Yet I get the same exception again.

NOTE: I am running on AWS EMR with only Ganglia and Spark selected (no Hive/Glue/Hudi selected).

addressLogs column schema

```
 |-- addressLogs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- addressLines: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- createdDate: string (nullable = true)
 |    |    |-- fieldId: string (nullable = true)
 |    |    |-- isDerived: boolean (nullable = true)
 |    |    |-- latLong: string (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- locationIp: struct (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- continentCode: string (nullable = true)
 |    |    |    |-- continentName: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- countryIsoCode: string (nullable = true)
 |    |    |    |-- latitude: string (nullable = true)
 |    |    |    |-- longitude: string (nullable = true)
 |    |    |    |-- postalCode: string (nullable = true)
 |    |    |    |-- registeredCountry: string (nullable = true)
 |    |    |    |-- registeredCountryIsoCode: string (nullable = true)
 |    |    |    |-- subDivisions: string (nullable = true)
 |    |    |    |-- subDivisionsIsoCode: string (nullable = true)
 |    |    |    |-- timeZone: string (nullable = true)
 |    |    |-- original: struct (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- location: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |-- residentialType: string (nullable = true)
 |    |    |-- source: string (nullable = true)
 |    |    |-- sourceType: string (nullable = true)
 |    |    |-- standardized: boolean (nullable = true)
 |    |    |-- standardizedDate: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- updatedDate: string (nullable = true)
 |    |    |-- zipCode: string (nullable = true)
```

final code

```
import org.apache.spark.sql.{Column, DataFrame}
import com.phenom.messagingv2.common.Application
import com.phenom.messagingv2.utils.SparkUtils
import org.apache.commons.lang3.ClassUtils.getCanonicalName
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieFileFormat, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import java.util
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, hash, lit}
import org.apache.hudi.QuickstartUtils._

// Rename every column to _index_<i>, except the key, precombine and addressLogs columns.
def renameColumnsWithIndex(df: DataFrame): DataFrame = {
  var df2 = df
  val newColumnNames = (0 to df.columns.length - 1).map(i => s"_index_$i")
  println(newColumnNames)
  df.columns.zip(newColumnNames).foreach { case (oldName, newName) =>
    if ("cdc_pk".equals(oldName) || oldName.contains("_id") ||
        oldName.contains("oid") || oldName.contains("addressLogs")) {
      // do nothing
    } else {
      println(oldName + " -> " + newName)
      df2 = df2.withColumnRenamed(oldName, newName)
    }
  }
  df2
}

val sess = spark
val snapshotDf = sess.read.parquet("s3://bucket/snapshots-test/ge11-drop/")
snapshotDf.cache()
snapshotDf.registerTempTable("snapshot")

// 472 is the culprit
val snapshotDf2 = renameColumnsWithIndex(sess.sql("select * from snapshot order by _id.oid limit 472"))
snapshotDf2.registerTempTable("snapshot2")
// val snapshotDf2 = sess.sql("select _id, cdc_pk, eventId, additionalConfig, additionalFields, additionalRequestInfo, address, addressLogs, alertData, alertUrl
```
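For reference, the same rename pass can be written more compactly as a single foldLeft over the column list (an editor's sketch, not code from the issue; behavior is intended to match `renameColumnsWithIndex` above):

```
import org.apache.spark.sql.DataFrame

// Rename every column to _index_<i>, skipping the record key, precombine and addressLogs columns.
def renameColumnsWithIndexFold(df: DataFrame): DataFrame =
  df.columns.zipWithIndex.foldLeft(df) { case (acc, (oldName, i)) =>
    if (oldName == "cdc_pk" || oldName.contains("_id") ||
        oldName.contains("oid") || oldName.contains("addressLogs")) acc
    else acc.withColumnRenamed(oldName, s"_index_$i")
  }
```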
[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!
chandu-1101 commented on issue #9141: URL: https://github.com/apache/hudi/issues/9141#issuecomment-1639855549

Hi, @ad1happy2go I get the same exception again. Below are the 4 variants I tried.

spark-shell command: with --jars taking my custom jar, --packages referring to the Hudi bundle

```
spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.sql.caseSensitive=true" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --name ravic --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar
```

spark-shell command: with --jars taking my custom jar and the Hudi jar

```
spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.sql.caseSensitive=true" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --name ravic --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar,/home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar
```

spark-shell command: with --jars taking the Hudi jar (no custom jar)

```
spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.sql.caseSensitive=true" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --name ravic --jars /home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar
```

spark-shell command: with only the --packages switch (no custom jar, no Hudi jar)

```
spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.sql.caseSensitive=true" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --name ravic --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1
```

Warnings I get when I run the code (exactly the same code as above):

```
07-18 08:58:36 ${sys:config.appname} WARN HoodieSparkSqlWriter$: hoodie table at s3://bucket/snapshots-hudi/ge11-drop/snapshot already exists. Deleting existing data & overwriting with new data.
07-18 08:58:38 ${sys:config.appname} WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.int96Reba
```
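One thing worth checking with these variants (an editor's note, not from the issue): they alternate between the 0.13.1 bundle pulled via --packages and a locally built 0.12.3 bundle passed via --jars, so it helps to confirm from inside the spark-shell which bundle actually serves the write:

```
// Print the jar that the Hudi DataSource class was loaded from.
println(classOf[org.apache.hudi.DefaultSource]
  .getProtectionDomain.getCodeSource.getLocation)
```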
[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!
chandu-1101 commented on issue #9141: URL: https://github.com/apache/hudi/issues/9141#issuecomment-1632104122

I think I am able to narrow down the issue to an extent.

1. I tried adding columns to the select query one by one and re-ran the insert. At one column things broke.
2. Then I adjusted the `limit X` in the SQL query (code below); after several iterations I find that with `limit 472` things break again. So something weird is happening with the data once the 471st row is crossed. What is it? The JSON schemas of the 471st and 472nd rows look similar (https://www.diffchecker.com/QRbXbzd5/).

What is strange to me is:

1. As of now I am using Spark SQL (no Hudi) to merge the snapshot (Parquet file) with the CDC (JSON files), and for the last 1.5 years this has been working fine, meaning the existing schema is still intact.
2. Then why does it fail with Hudi, with an error?
3. One more: if I change the code to use bulk insert, then this insert works (the code is in my previous posts), but the subsequent merge fails. Why? I don't know.

I am wondering how companies are using Hudi:
- Is CDC merging (merging a CDC DataFrame with a snapshot DataFrame in Spark) the wrong use case for Hudi?
- Or should the data internally follow some pattern for things to work? I have ~5000 DB collections to which the merge has to be run, and I am wondering what can be done if I encounter these exceptions in production.

spark-shell command

```
spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.sql.caseSensitive=true" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --name ravic --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar,/home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar
```

Code

```
import org.apache.commons.lang3.ClassUtils.getCanonicalName
import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieFileFormat, WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import java.util
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{col, hash, lit}
import org.apache.hudi.QuickstartUtils._

val sess = spark
val snapshotDf = sess.read.parquet("s3://p-crm-messaging-v2/snapshots-test/ge11-drop/")
snapshotDf.cache()
snapshotDf.registerTempTable("snapshot")

// 472 is the culprit
val snapshotDf2 = sess.sql("select * from snapshot order by _id.oid limit 472")
snapshotDf2.registerTempTable("snapshot2")

val snapshotDf3 = sess.sql("select _id, cdc_pk, addressLogs from snapshot2")
snapshotDf3.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
  .option(HoodieWriteConfig.TABLE_NAME, "GE11")
  .mode(SaveMode.Overwrite)
  .save("s3://p-crm-messaging-v2/snapshots-hudi/ge11-drop/snapshot")
```
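A small sketch for pinpointing the exact record at the 471/472 boundary (an editor's illustration, not from the issue; it assumes `_id.oid` is a string, as in the sample record shared later in the thread):

```
// Grab the oid of the 472nd row under the same ordering, then dump that record as JSON
// so the offending field (e.g. a null element inside addressLines) becomes visible.
val suspectOid = sess.sql("select _id.oid as oid from snapshot order by _id.oid limit 472")
  .collect()
  .last
  .getString(0)

sess.sql(s"select * from snapshot where _id.oid = '$suspectOid'")
  .toJSON
  .collect()
  .foreach(println)
```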
[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!
chandu-1101 commented on issue #9141: URL: https://github.com/apache/hudi/issues/9141#issuecomment-1631095205

When I change the Scala code to the below (exactly as described in the quick start), the insert itself fails.

```
val sess = Application.spark()
val snapshotDf = sess.read.parquet("s3://bucket/snapshots-test/ge11-drop/")
val cdcSchema1 = SparkUtils.getSchema("s3://bucket/schemas/GE11GLOBAL_candidates-CandidatesList.json")
val cdcDf = sess.read.schema(cdcSchema1).json("s3://bucket/inputs-test/ge11-drop/*")

snapshotDf.createOrReplaceTempView("snapshot")
val snapshotDf2 = snapshotDf.limit(4).withColumn("cdc_pk", lit("0"))

snapshotDf2.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
  .option(HoodieWriteConfig.TABLE_NAME, "GE11")
  .mode(SaveMode.Overwrite)
  .save("s3://bucket/snapshots-hudi/ge11-drop/snapshot")
```

I start the spark-shell as follows

```
spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar" --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED --name ravic --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar,/home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar
```

Exception (I am unable to get the basic insert working)

```
07-11 15:58:17 ${sys:config.appname} WARN DAGScheduler: Broadcasting large task binary with size 1032.2 KiB
07-11 15:58:18 ${sys:config.appname} ERROR HoodieSparkSqlWriter$: UPSERT failed with errors
org.apache.hudi.exception.HoodieException: Write to Hudi failed
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:148)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logic
```
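The driver-side `Write to Hudi failed` wraps the real executor-side error, so it helps to print the full cause chain (an editor's sketch, not code from the issue):

```
import scala.util.control.NonFatal

// Walk and print every nested cause of a failed Hudi write.
def printCauseChain(t: Throwable): Unit = {
  var cur: Throwable = t
  while (cur != null) {
    println(cur.getClass.getName + ": " + cur.getMessage)
    cur = cur.getCause
  }
}

// Usage: wrap the write above.
// try { snapshotDf2.write.format("hudi")/* ...options... */.save(path) }
// catch { case NonFatal(e) => printCauseChain(e); throw e }
```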
[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!
chandu-1101 commented on issue #9141: URL: https://github.com/apache/hudi/issues/9141#issuecomment-1630735240

Hi, after trimming down the data files etc. the issue is still reproducible.

1. The Parquet file has complex data structures; during the bulk insert we insert only 4 rows into the Hudi table.
2. Same with the CDC (schema file below); during the merge the CDC has only 10 rows.
3. I get the error (below) during the merge of the CDC data into the Hudi table.
4. Ran on Spark on EMR.

code

```
val sess = Application.spark()

/* get snapshot df; cdc df */
val snapshotDf = sess.read.parquet("s3://bucket/snapshots-test/ge11-drop/")
val cdcSchema1 = SparkUtils.getSchema("s3://bucket/schemas/ge11-schema.json")
val cdcDf = sess.read.schema(cdcSchema1).json("s3://bucket/inputs-test/ge11-drop/*")
/* done */

/* merge them */
snapshotDf.createOrReplaceTempView("snapshot")
val snapshotDf2 = snapshotDf.limit(4).withColumn("cdc_pk", lit("0"))

snapshotDf2.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.BULK_INSERT.name())
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
  .option(HoodieWriteConfig.TABLE_NAME, "GE11")
  .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.name())
  .option(HoodieBootstrapConfig.BASE_PATH.key(), "s3://bucket/snapshots-hudi/ge11-drop/snapshot")
  .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
  .mode(SaveMode.Overwrite)
  .save("s3://bucket/snapshots-hudi/ge11-drop/snapshot")

cdcDf.createOrReplaceTempView("cdc")
val _cdcDf = sess.sql("select * from cdc where _id.oid is not null and _id.oid != '' limit 10")
_cdcDf.createOrReplaceTempView("_cdc")

_cdcDf.write.format("hudi")
  .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "cdc_pk")
  .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_id.oid")
  .option("hoodie.datasource.write.operation", "upsert")
  .option(TBL_NAME.key(), "GE11")
  .mode(SaveMode.Append)
  .save("s3://bucket/snapshots-hudi/ge11-drop/snapshot") // << ERROR here
/* done */
```

ERROR

```
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1274)
  at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:721)
  at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:350)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.s
```
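Before attempting the upsert, it can also help to read the freshly bulk-inserted table back and compare its schema with the CDC DataFrame's schema (an editor's sketch; a mismatch such as the null-valued nested arrays identified later in the thread tends to surface during this kind of merge):

```
// Read back the table written by the bulk insert and compare schemas with the CDC data.
val hudiDf = sess.read.format("hudi").load("s3://bucket/snapshots-hudi/ge11-drop/snapshot")
println(s"rows in Hudi table after bulk insert: ${hudiDf.count()}")
hudiDf.printSchema()
cdcDf.printSchema()
```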
[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!
chandu-1101 commented on issue #9141: URL: https://github.com/apache/hudi/issues/9141#issuecomment-1630119428

@ad1happy2go thank you for the reply. I will check these and get back.