[ https://issues.apache.org/jira/browse/HUDI-716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17061925#comment-17061925 ]
lamber-ken edited comment on HUDI-716 at 3/18/20, 5:29 PM: ----------------------------------------------------------- I tried to reproduce it, but it works ok. *Step1: Use hudi 0.5.0 generate datas* {code:java} export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7 ${SPARK_HOME}/bin/spark-shell \ --packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' val tableName = "hudi_mor_table" val basePath = "file:///tmp/hudi_mor_table" var datas = List("""{ "name": "kenken", "ts": "qwer", "age": 12, "location": "latitude"}""") val df = spark.read.json(spark.sparkContext.parallelize(datas, 2)) df.write.format("org.apache.hudi"). option("hoodie.insert.shuffle.parallelism", "10"). option("hoodie.upsert.shuffle.parallelism", "10"). option("hoodie.delete.shuffle.parallelism", "10"). option("hoodie.bulkinsert.shuffle.parallelism", "10"). option("hoodie.datasource.write.recordkey.field", "name"). option("hoodie.datasource.write.partitionpath.field", "location"). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.table.name", tableName). mode("Overwrite"). save(basePath) var datas = List.tabulate(30)(i => List(s"""{ "name": "kenken${i}", "ts": "zasz", "age": 123, "location": "latitude"}""")) for (data <- datas) { val df = spark.read.json(spark.sparkContext.parallelize(data, 2)) df.write.format("org.apache.hudi"). option("hoodie.insert.shuffle.parallelism", "10"). option("hoodie.upsert.shuffle.parallelism", "10"). option("hoodie.delete.shuffle.parallelism", "10"). option("hoodie.bulkinsert.shuffle.parallelism", "10"). option("hoodie.datasource.write.recordkey.field", "name"). option("hoodie.datasource.write.partitionpath.field", "location"). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.table.name", tableName). option("hoodie.keep.max.commits", "5"). option("hoodie.keep.min.commits", "4"). option("hoodie.cleaner.commits.retained", "3"). 
mode("Append"). save(basePath) } spark.read.format("org.apache.hudi").load(basePath + "/*/").show() {code} *Step2: upgrade to hudi 0.5.1* {code:java} export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7 ${SPARK_HOME}/bin/spark-shell \ --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' val tableName = "hudi_mor_table" val basePath = "file:///tmp/hudi_mor_table" var datas = List.tabulate(30)(i => List(s"""{ "name": "kenken${i}", "ts": "zasz", "age": 123, "location": "latitude"}""")) for (data <- datas) { val df = spark.read.json(spark.sparkContext.parallelize(data, 2)) df.write.format("org.apache.hudi"). option("hoodie.insert.shuffle.parallelism", "10"). option("hoodie.upsert.shuffle.parallelism", "10"). option("hoodie.delete.shuffle.parallelism", "10"). option("hoodie.bulkinsert.shuffle.parallelism", "10"). option("hoodie.datasource.write.recordkey.field", "name"). option("hoodie.datasource.write.partitionpath.field", "location"). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.table.name", tableName). option("hoodie.keep.max.commits", "5"). option("hoodie.keep.min.commits", "4"). option("hoodie.cleaner.commits.retained", "3"). mode("Append"). 
save(basePath) } spark.read.format("org.apache.hudi").load(basePath + "/*/").show() {code} *Step3: upgrade to hudi master* {code:java} export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7 ${SPARK_HOME}/bin/spark-shell \ --driver-memory 6G \ --packages org.apache.spark:spark-avro_2.11:2.4.4 \ --jars `ls packaging/hudi-spark-bundle/target/hudi-spark-bundle_*.*-*.*.*-SNAPSHOT.jar` \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' val tableName = "hudi_mor_table" val basePath = "file:///tmp/hudi_mor_table" var datas = List.tabulate(30)(i => List(s"""{ "name": "kenken${i}", "ts": "zasz", "age": 123, "location": "latitude"}""")) for (data <- datas) { val df = spark.read.json(spark.sparkContext.parallelize(data, 2)) df.write.format("org.apache.hudi"). option("hoodie.insert.shuffle.parallelism", "10"). option("hoodie.upsert.shuffle.parallelism", "10"). option("hoodie.delete.shuffle.parallelism", "10"). option("hoodie.bulkinsert.shuffle.parallelism", "10"). option("hoodie.datasource.write.recordkey.field", "name"). option("hoodie.datasource.write.partitionpath.field", "location"). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.table.name", tableName). option("hoodie.keep.max.commits", "5"). option("hoodie.keep.min.commits", "4"). option("hoodie.cleaner.commits.retained", "3"). mode("Append"). save(basePath) } spark.read.format("org.apache.hudi").load(basePath + "/*/").show() {code} was (Author: lamber-ken): I tried to reproduce it, but it works ok. 
*Step1: Use hudi 0.5.0 generate datas* {code:java} export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7 ${SPARK_HOME}/bin/spark-shell \ --packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' val tableName = "hudi_mor_table" val basePath = "file:///tmp/hudi_mor_table" var datas = List("""{ "name": "kenken", "ts": "qwer", "age": 12, "location": "latitude"}""") val df = spark.read.json(spark.sparkContext.parallelize(datas, 2)) df.write.format("org.apache.hudi"). option("hoodie.insert.shuffle.parallelism", "10"). option("hoodie.upsert.shuffle.parallelism", "10"). option("hoodie.delete.shuffle.parallelism", "10"). option("hoodie.bulkinsert.shuffle.parallelism", "10"). option("hoodie.datasource.write.recordkey.field", "name"). option("hoodie.datasource.write.partitionpath.field", "location"). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.table.name", tableName). mode("Overwrite"). save(basePath) var datas = List.tabulate(30)(i => List(s"""{ "name": "kenken${i}", "ts": "zasz", "age": 123, "location": "latitude"}""")) for (data <- datas) { val df = spark.read.json(spark.sparkContext.parallelize(data, 2)) df.write.format("org.apache.hudi"). option("hoodie.insert.shuffle.parallelism", "10"). option("hoodie.upsert.shuffle.parallelism", "10"). option("hoodie.delete.shuffle.parallelism", "10"). option("hoodie.bulkinsert.shuffle.parallelism", "10"). option("hoodie.datasource.write.recordkey.field", "name"). option("hoodie.datasource.write.partitionpath.field", "location"). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.table.name", tableName). option("hoodie.keep.max.commits", "5"). option("hoodie.keep.min.commits", "4"). option("hoodie.cleaner.commits.retained", "3"). mode("Append"). 
save(basePath) } spark.read.format("org.apache.hudi").load(basePath + "/*/").show() {code} *Step2: upgrade to hudi 0.5.1* {code:java} export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7 ${SPARK_HOME}/bin/spark-shell \ --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' val tableName = "hudi_mor_table" val basePath = "file:///tmp/hudi_mor_table" var datas = List.tabulate(30)(i => List(s"""{ "name": "kenken${i}", "ts": "zasz", "age": 123, "location": "latitude"}""")) for (data <- datas) { val df = spark.read.json(spark.sparkContext.parallelize(data, 2)) df.write.format("org.apache.hudi"). option("hoodie.insert.shuffle.parallelism", "10"). option("hoodie.upsert.shuffle.parallelism", "10"). option("hoodie.delete.shuffle.parallelism", "10"). option("hoodie.bulkinsert.shuffle.parallelism", "10"). option("hoodie.datasource.write.recordkey.field", "name"). option("hoodie.datasource.write.partitionpath.field", "location"). option("hoodie.datasource.write.precombine.field", "ts"). option("hoodie.table.name", tableName). option("hoodie.keep.max.commits", "5"). option("hoodie.keep.min.commits", "4"). option("hoodie.cleaner.commits.retained", "3"). mode("Append"). save(basePath) } spark.read.format("org.apache.hudi").load(basePath + "/*/").show() {code} > Exception: Not an Avro data file when running HoodieCleanClient.runClean > ------------------------------------------------------------------------ > > Key: HUDI-716 > URL: https://issues.apache.org/jira/browse/HUDI-716 > Project: Apache Hudi (incubating) > Issue Type: Bug > Components: DeltaStreamer > Reporter: Alexander Filipchik > Assignee: lamber-ken > Priority: Major > Fix For: 0.6.0 > > > Just upgraded to upstream master from 0.5 and seeing an issue at the end of > the delta sync run: > 20/03/17 02:13:49 ERROR HoodieDeltaStreamer: Got error running delta sync > once. 
Shutting down 20/03/17 02:13:49 ERROR HoodieDeltaStreamer: Got error > running delta sync once. Shutting > down org.apache.hudi.exception.HoodieIOException: Not an Avro data file at > org.apache.hudi.client.HoodieCleanClient.runClean(HoodieCleanClient.java:144) > at > org.apache.hudi.client.HoodieCleanClient.lambda$clean$0(HoodieCleanClient.java:88) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382) > at > java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580) > at org.apache.hudi.client.HoodieCleanClient.clean(HoodieCleanClient.java:86) > at org.apache.hudi.client.HoodieWriteClient.clean(HoodieWriteClient.java:843) > at > org.apache.hudi.client.HoodieWriteClient.postCommit(HoodieWriteClient.java:520) > at > org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:168) > at > org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:111) > at > org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:395) > at > org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:237) > at > org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:121) > at > org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:294) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) at > org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) > at > org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845) > at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) at > org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) at > 
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at > org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929) at > org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: > java.io.IOException: Not an Avro data file at > org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:50) at > org.apache.hudi.common.util.AvroUtils.deserializeAvroMetadata(AvroUtils.java:147) > at > org.apache.hudi.common.util.CleanerUtils.getCleanerPlan(CleanerUtils.java:87) > at > org.apache.hudi.client.HoodieCleanClient.runClean(HoodieCleanClient.java:141) > ... 24 more > > It is attempting to read an old cleanup file (2 months old) and crashing. > -- This message was sent by Atlassian Jira (v8.3.4#803005)