[ 
https://issues.apache.org/jira/browse/HUDI-716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17061925#comment-17061925
 ] 

lamber-ken edited comment on HUDI-716 at 3/18/20, 5:29 PM:
-----------------------------------------------------------

I tried to reproduce it, but it works ok.

*Step 1: Use Hudi 0.5.0 to generate data*
{code:java}
export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
# Launch spark-shell with the Hudi 0.5.0 bundle; everything below the command is typed at the scala> prompt.
${SPARK_HOME}/bin/spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

// Table name and local filesystem location shared by every write and the final read.
// NOTE(review): no table/storage type option is set below, so despite the "mor" in the
// name the table presumably uses Hudi's default table type -- confirm.
val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"

// Single seed record. The JSON literal is kept on one line so that no mail-wrapping
// newline ends up inside the payload.
val seedRecords = List("""{ "name": "kenken", "ts": "qwer", "age": 12, "location": "latitude"}""")

// First write uses Overwrite mode, so any existing data at basePath is replaced.
// Method chains use trailing dots so the statements can be pasted line by line into the REPL.
val seedDf = spark.read.json(spark.sparkContext.parallelize(seedRecords, 2))
seedDf.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    mode("Overwrite").
    save(basePath)

// 30 one-record batches, each saved by its own Append write below.
// Presumably this produces enough commits for the keep.min/max.commits and
// cleaner.commits.retained settings to take effect -- confirm against Hudi docs.
val batches = List.tabulate(30)(i => List(s"""{ "name": "kenken${i}", "ts": "zasz", "age": 123, "location": "latitude"}"""))
for (batch <- batches) {
  val df = spark.read.json(spark.sparkContext.parallelize(batch, 2))
  df.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    option("hoodie.keep.max.commits", "5").
    option("hoodie.keep.min.commits", "4").
    option("hoodie.cleaner.commits.retained", "3").
    mode("Append").
    save(basePath)
}

// Read the table back through the Hudi datasource to check that it is still readable.
spark.read.format("org.apache.hudi").load(basePath + "/*/").show()
{code}
 

*Step 2: Upgrade to Hudi 0.5.1*
{code:java}
export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
# Launch spark-shell with the Hudi 0.5.1 bundle plus spark-avro.
# The comma-separated --packages value must stay on the same line as the flag;
# the mail-wrapped original split it, which breaks the command.
${SPARK_HOME}/bin/spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

// Same table name and location as the table written in the previous step.
val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"

// 30 one-record batches; the JSON literal is kept on one line so that no
// mail-wrapping newline ends up inside the payload.
val batches = List.tabulate(30)(i => List(s"""{ "name": "kenken${i}", "ts": "zasz", "age": 123, "location": "latitude"}"""))

// Each batch is saved by its own Append write against the existing table.
// Method chains use trailing dots so the statements can be pasted line by line into the REPL.
for (batch <- batches) {
  val df = spark.read.json(spark.sparkContext.parallelize(batch, 2))
  df.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    option("hoodie.keep.max.commits", "5").
    option("hoodie.keep.min.commits", "4").
    option("hoodie.cleaner.commits.retained", "3").
    mode("Append").
    save(basePath)
}

// Read the table back through the Hudi datasource to check that it is still readable.
spark.read.format("org.apache.hudi").load(basePath + "/*/").show()
{code}
 

*Step 3: Upgrade to Hudi master*
{code:java}
export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
# Launch spark-shell with a locally built Hudi bundle jar (run from the Hudi source root,
# since the jar path is relative). The `ls ...` substitution must stay on one line;
# the mail-wrapped original split it, so the jar path would be run as a separate command.
${SPARK_HOME}/bin/spark-shell \
    --driver-memory 6G \
    --packages org.apache.spark:spark-avro_2.11:2.4.4 \
    --jars `ls packaging/hudi-spark-bundle/target/hudi-spark-bundle_*.*-*.*.*-SNAPSHOT.jar` \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

// Same table name and location as the table written in the previous steps.
val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"

// 30 one-record batches; the JSON literal is kept on one line so that no
// mail-wrapping newline ends up inside the payload.
val batches = List.tabulate(30)(i => List(s"""{ "name": "kenken${i}", "ts": "zasz", "age": 123, "location": "latitude"}"""))

// Each batch is saved by its own Append write against the existing table.
// Method chains use trailing dots so the statements can be pasted line by line into the REPL.
for (batch <- batches) {
  val df = spark.read.json(spark.sparkContext.parallelize(batch, 2))
  df.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    option("hoodie.keep.max.commits", "5").
    option("hoodie.keep.min.commits", "4").
    option("hoodie.cleaner.commits.retained", "3").
    mode("Append").
    save(basePath)
}

// Read the table back through the Hudi datasource to check that it is still readable.
spark.read.format("org.apache.hudi").load(basePath + "/*/").show()
{code}
 


was (Author: lamber-ken):
I tried to reproduce it, but it works ok.

*Step 1: Use Hudi 0.5.0 to generate data*
{code:java}
export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
# Launch spark-shell with the Hudi 0.5.0 bundle; everything below the command is typed at the scala> prompt.
${SPARK_HOME}/bin/spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle:0.5.0-incubating \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

// Table name and local filesystem location shared by every write and the final read.
val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"

