[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!

2023-07-19 Thread via GitHub


chandu-1101 commented on issue #9141:
URL: https://github.com/apache/hudi/issues/9141#issuecomment-1643139279

   
   The fix is to change the below 
   
   ```
   "addressLines": [null],
   ```
   to
   
   ```
   "addressLines": [""],
   ```
   in the source JSON. 
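   For reference, the same fix can be applied at the DataFrame level instead of hand-editing the source JSON. A minimal sketch (my own, not from the original post), assuming Spark 3.1+ for `Column.withField` and the `addressLogs`/`addressLines` layout shown in the record further below, applied to the `df1` built in the repro code:
   
   ```
   import org.apache.spark.sql.functions.{coalesce, col, lit, transform}

   // Replace null elements of addressLogs[*].addressLines with empty strings
   // so the Hudi write never sees a null array element.
   val df1Fixed = df1.withColumn(
     "addressLogs",
     transform(col("addressLogs"), log =>
       log.withField(
         "addressLines",
         transform(log.getField("addressLines"), line => coalesce(line, lit("")))
       )
     )
   )
   ```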
   
   Code to reproduce the issue:
   ```
   val df1 = spark.read.json(Seq(json1).toDS)

   import org.apache.spark.sql.{Column, DataFrame}
   import org.apache.commons.lang3.ClassUtils.getCanonicalName
   import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
   import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
   import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieFileFormat, WriteOperationType}
   import org.apache.hudi.common.table.HoodieTableConfig
   import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
   import org.apache.hudi.keygen.constant.KeyGeneratorOptions

   import java.util
   import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
   import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
   import org.apache.spark.sql.SaveMode
   import org.apache.spark.sql.functions.{col, hash, lit}
   import org.apache.hudi.QuickstartUtils._

   df1.write.format("hudi")
     .options(getQuickstartWriteConfigs)
     .option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.NonPartitionedExtractor")
     .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
     .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
     .option(HoodieWriteConfig.TABLE_NAME, "GE11")
     .mode(SaveMode.Overwrite)
     .save("s3://bucket/snapshots-hudi/ge11-drop/snapshot1")
   ```
   
   
   
   
   
   
   The record that doesn't work:
   ```
   {
     "_id": {
       "oid": "1"
     },
     "cdc_pk": "45",
     "addressLogs": [{
       "createdDate": "2021-09-06T17:17:41.576Z",
       "fieldId": "eb4b6bd9-1fc0-4d38-b2d4-4cba87bb65a4",
       "isDerived": false,
       "location": "hyderabad (HC) PL",
       "original": {
         "location": "hyderabad (HC) PL"
       },
       "source": "p2",
       "standardized": false,
       "updatedDate": "2023-06-29T20:44:26.788Z"
     }, {
       "addressLines": [null],
       "city": "hyderabad",
       "country": "Srilanka",
       "createdDate": "2023-06-29T20:44:26.788Z",
       "fieldId": "1beefa35-7d08-4ca7-9fe1-88e59abb4c89",
       "isDerived": false,
       "location": "hyderabad, Srilanka",
       "original": {
         "city": "hyderabad",
         "country": "Srilanka",
         "location": "hyderabad, Srilanka"
       },
       "residentialType": "HOME",
       "source": "p2",
       "standardized": false,
       "updatedDate": "2023-06-29T20:44:26.788Z"
     }, {
       "addressLines": ["raddison,Business Park,cly B-"],
       "city": "hyderabad",
       "country": "Srilanka",
       "createdDate": "2023-06-29T20:44:26.788Z",
       "fieldId": "42720793-1920-4a35-9e3e-23f91e00341e",
       "isDerived": false,
       "location": "hyderabad, TN, Srilanka",
       "original": {
         "city": "hyderabad",
         "country": "Srilanka",
         "location": "hyderabad, TN, Srilanka",
         "state": "TN"
       },
       "residentialType": "WORK",
       "source": "p2",
       "standardized": false,
       "state": "TN",
       "updatedDate": "2023-06-29T20:44:26.788Z",
       "zipCode": "02-583"
     }, {
       "addressLines": [null],
       "city": "hyderabad",
       "country": "Srilanka",
       "createdDate": "2023-06-28T19:48:31.948Z",
       "fieldId": "7b56cdae-fbbc-4dd4-996e-6214b590db4a",
       "isDerived": false,
       "location": "hyderabad, Srilanka",
       "original": {
         "city": "hyderabad",
         "country": "Srilanka",
         "location": "hyderabad, Srilanka"
       },
       "residentialType": "HOME",
       "source": "p2",
       "standardized": false,
       "updatedDate": "2023-06-28T19:48:31.948Z"
     }, {
       "addressLines": ["raddison,Business Park,cly B-"],
       "city": "hyderabad",
       "country": "Srilanka",
       "createdDate": "2023-06-28T19:48:31.948Z",
       "fieldId": "27e67381-c688-4879-a0a4-319ae051dca8",
       "isDerived"
   ```

[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!

2023-07-19 Thread via GitHub


chandu-1101 commented on issue #9141:
URL: https://github.com/apache/hudi/issues/9141#issuecomment-1641941681

   Hi,
   
   I found the root cause. My data had nested JSON arrays containing null
   values, and when that is the case the write to Hudi fails.
   
   So, given that I have to merge some 500 collections, I now have to write
   code that recursively traverses every cell of the data frame and removes
   the nulls or substitutes empty/default values, which is painful.
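   A rough sketch of what that traversal might look like (my own sketch, not tested against Hudi; it assumes Spark 3.x for `org.apache.spark.sql.functions.transform` and handles only arrays and structs, not maps):
   
   ```
   import org.apache.spark.sql.{Column, DataFrame}
   import org.apache.spark.sql.functions.{coalesce, col, lit, struct, transform, when}
   import org.apache.spark.sql.types._

   // Rebuild a column so that every null element inside an array<string>,
   // at any nesting depth, becomes an empty string.
   def fillNullArrayElements(c: Column, dt: DataType): Column = dt match {
     case ArrayType(StringType, _) =>
       transform(c, e => coalesce(e, lit("")))
     case ArrayType(elementType, _) =>
       transform(c, e => fillNullArrayElements(e, elementType))
     case st: StructType =>
       val rebuilt = struct(st.fields.map(f =>
         fillNullArrayElements(c.getField(f.name), f.dataType).alias(f.name)): _*)
       when(c.isNotNull, rebuilt) // keep null structs null
     case _ => c
   }

   def fillAllNullArrayElements(df: DataFrame): DataFrame =
     df.schema.fields.foldLeft(df) { (acc, f) =>
       acc.withColumn(f.name, fillNullArrayElements(col(f.name), f.dataType))
     }
   ```
   
   Calling `fillAllNullArrayElements(snapshotDf)` before the Hudi write is the idea; whether that is cheaper than fixing 500 source collections is another question.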
   
   





[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!

2023-07-18 Thread via GitHub


chandu-1101 commented on issue #9141:
URL: https://github.com/apache/hudi/issues/9141#issuecomment-1639955458

   To eliminate the column case-sensitivity issue I renamed all columns to
   static strings with indexes, except for `_id.oid`, `cdc_pk`, and
   `addressLogs`. I also printed the schema of `addressLogs` and made sure
   none of the column names repeat (a quick schema check is sketched below).
   Yet I get the same exception again.
   
   NOTE: I am running on AWS EMR with only Ganglia and Spark selected
   (no Hive/Glue/Hudi selected).
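   The kind of schema check I mean, as a throwaway sketch (the helper is my own, not from any library): it walks the nested schema and reports any struct level whose field names collide when compared case-insensitively.
   
   ```
   import org.apache.spark.sql.types._

   // List every struct level containing field names that differ only by case,
   // e.g. "addressLines" vs "addresslines".
   def findCaseClashes(dt: DataType, path: String = ""): Seq[String] = dt match {
     case st: StructType =>
       val clashesHere = st.fieldNames
         .groupBy(_.toLowerCase)
         .collect { case (_, names) if names.length > 1 => s"$path{${names.mkString(", ")}}" }
         .toSeq
       clashesHere ++ st.fields.flatMap(f => findCaseClashes(f.dataType, s"$path${f.name}."))
     case ArrayType(elementType, _) => findCaseClashes(elementType, path)
     case MapType(_, valueType, _) => findCaseClashes(valueType, path)
     case _ => Seq.empty
   }

   // findCaseClashes(snapshotDf2.schema).foreach(println)  // empty output = no clashes
   ```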
   
   
   addressLogs column schema:
   ```
    |-- addressLogs: array (nullable = true)
    |    |-- element: struct (containsNull = true)
    |    |    |-- addressLines: array (nullable = true)
    |    |    |    |-- element: string (containsNull = true)
    |    |    |-- city: string (nullable = true)
    |    |    |-- country: string (nullable = true)
    |    |    |-- createdDate: string (nullable = true)
    |    |    |-- fieldId: string (nullable = true)
    |    |    |-- isDerived: boolean (nullable = true)
    |    |    |-- latLong: string (nullable = true)
    |    |    |-- location: string (nullable = true)
    |    |    |-- locationIp: struct (nullable = true)
    |    |    |    |-- city: string (nullable = true)
    |    |    |    |-- continentCode: string (nullable = true)
    |    |    |    |-- continentName: string (nullable = true)
    |    |    |    |-- country: string (nullable = true)
    |    |    |    |-- countryIsoCode: string (nullable = true)
    |    |    |    |-- latitude: string (nullable = true)
    |    |    |    |-- longitude: string (nullable = true)
    |    |    |    |-- postalCode: string (nullable = true)
    |    |    |    |-- registeredCountry: string (nullable = true)
    |    |    |    |-- registeredCountryIsoCode: string (nullable = true)
    |    |    |    |-- subDivisions: string (nullable = true)
    |    |    |    |-- subDivisionsIsoCode: string (nullable = true)
    |    |    |    |-- timeZone: string (nullable = true)
    |    |    |-- original: struct (nullable = true)
    |    |    |    |-- city: string (nullable = true)
    |    |    |    |-- country: string (nullable = true)
    |    |    |    |-- location: string (nullable = true)
    |    |    |    |-- state: string (nullable = true)
    |    |    |-- residentialType: string (nullable = true)
    |    |    |-- source: string (nullable = true)
    |    |    |-- sourceType: string (nullable = true)
    |    |    |-- standardized: boolean (nullable = true)
    |    |    |-- standardizedDate: string (nullable = true)
    |    |    |-- state: string (nullable = true)
    |    |    |-- updatedDate: string (nullable = true)
    |    |    |-- zipCode: string (nullable = true)
   ```
   
   Final code:
   
   ```
   import org.apache.spark.sql.{Column, DataFrame}
   import com.phenom.messagingv2.common.Application
   import com.phenom.messagingv2.utils.SparkUtils
   import org.apache.commons.lang3.ClassUtils.getCanonicalName
   import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
   import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
   import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieFileFormat, WriteOperationType}
   import org.apache.hudi.common.table.HoodieTableConfig
   import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
   import org.apache.hudi.keygen.constant.KeyGeneratorOptions

   import java.util
   import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
   import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
   import org.apache.spark.sql.SaveMode
   import org.apache.spark.sql.functions.{col, hash, lit}
   import org.apache.hudi.QuickstartUtils._

   // Rename every column to a positional placeholder, except the key,
   // precombine and payload columns that the Hudi write needs by name.
   def renameColumnsWithIndex(df: DataFrame): DataFrame = {
     val newColumnNames = df.columns.indices.map(i => s"_index_$i")
     println(newColumnNames)
     df.columns.zip(newColumnNames).foldLeft(df) {
       case (acc, (oldName, newName)) =>
         if ("cdc_pk".equals(oldName) || oldName.contains("_id") ||
             oldName.contains("oid") || oldName.contains("addressLogs")) {
           acc // keep these columns untouched
         } else {
           println(oldName + " -> " + newName)
           acc.withColumnRenamed(oldName, newName)
         }
     }
   }

   val sess = spark
   val snapshotDf = sess.read.parquet("s3://bucket/snapshots-test/ge11-drop/")
   snapshotDf.cache()
   snapshotDf.registerTempTable("snapshot")

   // 472 is the culprit
   val snapshotDf2 = renameColumnsWithIndex(sess.sql("select * from snapshot order by _id.oid limit 472 "))
   snapshotDf2.registerTempTable("snapshot2")
   // val snapshotDf2 = sess.sql("select _id, cdc_pk,eventId, additionalConfig, additionalFields, additionalRequestInfo, address, addressLogs, alertData,alertUrl
   ```

[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!

2023-07-18 Thread via GitHub


chandu-1101 commented on issue #9141:
URL: https://github.com/apache/hudi/issues/9141#issuecomment-1639855549

   Hi,
   
   @ad1happy2go I get the same exception again. Below are the 4 variants I tried.
   
   Spark shell command: with `--jars` taking my custom jar and `--packages` referring to the Hudi bundle:
   
   ```
   spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 \
     --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 \
     --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
     --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
     --conf "spark.sql.caseSensitive=true" \
     --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
     --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
     --name ravic \
     --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 \
     --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar
   ```
   
   Spark shell command: with `--jars` taking my custom jar and the Hudi jar:
   
   ```
   spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 \
     --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 \
     --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
     --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
     --conf "spark.sql.caseSensitive=true" \
     --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
     --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
     --name ravic \
     --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar,/home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar
   ```
   
   
   Spark shell command: with `--jars` taking only the Hudi jar (no custom jar):
   
   ```
   spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 \
     --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 \
     --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
     --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
     --conf "spark.sql.caseSensitive=true" \
     --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
     --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
     --name ravic \
     --jars /home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar
   ```
   
   Spark shell command: with only the `--packages` switch (no custom jar, no Hudi jar):
   
   ```
   spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 \
     --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 \
     --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
     --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
     --conf "spark.sql.caseSensitive=true" \
     --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
     --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
     --name ravic \
     --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1
   ```
   
   
   Warnings I get when I run the code (exactly the same code as above):
   
   ```
   07-18 08:58:36 ${sys:config.appname} WARN HoodieSparkSqlWriter$: hoodie table at s3://bucket/snapshots-hudi/ge11-drop/snapshot already exists. Deleting existing data & overwriting with new data.
   07-18 08:58:38 ${sys:config.appname} WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.int96Reba
   ```

[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!

2023-07-12 Thread via GitHub


chandu-1101 commented on issue #9141:
URL: https://github.com/apache/hudi/issues/9141#issuecomment-1632104122

   I think I am able to narrow down the issue to an extent.
   
   1. I tried adding columns one by one to the select query and re-ran the
   insert. At one column things broke.
   2. Then I adjusted the `limit X` in the SQL query (code below), and after
   several iterations I found that with `limit 472` things break again. So
   something weird is happening with the data once the 471st row is crossed.
   What is it? The JSON schemas of the 471st and 472nd rows look the same to
   me (https://www.diffchecker.com/QRbXbzd5/). A sketch for isolating that
   single row is below.
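   (My own sketch, building on the spark-shell session and imports in the code further below; the `suspect-row` output path is made up, and it assumes the `order by _id.oid` ordering is stable and the columns are comparable so `exceptAll` works.)
   
   ```
   // Keep only the row that appears at limit 472 but not at limit 471,
   // then try writing just that one record to Hudi.
   val first471 = sess.sql("select * from snapshot order by _id.oid limit 471")
   val first472 = sess.sql("select * from snapshot order by _id.oid limit 472")
   val suspect  = first472.exceptAll(first471)

   suspect.write.format("hudi")
     .options(getQuickstartWriteConfigs)
     .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
     .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
     .option(HoodieWriteConfig.TABLE_NAME, "GE11")
     .mode(SaveMode.Overwrite)
     .save("s3://p-crm-messaging-v2/snapshots-hudi/ge11-drop/suspect-row")
   ```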
   
   What is strange to me is:
   1. As of now I am using Spark SQL (no Hudi) to merge the snapshot (Parquet
   file) with the CDC (JSON files), and this has been working for 1.5 years,
   meaning the existing schema is still intact.
   2. Then why does it fail with Hudi, with an error?
   3. One more: if I change the code to use bulk insert, then the insert
   works (the code is in previous posts). But the subsequent merge fails.
   Why? I don't know.
   
   
   - I am wondering how companies are using Hudi,
   - or whether CDC merging (merging a CDC dataframe with a snapshot
   dataframe in Spark) is the wrong use case for Hudi,
   - or whether the data has to follow some internal pattern for things to
   work. I have ~5000 DB collections on which the merge has to run, and I am
   wondering what I can do in production if I keep hitting these exceptions.
   
   
   spark-shell command
   ```
   spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 \
     --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 \
     --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
     --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
     --conf "spark.sql.caseSensitive=true" \
     --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
     --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
     --name ravic \
     --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 \
     --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar,/home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar
   ```
   
   
   Code
   ```
   import org.apache.commons.lang3.ClassUtils.getCanonicalName
   import org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider
   import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
   import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieFileFormat, WriteOperationType}
   import org.apache.hudi.common.table.HoodieTableConfig
   import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieWriteConfig}
   import org.apache.hudi.keygen.constant.KeyGeneratorOptions

   import java.util
   import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
   import org.apache.hudi.keygen.{NonpartitionedKeyGenerator, SimpleKeyGenerator}
   import org.apache.spark.sql.SaveMode
   import org.apache.spark.sql.functions.{col, hash, lit}
   import org.apache.hudi.QuickstartUtils._

   val sess = spark
   val snapshotDf = sess.read.parquet("s3://p-crm-messaging-v2/snapshots-test/ge11-drop/")
   snapshotDf.cache()
   snapshotDf.registerTempTable("snapshot")

   // 472 is the culprit
   val snapshotDf2 = sess.sql("select * from snapshot order by _id.oid limit 472 ")
   snapshotDf2.registerTempTable("snapshot2")
   val snapshotDf3 = sess.sql("select _id, cdc_pk, addressLogs from snapshot2 ")
   snapshotDf3.write.format("hudi")
     .options(getQuickstartWriteConfigs)
     .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
     .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
     .option(HoodieWriteConfig.TABLE_NAME, "GE11")
     .mode(SaveMode.Overwrite)
     .save("s3://p-crm-messaging-v2/snapshots-hudi/ge11-drop/snapshot")
   ```
   
   
   
   
   
   





[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!

2023-07-11 Thread via GitHub


chandu-1101 commented on issue #9141:
URL: https://github.com/apache/hudi/issues/9141#issuecomment-1631095205

   When I change the Scala code to the below (exactly as described in the
   quick start), the insert itself fails.
   
   ```
   val sess = Application.spark()
   val snapshotDf = sess.read.parquet("s3://bucket/snapshots-test/ge11-drop/")
   val cdcSchema1 = SparkUtils.getSchema("s3://bucket/schemas/GE11GLOBAL_candidates-CandidatesList.json")
   val cdcDf = sess.read.schema(cdcSchema1).json("s3://bucket/inputs-test/ge11-drop/*")

   snapshotDf.createOrReplaceTempView("snapshot")
   val snapshotDf2 = snapshotDf.limit(4).withColumn("cdc_pk", lit("0"))

   snapshotDf2.write.format("hudi")
     .options(getQuickstartWriteConfigs)
     .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
     .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
     .option(HoodieWriteConfig.TABLE_NAME, "GE11")
     .mode(SaveMode.Overwrite)
     .save("s3://bucket/snapshots-hudi/ge11-drop/snapshot")
   ```
   
   I start the spark-shell as follows:
   
   ```
   spark-shell --driver-memory 1g --executor-memory 4g --executor-cores 1 \
     --driver-cores 1 --conf spark.dynamicAllocation.maxExecutors=2 \
     --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
     --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
     --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \
     --conf "spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar" \
     --conf spark.sql.legacy.parquet.int96RebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.int96RebaseModeInWrite=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
     --conf spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED \
     --name ravic \
     --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.13.1 \
     --jars /home/hadoop/jars2/spark-1.0-SNAPSHOT.jar,/home/hadoop/hudi/hudi-release-0.12.3/packaging/hudi-spark-bundle/target/hudi-spark3.3-bundle_2.12-0.12.3.jar
   ```
   
   Exception (I am unable to get the basic insert working):
   ```
   07-11 15:58:17 ${sys:config.appname} WARN DAGScheduler: Broadcasting large task binary with size 1032.2 KiB
   07-11 15:58:18 ${sys:config.appname} ERROR HoodieSparkSqlWriter$: UPSERT failed with errors
   org.apache.hudi.exception.HoodieException: Write to Hudi failed
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:148)
     at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
     at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
     at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
     at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
     at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
     at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
     at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
     at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
     at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
     at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
     at org.apache.spark.sql.catalyst.plans.logic
   ```

[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!

2023-07-11 Thread via GitHub


chandu-1101 commented on issue #9141:
URL: https://github.com/apache/hudi/issues/9141#issuecomment-1630735240

   Hi,
   
   After trimming down the data files etc. the issue is still reproducible.
   
   1. The Parquet file has complex data structures; during the bulk insert
   we insert only 4 rows into the Hudi table.
   2. Same with the CDC (schema file below); during the merge the CDC has
   only 10 rows.
   3. I get the error (below) during the merge of the CDC data into the
   Hudi table.
   4. Ran on Spark on EMR.
   
   Code:
   ```
   val sess = Application.spark()

   /* get snapshot df; cdc df */
   val snapshotDf = sess.read.parquet("s3://bucket/snapshots-test/ge11-drop/")
   val cdcSchema1 = SparkUtils.getSchema("s3://bucket/schemas/ge11-schema.json")
   val cdcDf = sess.read.schema(cdcSchema1).json("s3://bucket/inputs-test/ge11-drop/*")
   /* done */

   /* merge them */
   snapshotDf.createOrReplaceTempView("snapshot")
   val snapshotDf2 = snapshotDf.limit(4).withColumn("cdc_pk", lit("0"))

   snapshotDf2.write.format("hudi")
     .options(getQuickstartWriteConfigs)
     .option(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.BULK_INSERT.name())
     .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
     .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_id.oid")
     .option(HoodieWriteConfig.TABLE_NAME, "GE11")
     .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), HoodieFileFormat.PARQUET.name())
     .option(HoodieBootstrapConfig.BASE_PATH.key(), "s3://bucket/snapshots-hudi/ge11-drop/snapshot")
     .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
     .mode(SaveMode.Overwrite)
     .save("s3://bucket/snapshots-hudi/ge11-drop/snapshot")

   cdcDf.createOrReplaceTempView("cdc")
   val _cdcDf = sess.sql("select * from cdc where _id.oid is not null and _id.oid != '' limit 10 ")
   _cdcDf.createOrReplaceTempView("_cdc")
   _cdcDf.write.format("hudi")
     .option(HoodieWriteConfig.PRECOMBINE_FIELD_NAME.key(), "cdc_pk")
     .option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_id.oid")
     .option("hoodie.datasource.write.operation", "upsert")
     .option(TBL_NAME.key(), "GE11")
     .mode(SaveMode.Append)
     .save("s3://bucket/snapshots-hudi/ge11-drop/snapshot") // << ERROR here
   /* done */
   ```
   
   ERROR
   ```
   
   Driver stacktrace:
     at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2863)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2799)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2798)
     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2798)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1239)
     at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1239)
     at scala.Option.foreach(Option.scala:407)
     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1239)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3051)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2993)
     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1009)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2229)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2250)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2294)
     at org.apache.spark.rdd.RDD.count(RDD.scala:1274)
     at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:721)
     at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:350)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:145)
     at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
     at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
     at org.apache.s
   ```

[GitHub] [hudi] chandu-1101 commented on issue #9141: [SUPPORT] Example from Hudi Quick start doesnt work!

2023-07-10 Thread via GitHub


chandu-1101 commented on issue #9141:
URL: https://github.com/apache/hudi/issues/9141#issuecomment-1630119428

   @ad1happy2go thank you for the reply. I will check these and get back.

