[ 
https://issues.apache.org/jira/browse/BEAM-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16755656#comment-16755656
 ] 

Vu Ho commented on BEAM-6517:
-----------------------------

Thanks everyone for looking into this. The problem I have is only with Spark 
(unfortunately we cant use Flink which has been excellent during testing). For 
now we will continue with our shading strategy and use SimpleFunction. 
Hopefully once we upgrade to Spark 2.4.0 the problem would go away.

I agree with [~kenn] that Beam should try to be compatible with older version 
of other libraries. This is something we are concerned when choosing Spark. 
Despite being the most popular query engine, the support hasn't been great as 
we run into all sort of class loader issues.

> Pipeline fails with deserializing lambda function on Spark (MapElements)
> ------------------------------------------------------------------------
>
>                 Key: BEAM-6517
>                 URL: https://issues.apache.org/jira/browse/BEAM-6517
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-avro
>    Affects Versions: 2.9.0
>            Reporter: Vu Ho
>            Priority: Minor
>
> I'm trying to read from Parquet using Spark runner. Initial attempt failed 
> because of version mismatch (Spark 2.3.2 use older Parquet). I tried shading 
> parquet and avro, and it successfully read the Parquet record. However when I 
> tried to access the record field using lambda function:
> {code:java}
> p.apply(FileIO.match().filepattern(options.getInputFile()))
>   .apply(FileIO.readMatches())
>   .apply(ParquetIO.readFiles(schema))
>   .apply(MapElements.into(TypeDescriptors.strings()).via(it -> 
> it.get("name").toString()))
>   .apply(TextIO.write().to(options.getOutput()));{code}
>  
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: unable to 
> deserialize org.apache.beam.sdk.transforms.MapElements$1@1292071f
> at 
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> at 
> org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:100)
> at 
> org.apache.beam.runners.spark.translation.MultiDoFnFunction.<init>(MultiDoFnFunction.java:103)
> at 
> org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:374)
> at 
> org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:340)
> at 
> org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:438)
> at 
> org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:426)
> at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> at 
> org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
> at 
> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
> at 
> org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:223)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: unexpected exception type
> at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at 
> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
> ... 19 more
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
> ... 41 more
> Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
> at 
> beamtest.examples.ParquetWordCount.$deserializeLambda$(ParquetWordCount.java:23)
> ... 51 more{code}
>  
> This doesn't happen if I use SimpleFunction, but many times lambda functions 
> are more convenient and readable 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to