// Single seed record. The JSON literal is kept on one line so that no mail-wrapping
// newline ends up inside the payload.
val seedRecords = List("""{ "name": "kenken", "ts": "qwer", "age": 12, "location": "latitude"}""")

// First write uses Overwrite mode, so any existing data at basePath is replaced.
// Method chains use trailing dots so the statements can be pasted line by line into the REPL.
val seedDf = spark.read.json(spark.sparkContext.parallelize(seedRecords, 2))
seedDf.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    mode("Overwrite").
    save(basePath)

// 30 one-record batches, each saved by its own Append write below.
val batches = List.tabulate(30)(i => List(s"""{ "name": "kenken${i}", "ts": "zasz", "age": 123, "location": "latitude"}"""))
for (batch <- batches) {
  val df = spark.read.json(spark.sparkContext.parallelize(batch, 2))
  df.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    option("hoodie.keep.max.commits", "5").
    option("hoodie.keep.min.commits", "4").
    option("hoodie.cleaner.commits.retained", "3").
    mode("Append").
    save(basePath)
}

// Read the table back through the Hudi datasource to check that it is still readable.
spark.read.format("org.apache.hudi").load(basePath + "/*/").show()
{code}
 

*Step 2: Upgrade to Hudi 0.5.1*
{code:java}
export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
# Launch spark-shell with the Hudi 0.5.1 bundle plus spark-avro.
# The comma-separated --packages value must stay on the same line as the flag;
# the mail-wrapped original split it, which breaks the command.
${SPARK_HOME}/bin/spark-shell \
  --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

// Same table name and location as the table written in the previous step.
val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"

// 30 one-record batches; the JSON literal is kept on one line so that no
// mail-wrapping newline ends up inside the payload.
val batches = List.tabulate(30)(i => List(s"""{ "name": "kenken${i}", "ts": "zasz", "age": 123, "location": "latitude"}"""))

// Each batch is saved by its own Append write against the existing table.
// Method chains use trailing dots so the statements can be pasted line by line into the REPL.
for (batch <- batches) {
  val df = spark.read.json(spark.sparkContext.parallelize(batch, 2))
  df.write.format("org.apache.hudi").
    option("hoodie.insert.shuffle.parallelism", "10").
    option("hoodie.upsert.shuffle.parallelism", "10").
    option("hoodie.delete.shuffle.parallelism", "10").
    option("hoodie.bulkinsert.shuffle.parallelism", "10").
    option("hoodie.datasource.write.recordkey.field", "name").
    option("hoodie.datasource.write.partitionpath.field", "location").
    option("hoodie.datasource.write.precombine.field", "ts").
    option("hoodie.table.name", tableName).
    option("hoodie.keep.max.commits", "5").
    option("hoodie.keep.min.commits", "4").
    option("hoodie.cleaner.commits.retained", "3").
    mode("Append").
    save(basePath)
}

// Read the table back through the Hudi datasource to check that it is still readable.
spark.read.format("org.apache.hudi").load(basePath + "/*/").show()
{code}

> Exception: Not an Avro data file when running HoodieCleanClient.runClean
> ------------------------------------------------------------------------
>
>                 Key: HUDI-716
>                 URL: https://issues.apache.org/jira/browse/HUDI-716
>             Project: Apache Hudi (incubating)
>          Issue Type: Bug
>          Components: DeltaStreamer
>            Reporter: Alexander Filipchik
>            Assignee: lamber-ken
>            Priority: Major
>             Fix For: 0.6.0
>
>
> Just upgraded to upstream master from 0.5 and seeing an issue at the end of
> the delta sync run:
> 20/03/17 02:13:49 ERROR HoodieDeltaStreamer: Got error running delta sync once. Shutting down
> 20/03/17 02:13:49 ERROR HoodieDeltaStreamer: Got error running delta sync once. Shutting down
> org.apache.hudi.exception.HoodieIOException: Not an Avro data file
>  at org.apache.hudi.client.HoodieCleanClient.runClean(HoodieCleanClient.java:144)
>  at org.apache.hudi.client.HoodieCleanClient.lambda$clean$0(HoodieCleanClient.java:88)
>  at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>  at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
>  at org.apache.hudi.client.HoodieCleanClient.clean(HoodieCleanClient.java:86)
>  at org.apache.hudi.client.HoodieWriteClient.clean(HoodieWriteClient.java:843)
>  at org.apache.hudi.client.HoodieWriteClient.postCommit(HoodieWriteClient.java:520)
>  at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:168)
>  at org.apache.hudi.client.AbstractHoodieWriteClient.commit(AbstractHoodieWriteClient.java:111)
>  at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:395)
>  at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:237)
>  at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:121)
>  at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:294)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
>  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
>  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
>  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
>  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
>  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.IOException: Not an Avro data file
>  at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:50)
>  at org.apache.hudi.common.util.AvroUtils.deserializeAvroMetadata(AvroUtils.java:147)
>  at org.apache.hudi.common.util.CleanerUtils.getCleanerPlan(CleanerUtils.java:87)
>  at org.apache.hudi.client.HoodieCleanClient.runClean(HoodieCleanClient.java:141)
>  ... 24 more
>  
> It is attempting to read an old cleanup file (2 months old) and crashing
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to