[ https://issues.apache.org/jira/browse/SPARK-25588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642200#comment-16642200 ]
Michael Heuer commented on SPARK-25588:
---------------------------------------

I moved the version back to 2.3.2, see [https://github.com/bigdatagenomics/adam/pull/2055], and created this more succinct failing unit test: [https://github.com/bigdatagenomics/adam/blob/2551654a284a4efba70aff3a2efa8f5e29bb8ea3/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/Issue2058Suite.scala]

SchemaParseException: Can't redefine: list when reading from Parquet
--------------------------------------------------------------------

                 Key: SPARK-25588
                 URL: https://issues.apache.org/jira/browse/SPARK-25588
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.2
         Environment: Spark version 2.3.2
            Reporter: Michael Heuer
            Priority: Major

In ADAM, a library downstream of Spark, we use Avro to define a schema, generate Java classes from the Avro schema using the avro-maven-plugin, and generate Scala Products from the Avro schema using our own code generation library.

In the code path demonstrated by the unit test below, we write to Parquet and read back via an RDD of Avro-generated Java classes, and then write to Parquet and read back via a Dataset of Avro-generated Scala Products.

{code:scala}
sparkTest("transform reads to variant rdd") {
  val reads = sc.loadAlignments(testFile("small.sam"))

  def checkSave(variants: VariantRDD) {
    val tempPath = tmpLocation(".adam")
    variants.saveAsParquet(tempPath)
    assert(sc.loadVariants(tempPath).rdd.count === 20)
  }

  val variants: VariantRDD = reads.transmute[Variant, VariantProduct, VariantRDD](
    (rdd: RDD[AlignmentRecord]) => {
      rdd.map(AlignmentRecordRDDSuite.varFn)
    })

  checkSave(variants)

  val sqlContext = SQLContext.getOrCreate(sc)
  import sqlContext.implicits._

  val variantsDs: VariantRDD = reads.transmuteDataset[Variant, VariantProduct, VariantRDD](
    (ds: Dataset[AlignmentRecordProduct]) => {
      ds.map(r => {
        VariantProduct.fromAvro(
          AlignmentRecordRDDSuite.varFn(r.toAvro))
      })
    })

  checkSave(variantsDs)
}
{code}

https://github.com/bigdatagenomics/adam/blob/master/adam-core/src/test/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordRDDSuite.scala#L1540
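For readers without the ADAM test harness, a rough, simplified stand-in for the read side is to open the Dataset-written part file directly with parquet-avro, which builds the same converters that fail in the trace below. This is my own sketch, not ADAM's actual code, and the part-file path is a placeholder:

{code:scala}
// Sketch only: approximates ADAM's parquet-avro read path on a
// Dataset-written Parquet file. The part-file path is a placeholder.
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetReader

val partFile = new Path("TempSuite.adam/part-00000-<uuid>-c000.snappy.parquet")
val reader = AvroParquetReader.builder[GenericRecord](partFile).build()
// Constructing the converters for the nested 3-level lists throws
// org.apache.avro.SchemaParseException: Can't redefine: list
val first: GenericRecord = reader.read()
{code}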
Note that the schemas written to Parquet differ between the two code paths.

RDD code path:
{noformat}
$ parquet-tools schema /var/folders/m6/4yqn_4q129lbth_dq3qzj_8h0000gn/T/TempSuite3400691035694870641.adam/part-r-00000.gz.parquet
message org.bdgenomics.formats.avro.Variant {
  optional binary contigName (UTF8);
  optional int64 start;
  optional int64 end;
  required group names (LIST) {
    repeated binary array (UTF8);
  }
  optional boolean splitFromMultiAllelic;
  optional binary referenceAllele (UTF8);
  optional binary alternateAllele (UTF8);
  optional double quality;
  optional boolean filtersApplied;
  optional boolean filtersPassed;
  required group filtersFailed (LIST) {
    repeated binary array (UTF8);
  }
  optional group annotation {
    optional binary ancestralAllele (UTF8);
    optional int32 alleleCount;
    optional int32 readDepth;
    optional int32 forwardReadDepth;
    optional int32 reverseReadDepth;
    optional int32 referenceReadDepth;
    optional int32 referenceForwardReadDepth;
    optional int32 referenceReverseReadDepth;
    optional float alleleFrequency;
    optional binary cigar (UTF8);
    optional boolean dbSnp;
    optional boolean hapMap2;
    optional boolean hapMap3;
    optional boolean validated;
    optional boolean thousandGenomes;
    optional boolean somatic;
    required group transcriptEffects (LIST) {
      repeated group array {
        optional binary alternateAllele (UTF8);
        required group effects (LIST) {
          repeated binary array (UTF8);
        }
        optional binary geneName (UTF8);
        optional binary geneId (UTF8);
        optional binary featureType (UTF8);
        optional binary featureId (UTF8);
        optional binary biotype (UTF8);
        optional int32 rank;
        optional int32 total;
        optional binary genomicHgvs (UTF8);
        optional binary transcriptHgvs (UTF8);
        optional binary proteinHgvs (UTF8);
        optional int32 cdnaPosition;
        optional int32 cdnaLength;
        optional int32 cdsPosition;
        optional int32 cdsLength;
        optional int32 proteinPosition;
        optional int32 proteinLength;
        optional int32 distance;
        required group messages (LIST) {
          repeated binary array (ENUM);
        }
      }
    }
    required group attributes (MAP) {
      repeated group map (MAP_KEY_VALUE) {
        required binary key (UTF8);
        required binary value (UTF8);
      }
    }
  }
}
{noformat}

Dataset code path:
{noformat}
$ parquet-tools schema /var/folders/m6/4yqn_4q129lbth_dq3qzj_8h0000gn/T/TempSuite2879366708769671307.adam/part-00000-b123eb8b-2648-4648-8096-b3de95343141-c000.snappy.parquet
message spark_schema {
  optional binary contigName (UTF8);
  optional int64 start;
  optional int64 end;
  optional group names (LIST) {
    repeated group list {
      optional binary element (UTF8);
    }
  }
  optional boolean splitFromMultiAllelic;
  optional binary referenceAllele (UTF8);
  optional binary alternateAllele (UTF8);
  optional double quality;
  optional boolean filtersApplied;
  optional boolean filtersPassed;
  optional group filtersFailed (LIST) {
    repeated group list {
      optional binary element (UTF8);
    }
  }
  optional group annotation {
    optional binary ancestralAllele (UTF8);
    optional int32 alleleCount;
    optional int32 readDepth;
    optional int32 forwardReadDepth;
    optional int32 reverseReadDepth;
    optional int32 referenceReadDepth;
    optional int32 referenceForwardReadDepth;
    optional int32 referenceReverseReadDepth;
    optional float alleleFrequency;
    optional binary cigar (UTF8);
    optional boolean dbSnp;
    optional boolean hapMap2;
    optional boolean hapMap3;
    optional boolean validated;
    optional boolean thousandGenomes;
    optional boolean somatic;
    optional group transcriptEffects (LIST) {
      repeated group list {
        optional group element {
          optional binary alternateAllele (UTF8);
          optional group effects (LIST) {
            repeated group list {
              optional binary element (UTF8);
            }
          }
          optional binary geneName (UTF8);
          optional binary geneId (UTF8);
          optional binary featureType (UTF8);
          optional binary featureId (UTF8);
          optional binary biotype (UTF8);
          optional int32 rank;
          optional int32 total;
          optional binary genomicHgvs (UTF8);
          optional binary transcriptHgvs (UTF8);
          optional binary proteinHgvs (UTF8);
          optional int32 cdnaPosition;
          optional int32 cdnaLength;
          optional int32 cdsPosition;
          optional int32 cdsLength;
          optional int32 proteinPosition;
          optional int32 proteinLength;
          optional int32 distance;
          optional group messages (LIST) {
            repeated group list {
              optional binary element (UTF8);
            }
          }
        }
      }
    }
    optional group attributes (MAP) {
      repeated group key_value {
        required binary key (UTF8);
        optional binary value (UTF8);
      }
    }
  }
}
{noformat}
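The difference that matters is the list encoding: the RDD path writes Parquet's legacy 2-level lists (repeated binary array), while the Dataset path writes the standard 3-level lists (repeated group list containing an optional element). As an aside not part of the original report: Spark exposes the spark.sql.parquet.writeLegacyFormat option to make the Dataset writer emit the legacy layout, sketched below; whether that sidesteps this bug has not been verified against this suite.

{code:scala}
// Sketch (untested against this suite): ask Spark SQL to write the
// legacy 2-level Parquet list layout, matching the RDD path's output.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("legacy-parquet-lists")
  .config("spark.sql.parquet.writeLegacyFormat", "true")
  .getOrCreate()
{code}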
With Spark 2.4.0 (RC2) and Parquet dependency version 1.10.0, the Dataset path now fails:

{noformat}
$ mvn test
...
- transform reads to variant rdd *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost, executor driver): org.apache.avro.SchemaParseException: Can't redefine: list
  at org.apache.avro.Schema$Names.put(Schema.java:1128)
  at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
  at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
  at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
  at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
  at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
  at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
  at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
  at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
  at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
  at org.apache.avro.Schema.toString(Schema.java:324)
  at org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:68)
  at org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:866)
  at org.apache.parquet.avro.AvroIndexedRecordConverter$AvroArrayConverter.<init>(AvroIndexedRecordConverter.java:333)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:172)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:94)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:168)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:94)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:66)
  at org.apache.parquet.avro.AvroCompatRecordMaterializer.<init>(AvroCompatRecordMaterializer.java:34)
  at org.apache.parquet.avro.AvroReadSupport.newCompatMaterializer(AvroReadSupport.java:144)
  at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:136)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204)
  at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182)
  at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
  at org.apache.spark.rdd.NewHadoopRDD$$anon$1.liftedTree1$1(NewHadoopRDD.scala:199)
  at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:196)
  at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:151)
  at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:70)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:121)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
2018-09-29 21:39:47 ERROR TaskSetManager:70 - Task 0 in stage 3.0 failed 1 times; aborting job
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1866)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1866)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2100)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2038)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
  at org.bdgenomics.adam.rdd.read.AlignmentRecordRDDSuite$$anonfun$78.checkSave$6(AlignmentRecordRDDSuite.scala:1551)
  at org.bdgenomics.adam.rdd.read.AlignmentRecordRDDSuite$$anonfun$78.apply$mcV$sp(AlignmentRecordRDDSuite.scala:1579)
  at org.bdgenomics.utils.misc.SparkFunSuite$$anonfun$sparkTest$1.apply$mcV$sp(SparkFunSuite.scala:102)
  at org.bdgenomics.utils.misc.SparkFunSuite$$anonfun$sparkTest$1.apply(SparkFunSuite.scala:98)
  at org.bdgenomics.utils.misc.SparkFunSuite$$anonfun$sparkTest$1.apply(SparkFunSuite.scala:98)
  at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
  at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
  at org.scalatest.FunSuite.withFixture(FunSuite.scala:1555)
  at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
  at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
  at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
  at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
  at org.bdgenomics.adam.util.ADAMFunSuite.org$scalatest$BeforeAndAfter$$super$runTest(ADAMFunSuite.scala:24)
  at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:200)
  at org.bdgenomics.adam.util.ADAMFunSuite.runTest(ADAMFunSuite.scala:24)
  at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
  at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:208)
  at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
  at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
  at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
  at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
  at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:208)
  at org.scalatest.FunSuite.runTests(FunSuite.scala:1555)
  at org.scalatest.Suite$class.run(Suite.scala:1424)
  at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1555)
  at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
  at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:212)
  at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
  at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:212)
  at org.bdgenomics.adam.util.ADAMFunSuite.org$scalatest$BeforeAndAfter$$super$run(ADAMFunSuite.scala:24)
  at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:241)
  at org.bdgenomics.adam.util.ADAMFunSuite.run(ADAMFunSuite.scala:24)
  at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
  at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
  at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
  at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
  at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
  at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
  at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
  at org.scalatest.tools.Runner$.main(Runner.scala:860)
  at org.scalatest.tools.Runner.main(Runner.scala)
Cause: org.apache.avro.SchemaParseException: Can't redefine: list
  at org.apache.avro.Schema$Names.put(Schema.java:1128)
  at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
  at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
  at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
  at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
  at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
  at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
  at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
  at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
  at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
  at org.apache.avro.Schema.toString(Schema.java:324)
  at org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility(SchemaCompatibility.java:68)
  at org.apache.parquet.avro.AvroRecordConverter.isElementType(AvroRecordConverter.java:866)
  at org.apache.parquet.avro.AvroIndexedRecordConverter$AvroArrayConverter.<init>(AvroIndexedRecordConverter.java:333)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:172)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:94)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.newConverter(AvroIndexedRecordConverter.java:168)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:94)
  at org.apache.parquet.avro.AvroIndexedRecordConverter.<init>(AvroIndexedRecordConverter.java:66)
  at org.apache.parquet.avro.AvroCompatRecordMaterializer.<init>(AvroCompatRecordMaterializer.java:34)
  at org.apache.parquet.avro.AvroReadSupport.newCompatMaterializer(AvroReadSupport.java:144)
  at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:136)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:204)
  at org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:182)
  at org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
  at org.apache.spark.rdd.NewHadoopRDD$$anon$1.liftedTree1$1(NewHadoopRDD.scala:199)
  at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:196)
  at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:151)
  at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:70)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:121)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
{noformat}

Regression from Spark version 2.3.1.
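The exception itself comes from Avro refusing to serialize a schema tree that contains two distinct records sharing one name. When parquet-avro converts the Dataset schema above back to Avro, each standard 3-level list group becomes a record named "list", and the nested lists (effects inside transcriptEffects) therefore produce two different records with that name. A minimal illustration of just that Avro behavior (my own sketch, independent of Spark and parquet-avro):

{code:scala}
// Minimal sketch of the underlying Avro failure mode (illustration only):
// two structurally different record schemas that both carry the name
// "list" inside one schema tree.
import org.apache.avro.SchemaBuilder

val innerList = SchemaBuilder.record("list").fields()
  .optionalString("element")
  .endRecord()

val outerList = SchemaBuilder.record("list").fields()
  .name("element").`type`().array().items(innerList).noDefault()
  .endRecord()

val root = SchemaBuilder.record("spark_schema").fields()
  .name("transcriptEffects").`type`().array().items(outerList).noDefault()
  .endRecord()

// Serializing the tree hits Schema$Names.put, as in the trace above:
// org.apache.avro.SchemaParseException: Can't redefine: list
root.toString
{code}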