bobgalvao opened a new issue #1723:
URL: https://github.com/apache/hudi/issues/1723


   Hi,
   
   I'm having trouble using Apache Hudi with S3.
   
   **Steps to reproduce the behavior:**
   
   1. Produce messages to a Kafka topic (about 2000 records per window on average).
   2. Start the streaming job (sample code below).
   3. Errors start to occur intermittently.
   4. The streaming job has to be left consuming from Kafka for some time before an error occurs; there is no consistent pattern.
   
   **Environment Description:**
   
   AWS EMR: emr-5.29.0
   Hudi version : 0.5.0-incubating
   Spark version : 2.4.4
   Hive version : 2.3.6
   Hadoop version : 2.8.5
   Storage  : S3
   Running on Docker? : No
   
   The errors occur intermittently. With error 1 (“Unrecognized token 'Objavro' ...”), subsequent writes to the dataset become impossible. With errors 2 and 3 (“Could not find any data file written for commit…” / "Failed to read schema from data..."), the dataset recovers on the next execution, but the streaming or batch job still ends with an error.
   
   Because of these problems with S3, I ran the same code against HDFS and had none of the inconsistency issues I see with S3.
   
   I have already enabled EMRFS consistent view, but the same errors occur. I also enabled “hoodie.consistency.check.enabled”, as recommended when using S3 storage. The problem seems to be related to S3 consistency.
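   
   For reference, here is a minimal sketch of the consistency-related settings in play (the helper name is only illustrative; the complete job is included at the end of this issue). EMRFS consistent view itself is a cluster-level emrfs-site setting, not something configured from the Spark job:
   
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.spark.sql.{DataFrame, SaveMode}
   
   // Illustrative helper (not the actual job): shows where the S3 consistency check
   // is switched on for a Hudi write. EMRFS consistent view itself is enabled at the
   // cluster level through emrfs-site (fs.s3.consistent = true), not from Spark.
   def upsertWithConsistencyCheck(df: DataFrame, tableName: String, basePath: String): Unit = {
     df.write
       .format("org.apache.hudi")
       .option(HoodieWriteConfig.TABLE_NAME, tableName)
       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "key")
       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "normalizedkey")
       .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition_id")
       // Hudi re-checks that files written to S3 are actually visible before
       // finalizing the commit, as recommended for S3 storage.
       .option("hoodie.consistency.check.enabled", "true")
       .mode(SaveMode.Append)
       .save(basePath)
   }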
   
   I often get the errors below:
   
   **Error – 1 (once this error occurs, it is no longer possible to use the Hudi dataset):**
   
   20/05/21 17:49:36 ERROR JobScheduler: Error running job streaming job 
1590083340000 ms.0
   org.apache.hudi.hive.HoodieHiveSyncException: Failed to get dataset schema 
for AWS_CASE
     at 
org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:414)
     at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:93)
     at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:71)
     at 
org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:236)
     at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:169)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
     at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
     at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
     at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
     at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
     at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
     at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
     at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
     at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
     at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
     at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
     at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
     at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
     at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
     at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
     at br.com.agi.bigdata.awscase.Main$.processRDD(Main.scala:91)
     at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:117)
     at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:114)
     at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
     at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
     at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
     at scala.util.Try$.apply(Try.scala:192)
     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
     at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
     at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
     at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
     at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
     at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.IOException: unable to read commit metadata
     at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:328)
     at 
org.apache.hudi.hive.HoodieHiveClient.readSchemaFromLastCompaction(HoodieHiveClient.java:428)
     at 
org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:407)
     ... 48 more
   Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 
'Objavro': was expecting ('true', 'false' or 'null')
    at [Source: 
Objavro.schema�{"type":"record","name":"HoodieCompactionPlan","namespace":"org.apache.hudi.avro.model","fields":[{"name":"operations","type":["null",{"type":"array","items":{"type":"record","name":"HoodieCompactionOperation","fields":[{"name":"baseInstantTime","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"deltaFilePaths","type":["null",{"type":"array","items":{"type":"string","avro.java.string":"String"}}],"default":null},{"name":"dataFilePath","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"fileId","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"partitionPath","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"metrics","type":["null",{"type":"map","values":"double","avro.java.string":"String"}],"default":null}]}}],"default":null},{"name":"extraMetadata","type":["null",{"type":"map","values":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"}],"default":null}]}�
 _v 
:���[�gקGC�20200521174801�s3://bucket01/AWS_CASE/0/.cfe57b0e-1ac0-4650-960a-615cdba323f9-0_20200521174801.log.1_1-927-392109�s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_1-895-378588_20200521174801.parquetLcfe57b0e-1ac0-4650-960a-615cdba323f9-00
TOTAL_LOG_FILES�?TOTAL_IO_MB 
TOTAL_IO_READ_MB(TOTAL_LOG_FILES_SIZE�@"TOTAL_IO_WRITE_MB&TOTAL_LOG_FILE_SIZE�@�
  _v :���[�gקGC; line: 1, column: 11]
     at 
com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
     at 
com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
     at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2462)
     at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1621)
     at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:689)
     at 
com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3776)
     at 
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3721)
     at 
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
     at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromJsonString(HoodieCommitMetadata.java:129)
     at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:326)
     ... 50 more
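   
   For what it's worth, the 'Objavro' token in the exception above is the magic header of an Avro container file, so the bytes that hive sync is parsing as JSON commit metadata look like an Avro compaction-plan file instead (the garbled source even shows the HoodieCompactionPlan schema). The sketch below is purely diagnostic, not part of the job, and the object name is just illustrative; it lists the files in the .hoodie timeline folder and reports whether each one starts with the Avro magic bytes or with JSON:
   
   import java.net.URI
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.{FileSystem, Path}
   
   // Diagnostic sketch: for every file in the Hudi timeline folder, print whether
   // its first bytes are the Avro container magic ("Obj") or the '{' that starts
   // JSON commit metadata.
   object InspectTimeline {
     def main(args: Array[String]): Unit = {
       val basePath = "s3://bucket01/AWS_CASE"   // table base path from this issue
       val fs = FileSystem.get(new URI(basePath), new Configuration())
       fs.listStatus(new Path(basePath + "/.hoodie")).filter(_.isFile).foreach { status =>
         val in = fs.open(status.getPath)
         val head = new Array[Byte](4)
         val read = in.read(head)
         in.close()
         val kind =
           if (read >= 3 && new String(head, 0, 3, "UTF-8") == "Obj") "Avro container"
           else if (read >= 1 && head(0) == '{'.toByte) "JSON"
           else "other/empty"
         println(s"${status.getPath.getName} -> $kind")
       }
     }
   }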
   
   
   
   **Error – 2:**
   
   java.lang.IllegalArgumentException: Could not find any data file written for 
commit [20200520134502__deltacommit__COMPLETED], could not get schema for 
dataset s3://bucket01/AWS_CASE
   , CommitMetadata :HoodieCommitMetadata{partitionToWriteStats={}, 
compacted=false, extraMetadataMap={}}
        at 
org.apache.hudi.hive.HoodieHiveClient.lambda$null$10(HoodieHiveClient.java:393)
        at java.util.Optional.orElseThrow(Optional.java:290)
        at 
org.apache.hudi.hive.HoodieHiveClient.lambda$getDataSchema$11(HoodieHiveClient.java:391)
        at java.util.Optional.orElseGet(Optional.java:267)
        at 
org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:387)
        at 
org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:93)
        at 
org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:71)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:236)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:169)
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
        at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
        at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
        at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
        at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
        at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
        at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
        at 
br.com.agi.bigdata.fastdata.aux.emr.schema1.table1.Main$.processRDD(Main.scala:109)
        at 
br.com.agi.bigdata.fastdata.aux.emr.schema1.table1.Main$$anonfun$main$1.apply(Main.scala:145)
        at 
br.com.agi.bigdata.fastdata.aux.emr.schema1.table1.Main$$anonfun$main$1.apply(Main.scala:135)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
        at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   
   
   **Error - 3:**
   
   20/05/21 17:20:33 ERROR JobScheduler: Error running job streaming job 
1590081600000 ms.0
   java.lang.IllegalArgumentException: Failed to read schema from data file 
s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet.
 File does not exist.
     at 
org.apache.hudi.hive.HoodieHiveClient.readSchemaFromDataFile(HoodieHiveClient.java:456)
     at 
org.apache.hudi.hive.HoodieHiveClient.readSchemaFromLastCompaction(HoodieHiveClient.java:432)
     at 
org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:407)
     at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:93)
     at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:71)
     at 
org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:236)
     at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:169)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
     at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
     at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
     at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
     at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
     at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
     at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
     at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
     at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
     at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
     at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
     at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
     at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
     at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
     at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
     at br.com.agi.bigdata.awscase.Main$.processRDD(Main.scala:91)
     at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:117)
     at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:114)
     at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
     at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
     at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
     at scala.util.Try$.apply(Try.scala:192)
     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
     at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
     at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
     at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
     at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
     at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
   20/05/21 17:20:33 ERROR ApplicationMaster: User class threw exception: 
java.lang.IllegalArgumentException: Failed to read schema from data file 
s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet.
 File does not exist.
   java.lang.IllegalArgumentException: Failed to read schema from data file 
s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_0-1453-608455_20200521172025.parquet.
 File does not exist.
     at 
org.apache.hudi.hive.HoodieHiveClient.readSchemaFromDataFile(HoodieHiveClient.java:456)
     at 
org.apache.hudi.hive.HoodieHiveClient.readSchemaFromLastCompaction(HoodieHiveClient.java:432)
     at 
org.apache.hudi.hive.HoodieHiveClient.getDataSchema(HoodieHiveClient.java:407)
     at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:93)
     at org.apache.hudi.hive.HiveSyncTool.syncHoodieTable(HiveSyncTool.java:71)
     at 
org.apache.hudi.HoodieSparkSqlWriter$.syncHive(HoodieSparkSqlWriter.scala:236)
     at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:169)
     at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:91)
     at 
org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
     at 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
     at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
     at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
     at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
     at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
     at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
     at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
     at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:83)
     at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
     at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
     at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:84)
     at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:165)
     at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
     at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
     at 
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
     at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
     at br.com.agi.bigdata.awscase.Main$.processRDD(Main.scala:91)
     at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:117)
     at br.com.agi.bigdata.awscase.Main$$anonfun$main$1.apply(Main.scala:114)
     at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
     at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
     at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
     at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
     at scala.util.Try$.apply(Try.scala:192)
     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
     at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
     at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
     at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
     at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
     at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
     at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
     at java.lang.Thread.run(Thread.java:748)
   
   
   
   
   **Hudi-cli information:**
   
   hudi->connect --path s3://bucket01/AWS_CASE
   6671 [Spring Shell] INFO  org.apache.hudi.common.table.HoodieTableMetaClient 
 - Loading HoodieTableMetaClient from s3://bucket01/AWS_CASE
   7242 [Spring Shell] WARN  org.apache.hadoop.util.NativeCodeLoader  - Unable 
to load native-hadoop library for your platform... using builtin-java classes 
where applicable
   11229 [Spring Shell] INFO  org.apache.hudi.common.util.FSUtils  - Hadoop 
Configuration: fs.defaultFS: [hdfs://ip-10-xx-x-xx.agi.aws.local:8020], 
Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, 
mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, 
hdfs-site.xml, emrfs-site.xml], FileSystem: 
[com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem@6a9763d6]
   12778 [Spring Shell] INFO  org.apache.hudi.common.table.HoodieTableConfig  - 
Loading dataset properties from s3://bucket01/AWS_CASE/.hoodie/hoodie.properties
   12795 [Spring Shell] INFO  
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem  - Opening 
's3://bucket01/AWS_CASE/.hoodie/hoodie.properties' for reading
   12835 [Spring Shell] INFO  
org.apache.hudi.common.table.HoodieTableMetaClient  - Finished Loading Table of 
type MERGE_ON_READ from s3://bucket01/AWS_CASE
   Metadata for table AWS_CASE loaded
   
   hudi:AWS_CASE->desc
   35395 [Spring Shell] INFO  
org.apache.hudi.common.table.timeline.HoodieActiveTimeline  - Loaded instants 
java.util.stream.ReferencePipeline$Head@1e3450cf
   
   ╔═════════════════════════════════╤════════════════════════════════════════════════╗
   ║ Property                        │ Value                                          ║
   ╠═════════════════════════════════╪════════════════════════════════════════════════╣
   ║ basePath                        │ s3://bucket01/AWS_CASE                         ║
   ╟─────────────────────────────────┼────────────────────────────────────────────────╢
   ║ metaPath                        │ s3://bucket01/AWS_CASE/.hoodie                 ║
   ╟─────────────────────────────────┼────────────────────────────────────────────────╢
   ║ fileSystem                      │ s3                                             ║
   ╟─────────────────────────────────┼────────────────────────────────────────────────╢
   ║ hoodie.table.name               │ AWS_CASE                                       ║
   ╟─────────────────────────────────┼────────────────────────────────────────────────╢
   ║ hoodie.compaction.payload.class │ org.apache.hudi.common.model.HoodieAvroPayload ║
   ╟─────────────────────────────────┼────────────────────────────────────────────────╢
   ║ hoodie.table.type               │ MERGE_ON_READ                                  ║
   ╟─────────────────────────────────┼────────────────────────────────────────────────╢
   ║ hoodie.archivelog.folder        │ archived                                       ║
   ╚═════════════════════════════════╧════════════════════════════════════════════════╝
   
   **hudi:AWS_CASE->commits show**
   69651 [Spring Shell] INFO  
com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem  - Opening 
's3://bucket01/AWS_CASE/.hoodie/20200521174926.commit' for reading
   Command failed java.lang.reflect.UndeclaredThrowableException
   java.lang.reflect.UndeclaredThrowableException
     at 
org.springframework.util.ReflectionUtils.rethrowRuntimeException(ReflectionUtils.java:315)
     at 
org.springframework.util.ReflectionUtils.handleInvocationTargetException(ReflectionUtils.java:295)
     at 
org.springframework.util.ReflectionUtils.handleReflectionException(ReflectionUtils.java:279)
     at 
org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:219)
     at 
org.springframework.shell.core.SimpleExecutionStrategy.invoke(SimpleExecutionStrategy.java:68)
     at 
org.springframework.shell.core.SimpleExecutionStrategy.execute(SimpleExecutionStrategy.java:59)
     at 
org.springframework.shell.core.AbstractShell.executeCommand(AbstractShell.java:134)
     at 
org.springframework.shell.core.JLineShell.promptLoop(JLineShell.java:533)
     at org.springframework.shell.core.JLineShell.run(JLineShell.java:179)
     at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.IOException: unable to read commit metadata
     at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:328)
     at 
org.apache.hudi.cli.commands.CommitsCommand.showCommits(CommitsCommand.java:89)
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
     at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
     at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
     at java.lang.reflect.Method.invoke(Method.java:498)
     at 
org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:216)
     ... 6 more
   Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 
'Objavro': was expecting ('true', 'false' or 'null')
    at [Source: 
Objavro.schema�{"type":"record","name":"HoodieCompactionPlan","namespace":"org.apache.hudi.avro.model","fields":[{"name":"operations","type":["null",{"type":"array","items":{"type":"record","name":"HoodieCompactionOperation","fields":[{"name":"baseInstantTime","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"deltaFilePaths","type":["null",{"type":"array","items":{"type":"string","avro.java.string":"String"}}],"default":null},{"name":"dataFilePath","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"fileId","type":["null",{"type":"string","avro.java.string":"String"}]},{"name":"partitionPath","type":["null",{"type":"string","avro.java.string":"String"}],"default":null},{"name":"metrics","type":["null",{"type":"map","values":"double","avro.java.string":"String"}],"default":null}]}}],"default":null},{"name":"extraMetadata","type":["null",{"type":"map","values":{"type":"string","avro.java.string":"String"},"avro.java.string":"String"}],"default":null}]}�
 _v 
:���[�gקGC�20200521174801�s3://bucket01/AWS_CASE/0/.cfe57b0e-1ac0-4650-960a-615cdba323f9-0_20200521174801.log.1_1-927-392109�s3://bucket01/AWS_CASE/0/cfe57b0e-1ac0-4650-960a-615cdba323f9-0_1-895-378588_20200521174801.parquetLcfe57b0e-1ac0-4650-960a-615cdba323f9-00
                                                                  
TOTAL_LOG_FILES�?TOTAL_IO_MB 
TOTAL_IO_READ_MB(TOTAL_LOG_FILES_SIZE�@"TOTAL_IO_WRITE_MB&TOTAL_LOG_FILE_SIZE�@�
  _v :���[�gקGC; line: 1, column: 11]
     at 
com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581)
     at 
com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533)
     at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2462)
     at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1621)
     at 
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:689)
     at 
com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3776)
     at 
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3721)
     at 
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726)
     at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromJsonString(HoodieCommitMetadata.java:129)
     at 
org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(HoodieCommitMetadata.java:326)
     ... 12 more
   
   
   **Sample code used:**
   
   package br.com.agi.bigdata.awscase
   
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.config.HoodieWriteConfig
   import org.apache.hudi.hive.MultiPartKeysValueExtractor
   import org.apache.kafka.clients.consumer.ConsumerRecord
   import org.apache.kafka.common.serialization.StringDeserializer
   import org.apache.spark.rdd.RDD
   import org.apache.spark.sql.functions._
   import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
   import org.apache.spark.sql.{SaveMode, SparkSession}
   import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
   import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
   import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
   import org.apache.spark.streaming.{Minutes, StreamingContext}
   import org.apache.spark.{SparkConf, SparkContext}
   
   object Main {
   
     val sparkConf: SparkConf = new SparkConf()
       .set("spark.sql.catalogImplementation", "hive")
       .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
       .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
       .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
       .set("spark.yarn.maxAppAttempts", "3")
       .set("spark.locality.wait", "1")
       .set("spark.streaming.backpressure.enabled", "true")
       .set("spark.streaming.backpressure.initialRate", "1")
       .set("spark.streaming.receiver.maxRate", "1")
       .set("spark.streaming.receiver.initialRate", "1")
       .set("spark.sql.hive.convertMetastoreParquet", "false")
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   
     val sc = new SparkContext(sparkConf)
     val spark = SparkSession.builder().config(sparkConf).getOrCreate()
     spark.sparkContext.setLogLevel("ERROR")
   
     def processRDD(rddRaw: RDD[ConsumerRecord[String, String]]): Unit = {
       import spark.implicits._
   
       // Reuse the schema of the existing Hive table for the nested "after" payload.
       val hiveSchema = spark.sql("select * from datalake.aux.emr.schema1.table1 limit 1").schema
   
       val fieldsRemove = Array("current_ts", "table", "op_type", "op_ts", "pos", "primary_keys", "tokens", "date_partition")
   
       val jsonSchema = Seq[StructField](
         StructField("table", StringType),
         StructField("op_type", StringType),
         StructField("op_ts", StringType),
         StructField("current_ts", StringType),
         StructField("pos", StringType),
         StructField("primary_keys", ArrayType(StringType)),
         StructField("normalizedkey", StringType),
         StructField("after", StructType(hiveSchema.fields
           .filter(it => !fieldsRemove.contains(it.name))
           .map(it => StructField(it.name.toUpperCase, it.dataType, it.nullable, it.metadata))))
       )
   
       // Parse the Kafka JSON payloads and flatten the "after" struct into columns.
       val df = spark.read
         .schema(StructType(jsonSchema))
         .json(rddRaw.map(_.value()))
         .withColumn("normalizedkey", concat(col("current_ts"), col("pos")).cast(StringType))
         .select("table", "op_type", "op_ts", "current_ts", "pos", "primary_keys", "normalizedkey", "after.*")
   
       val hiveTableName = "AWS_CASE"
       val pks = df.select($"primary_keys").first().getSeq[String](0).map(col)
   
       // Upsert the batch into the MERGE_ON_READ table on S3 with Hive sync enabled.
       df.withColumn("key", concat(pks: _*))
         .withColumn("partition_id", (pks(0) / 10000000).cast("Integer"))
         .write.format("org.apache.hudi")
         .option(HoodieWriteConfig.TABLE_NAME, hiveTableName)
         .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "key")
         .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "datalake")
         .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "normalizedkey")
         .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
         .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hiveTableName)
         .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
         .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
         .option(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY, "false")
         .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition_id")
         .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "partition_id")
         .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.xx.x.xxx:10000")
         .option("hoodie.consistency.check.enabled", "true")
         .option("hoodie.cleaner.policy", "KEEP_LATEST_FILE_VERSIONS")
         .option("hoodie.keep.max.commits", 2)
         .option("hoodie.keep.min.commits", 1)
         .option("hoodie.parquet.compression.codec", "snappy")
         .option("hoodie.cleaner.commits.retained", 0)
         .option("hoodie.parquet.max.file.size", 1073741824)
         .option("hoodie.parquet.small.file.limit", 943718400)
         .mode(SaveMode.Append)
         .save("s3://bucket01/AWS_CASE")
     }
   
     def main(args: Array[String]): Unit = {
   
       val ssc = new StreamingContext(sc, Minutes(5))
       val kafkaParams = Map[String, Object](
         "bootstrap.servers" -> "broker01:9092,broker02:9092,broker03:9092",
         "key.deserializer" -> classOf[StringDeserializer],
         "value.deserializer" -> classOf[StringDeserializer],
         "group.id" -> "aws_case",
         "enable.auto.commit" -> (false: java.lang.Boolean),
         "auto.offset.reset" -> "earliest"
       )
   
       val stream = KafkaUtils.createDirectStream[String, String](
         ssc,
         PreferConsistent,
         Subscribe[String, String](Array("TOPIC-01"), kafkaParams)
       )
   
       // Commit Kafka offsets after each batch has been written (empty batches are skipped).
       stream.foreachRDD(rddRaw => {
         val offsetRanges = rddRaw.asInstanceOf[HasOffsetRanges].offsetRanges
         if (!rddRaw.isEmpty()) {
           processRDD(rddRaw)
         }
         stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
       })
       ssc.start()
       ssc.awaitTermination()
     }
   }
   
   Build.sbt:
   
   name := "aws-case-hudi"
   version := "0.1"
   scalaVersion := "2.11.12"
   libraryDependencies ++= Seq(
     "org.apache.spark" %% "spark-streaming" % "2.4.4" % "provided",
     "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.4",
     "org.apache.spark" %% "spark-sql" % "2.4.4",
     "org.apache.spark" %% "spark-core" % "2.4.4",
     "org.apache.hudi" % "hudi-spark-bundle" % "0.5.0-incubating",
     "org.apache.httpcomponents" % "httpclient" % "4.5.12"
   )
   
   Best regards,
   bobgalvao
   
   

