[ 
https://issues.apache.org/jira/browse/BEAM-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16754887#comment-16754887
 ] 

Ismaël Mejía commented on BEAM-6517:
------------------------------------

Thanks for the details [~vho]. It looks more like you have been bitten by an 
incompatibility between different versions of Avro. Beam core depends on 
Avro 1.8.2, but Spark core depends on Avro 1.7.7 for version 2.3.2 and older. If 
you can, my suggestion would be to upgrade the Spark dependency to version 2.4.0 
so it is aligned with Beam.
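
A minimal sketch of that alignment, assuming a Maven build and the Scala 2.11 spark-core artifact (coordinates are illustrative; adjust to your build tool and Scala version):
{code:xml}
<!-- Move Spark to 2.4.0, which depends on Avro 1.8.x, matching Beam core -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.4.0</version>
</dependency>
{code}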

The real long-term solution would be to vendor Avro, but this seems not easy. [~kenn] 
have you or someone else already looked into this?
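
For context, the "Invalid lambda deserialization" at the bottom of the trace comes from Java's SerializedLambda machinery: on readObject the JVM reflectively invokes the synthetic $deserializeLambda$ method of the class that declared the lambda, and that call fails when the class (or types it references, e.g. relocated/shaded Avro or Parquet classes) no longer matches what was serialized. A self-contained sketch of the round trip, plain JDK with no Beam dependencies (class and method names are illustrative):
{code:java}
import java.io.*;

public class LambdaSerDemo {
    // A serializable function type, analogous to Beam's SerializableFunction.
    interface SerFn<I, O> extends java.util.function.Function<I, O>, Serializable {}

    // Serialize a lambda and read it back, as the Spark runner does when it
    // clones a DoFn. Deserialization goes through SerializedLambda.readResolve,
    // which invokes $deserializeLambda$ on the declaring class; if that class
    // changed since serialization (e.g. shading relocated types it references),
    // this is where "Invalid lambda deserialization" is thrown.
    static String roundTrip(String input) throws IOException, ClassNotFoundException {
        SerFn<String, String> fn = s -> s.toUpperCase();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(fn);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            @SuppressWarnings("unchecked")
            SerFn<String, String> copy = (SerFn<String, String>) ois.readObject();
            return copy.apply(input);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("beam")); // prints BEAM
    }
}
{code}
Within a single JVM this succeeds; the failure mode in this issue appears only when the declaring class seen at deserialization time differs from the one that serialized the lambda.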

 

> Pipeline fails with deserializing lambda function on Spark (MapElements)
> ------------------------------------------------------------------------
>
>                 Key: BEAM-6517
>                 URL: https://issues.apache.org/jira/browse/BEAM-6517
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-avro
>    Affects Versions: 2.9.0
>            Reporter: Vu Ho
>            Priority: Minor
>
> I'm trying to read from Parquet using Spark runner. Initial attempt failed 
> because of version mismatch (Spark 2.3.2 use older Parquet). I tried shading 
> parquet and avro, and it successfully read the Parquet record. However when I 
> tried to access the record field using lambda function:
> {code:java}
> p.apply(FileIO.match().filepattern(options.getInputFile()))
>   .apply(FileIO.readMatches())
>   .apply(ParquetIO.readFiles(schema))
>   .apply(MapElements.into(TypeDescriptors.strings()).via(it -> it.get("name").toString()))
>   .apply(TextIO.write().to(options.getOutput()));{code}
>  
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: unable to deserialize org.apache.beam.sdk.transforms.MapElements$1@1292071f
> at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:100)
> at org.apache.beam.runners.spark.translation.MultiDoFnFunction.<init>(MultiDoFnFunction.java:103)
> at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:374)
> at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:340)
> at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:438)
> at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:426)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
> at org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:223)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: unexpected exception type
> at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
> ... 19 more
> Caused by: java.io.IOException: unexpected exception type
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
> ... 41 more
> Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
> at beamtest.examples.ParquetWordCount.$deserializeLambda$(ParquetWordCount.java:23)
> ... 51 more{code}
>  
> This doesn't happen if I use SimpleFunction, but lambda functions are often 
> more convenient and readable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
