bobgalvao opened a new issue #1723: URL: https://github.com/apache/hudi/issues/1723
Hi, I'm having trouble using Apache Hudi with S3.

**Steps to reproduce the behavior:**

1. Produce messages to a Kafka topic (about 2,000 records per window on average).
2. Start the streaming job (sample code below).
3. Errors start to occur intermittently.
4. The streaming job has to keep consuming from Kafka for a while before an error appears; there is no deterministic pattern.

**Environment Description:**

* AWS EMR: emr-5.29.0
* Hudi version: 0.5.0-incubating
* Spark version: 2.4.4
* Hive version: 2.3.6
* Hadoop version: 2.8.5
* Storage: S3
* Running on Docker?: No

The errors occur intermittently. After error 1 ("Unrecognized token 'Objavro' ...") all subsequent writes to the dataset fail. Errors 2 and 3 ("Could not find any data file written for commit ..." / "Failed to read schema from data ...") clear up on the next execution, but the streaming or batch job still ends with an error.

Because of these problems on S3, I ran the same code against HDFS and saw none of the inconsistencies I get with S3. I have already enabled EMRFS, but the same errors occur. I also enabled `hoodie.consistency.check.enabled`, as recommended for S3 storage. This looks to me like an S3 consistency issue.

I often get the errors below (a small diagnostic sketch for error 1 follows its stack trace):

**Error – 1 (once this error occurs, it is no longer possible to write to the Hudi dataset):**

```
20/05/21 17:49:36 ERROR JobScheduler: Error running job streaming job 1590083340000 ms.0
org.apache.hudi.hive.HoodieHiveSyncException: Failed to get dataset schema for AWS_CASE
	at org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:414)
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:93)
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:71)
	at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:236)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:169)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at br.com.agi.bigdata.awscase.Main$.processRDD(Main.scala:91)
	at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:117)
	at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:114)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: unable to read commit metadata
	at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:328)
	at org.apache.hudi.hive.HoodieHiveClient.readSchemaFromLastCompaction(HoodieHiveClient.java:428)
	at org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:407)
	... 48 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Objavro': was expecting ('true', 'false' or 'null')
 at [Source: Objavro.schema�{"type":"record","name":"HoodieCompactionPlan","namespace":"org.apache.hudi.avro.model","fields":[{"name":"operations","type":["null",{"type":"array","items":{"type":"record","name":"HoodieCompactionOperation","fields":[{"name":"baseInstantTime","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"deltaFilePaths","type":["null",{"type":"array","items":{"type":"string","avro.java.string":"String"}}],"default":null},{"name":"dataFilePath","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"fileId","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"partitionPath","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"metrics","type":["null",{"type":"map","values":"double","avro.java.string":"String"}],"default":null}]}}],"default":null},{"name":"extraMetadata","type":["null",{"type":"map","values":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"}],"default":null}]}� _v :���[�gקGC�20200521174801�s3://bucket01/AWS_CASE/0/.cfe57b0e-1ac0-4650-960a-615cdba323f9-0_20200521174801.log.1_1-927-392109�s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_1-895-378588_20200521174801.parquetLcfe57b0e-1ac0-4650-960a-615cdba323f9-00 TOTAL_LOG_FILES�?TOTAL_IO_MB TOTAL_IO_READ_MB(TOTAL_LOG_FILES_SIZE�@"TOTAL_IO_WRITE_MB&TOTAL_LOG_FILE_SIZE�@� _v :���[�gקGC; line: 1, column: 11]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2462)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1621)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:689)
	at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3776)
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3721)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
	at org.apache.hudi.common.model.HoodieCommitMetadata.fromJsonString(HoodieCommitMetadata.java:129)
	at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:326)
	... 50 more
```
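If it helps: the `Objavro` token in error 1 looks like the Avro object container magic (the bytes `O`, `b`, `j`, `0x01`) followed by the `HoodieCompactionPlan` schema, being handed to the Jackson JSON parser. In other words, the instant file on S3 apparently contains compaction-plan Avro bytes where JSON commit metadata was expected. A minimal sketch of how one could confirm this (hypothetical snippet, not part of the job; it assumes the S3 filesystem is configured, e.g. on the EMR master, and uses the instant file that hudi-cli opens further below):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Instant file reported by hudi-cli below; any suspect .commit file works.
val path = new Path("s3://bucket01/AWS_CASE/.hoodie/20200521174926.commit")
val fs = path.getFileSystem(new Configuration())

// Read only the first four bytes of the file.
val in = fs.open(path)
val magic = new Array[Byte](4)
in.readFully(magic)
in.close()

// A valid Hudi .commit file is JSON and starts with '{' (0x7b).
// An Avro object container file starts with 'O','b','j',0x01 --
// which is exactly the "Objavro" token in the JsonParseException above.
println(magic.map(b => f"${b & 0xff}%02x").mkString(" "))
```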
**Error – 2:**

```
java.lang.IllegalArgumentException: Could not find any data file written for commit [20200520134502__deltacommit__COMPLETED], could not get schema for dataset s3://bucket01/AWS_CASE , CommitMetadata :HoodieCommitMetadata{partitionToWriteStats={}, compacted=false, extraMetadataMap={}}
	at org.apache.hudi.hive.HoodieHiveClient.lambda$null$10(HoodieHiveClient.java:393)
	at java.util.Optional.orElseThrow(Optional.java:290)
	at org.apache.hudi.hive.HoodieHiveClient.lambda$getDataSchema$11(HoodieHiveClient.java:391)
	at java.util.Optional.orElseGet(Optional.java:267)
	at org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:387)
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:93)
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:71)
	at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:236)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:169)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at br.com.agi.bigdata.fastdata.aux.emr.schema1.table1.Main$.processRDD(Main.scala:109)
	at br.com.agi.bigdata.fastdata.aux.emr.schema1.table1.Main$$anonfun$main$1.apply(Main.scala:145)
	at br.com.agi.bigdata.fastdata.aux.emr.schema1.table1.Main$$anonfun$main$1.apply(Main.scala:135)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

**Error – 3:**

```
20/05/21 17:20:33 ERROR JobScheduler: Error running job streaming job 1590081600000 ms.0
java.lang.IllegalArgumentException: Failed to read schema from data file s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet. File does not exist.
	at org.apache.hudi.hive.HoodieHiveClient.readSchemaFromDataFile(HoodieHiveClient.java:456)
	at org.apache.hudi.hive.HoodieHiveClient.readSchemaFromLastCompaction(HoodieHiveClient.java:432)
	at org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:407)
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:93)
	at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:71)
	at org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:236)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:169)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at br.com.agi.bigdata.awscase.Main$.processRDD(Main.scala:91)
	at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:117)
	at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:114)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
20/05/21 17:20:33 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Failed to read schema from data file s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet. File does not exist.
java.lang.IllegalArgumentException: Failed to read schema from data file s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet. File does not exist.
	[stack trace identical to the JobScheduler error above]
```

**Hudi-cli information:**

```
hudi->connect --path s3://bucket01/AWS_CASE
6671 [Spring Shell] INFO org.apache.hudi.common.table.HoodieTableMetaClient - Loading HoodieTableMetaClient from s3://bucket01/AWS_CASE
7242 [Spring Shell] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11229 [Spring Shell] INFO org.apache.hudi.common.util.FSUtils - Hadoop Configuration: fs.defaultFS: [hdfs://ip-10-xx-x-xx.agi.aws.local:8020], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, emrfs-site.xml], FileSystem: [com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem@6a9763d6]
12778 [Spring Shell] INFO org.apache.hudi.common.table.HoodieTableConfig - Loading dataset properties from s3://bucket01/AWS_CASE/.hoodie/hoodie.properties
12795 [Spring Shell] INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem - Opening 's3://bucket01/AWS_CASE/.hoodie/hoodie.properties' for reading
12835 [Spring Shell] INFO org.apache.hudi.common.table.HoodieTableMetaClient - Finished Loading Table of type MERGE_ON_READ from s3://bucket01/AWS_CASE
Metadata for table AWS_CASE loaded

hudi:AWS_CASE->desc
35395 [Spring Shell] INFO org.apache.hudi.common.table.timeline.HoodieActiveTimeline - Loaded instants java.util.stream.ReferencePipeline$Head@1e3450cf
╔═════════════════════════════════╤═════════════════════════════════════════════════╗
║ Property                        │ Value                                           ║
╠═════════════════════════════════╪═════════════════════════════════════════════════╣
║ basePath                        │ s3://bucket01/AWS_CASE                          ║
╟─────────────────────────────────┼─────────────────────────────────────────────────╢
║ metaPath                        │ s3://bucket01/AWS_CASE/.hoodie                  ║
╟─────────────────────────────────┼─────────────────────────────────────────────────╢
║ fileSystem                      │ s3                                              ║
╟─────────────────────────────────┼─────────────────────────────────────────────────╢
║ hoodie.table.name               │ AWS_CASE                                        ║
╟─────────────────────────────────┼─────────────────────────────────────────────────╢
║ hoodie.compaction.payload.class │ org.apache.hudi.common.model.HoodieAvroPayload  ║
╟─────────────────────────────────┼─────────────────────────────────────────────────╢
║ hoodie.table.type               │ MERGE_ON_READ                                   ║
╟─────────────────────────────────┼─────────────────────────────────────────────────╢
║ hoodie.archivelog.folder        │ archived                                        ║
╚═════════════════════════════════╧═════════════════════════════════════════════════╝

hudi:AWS_CASE->commits show
69651 [Spring Shell] INFO com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem - Opening 's3://bucket01/AWS_CASE/.hoodie/20200521174926.commit' for reading
Command failed java.lang.reflect.UndeclaredThrowableException
java.lang.reflect.UndeclaredThrowableException
	at org.springframework.util.ReflectionUtils.rethrowRuntimeException(ReflectionUtils.java:315)
	at org.springframework.util.ReflectionUtils.handleInvocationTargetException(ReflectionUtils.java:295)
	at org.springframework.util.ReflectionUtils.handleReflectionException(ReflectionUtils.java:279)
	at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:219)
	at org.springframework.shell.core.SimpleExecutionStrategy.invoke(SimpleExecutionStrategy.java:68)
	at org.springframework.shell.core.SimpleExecutionStrategy.execute(SimpleExecutionStrategy.java:59)
	at org.springframework.shell.core.AbstractShell.executeCommand(AbstractShell.java:134)
	at org.springframework.shell.core.JLineShell.promptLoop(JLineShell.java:533)
	at org.springframework.shell.core.JLineShell.run(JLineShell.java:179)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: unable to read commit metadata
	at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:328)
	at org.apache.hudi.cli.commands.CommitsCommand.showCommits(CommitsCommand.java:89)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:216)
	... 6 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'Objavro': was expecting ('true', 'false' or 'null')
 at [Source: <same Objavro.schema / HoodieCompactionPlan binary dump as in Error 1>; line: 1, column: 11]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2462)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1621)
	at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:689)
	at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3776)
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3721)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
	at org.apache.hudi.common.model.HoodieCommitMetadata.fromJsonString(HoodieCommitMetadata.java:129)
	at org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:326)
	... 12 more
```
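Regarding error 3: the "File does not exist" happens seconds after the compaction wrote that parquet file, and the error clears on the next execution, which points to the file only becoming visible on S3 after a delay. Below is a rough sketch of an existence probe that could confirm this (`waitForVisibility` is a hypothetical helper, not part of my job; the path is taken from the Error 3 message). As I understand it, this is essentially what `hoodie.consistency.check.enabled` is supposed to do on the write side:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Returns true once the file is visible, retrying to ride out
// S3's eventual-consistency window.
def waitForVisibility(pathStr: String, attempts: Int = 10, sleepMs: Long = 2000): Boolean = {
  val path = new Path(pathStr)
  val fs = path.getFileSystem(new Configuration())
  (1 to attempts).exists { _ =>
    val visible = fs.exists(path)
    if (!visible) Thread.sleep(sleepMs) // back off before re-checking
    visible
  }
}

// Data file from the Error 3 message; it was not visible when Hive sync
// tried to read its schema, consistent with the error clearing later.
waitForVisibility("s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet")
```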
**Sample code used:**

```scala
package br.com.agi.bigdata.awscase

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object Main {

  val sparkConf: SparkConf = new SparkConf()
    .set("spark.sql.catalogImplementation", "hive")
    .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
    .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
    .set("spark.yarn.maxAppAttempts", "3")
    .set("spark.locality.wait", "1")
    .set("spark.streaming.backpressure.enabled", "true")
    .set("spark.streaming.backpressure.initialRate", "1")
    .set("spark.streaming.receiver.maxRate", "1")
    .set("spark.streaming.receiver.initialRate", "1")
    .set("spark.sql.hive.convertMetastoreParquet", "false")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  val sc = new SparkContext(sparkConf)
  val spark = SparkSession.builder().config(sparkConf).getOrCreate()
  spark.sparkContext.setLogLevel("ERROR")

  def processRDD(rddRaw: RDD[ConsumerRecord[String, String]]): Unit = {
    import spark.implicits._

    val hiveSchema = spark.sql(s"select * from datalake.aux.emr.schema1.table1 limit 1").schema
    val fieldsRemove = Array("current_ts", "table", "op_type", "op_ts", "pos", "primary_keys", "tokens", "date_partition")

    val jsonSchema = Seq[StructField](
      StructField("table", StringType),
      StructField("op_type", StringType),
      StructField("op_ts", StringType),
      StructField("current_ts", StringType),
      StructField("pos", StringType),
      StructField("primary_keys", ArrayType(StringType)),
      StructField("normalizedkey", StringType),
      StructField("after", StructType(hiveSchema.fields
        .filter(it => !fieldsRemove.contains(it.name))
        .map(it => StructField(it.name.toUpperCase, it.dataType, it.nullable, it.metadata))))
    )

    var df = spark.read
      .schema(StructType(jsonSchema))
      .json(rddRaw.map(_.value()))
      .withColumn("normalizedkey", concat(lit(col("current_ts")), lit(col("pos"))).cast(StringType))
      .select("table", "op_type", "op_ts", "current_ts", "pos", "primary_keys", "normalizedkey", "after.*")

    val hiveTableName = "AWS_CASE"
    val pks = df.select($"primary_keys").first().getSeq[String](0).map(col)

    df.withColumn("key", concat(pks: _*))
      .withColumn("partition_id", (pks(0) / 10000000).cast("Integer"))
      .write.format("org.apache.hudi")
      .option(HoodieWriteConfig.TABLE_NAME, hiveTableName)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "key")
      .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "datalake")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "normalizedkey")
      .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hiveTableName)
      .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
      .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL, "false")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition_id")
      .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "partition_id")
      .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.xx.x.xxx:10000")
      .option("hoodie.consistency.check.enabled", "true")
      .option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS")
      .option("hoodie.keep.max.commits", 2)
      .option("hoodie.keep.min.commits", 1)
      .option("hoodie.parquet.compression.codec", "snappy")
      .option("hoodie.cleaner.commits.retained", 0)
      .option("hoodie.parquet.max.file.size", 1073741824)
      .option("hoodie.parquet.small.file.limit", 943718400)
      .mode(SaveMode.Append)
      .save("s3://bucket01/AWS_CASE")
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(sc, Minutes(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker01:9092,broker02:9092,broker03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "aws_case",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "auto.offset.reset" -> "earliest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Array("TOPIC-01"), kafkaParams)
    )

    stream.foreachRDD(rddRaw => {
      val offsetRanges = rddRaw.asInstanceOf[HasOffsetRanges].offsetRanges
      if (!rddRaw.isEmpty()) {
        processRDD(rddRaw)
      }
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
```

**Build.sbt:**

```scala
name := "aws-case-hudi"
version := "0.1"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.4.4" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.4",
  "org.apache.spark" %% "spark-sql" % "2.4.4",
  "org.apache.spark" %% "spark-core" % "2.4.4",
  "org.apache.hudi" % "hudi-spark-bundle" % "0.5.0-incubating",
  "org.apache.httpcomponents" % "httpclient" % "4.5.12"
)
```

Best regards,
bobgalvao