[ 
https://issues.apache.org/jira/browse/BEAM-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16754591#comment-16754591
 ] 

Vu Ho edited comment on BEAM-6517 at 1/29/19 5:45 AM:
------------------------------------------------------

Updated: I tried this on Flink after shading the library (by default Flink 
wouldn't need shading, but I tried it out of curiosity), and it throws the 
same error.

Also, this only happens when the lambda expression references a 
GenericRecord, so I suspect the problem is not Spark-specific but is instead 
caused by shading the Parquet and Avro libraries.
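
To illustrate the mechanism the stack trace goes through (this is a minimal 
stand-alone JDK sketch, not code from the ticket; class and variable names 
are mine): a serializable lambda is written out as a 
java.lang.invoke.SerializedLambda, and on read, SerializedLambda.readResolve 
dispatches back to the synthetic $deserializeLambda$ method of the class that 
defined the lambda. That lookup is keyed on the exact type names recorded in 
the lambda's signature at compile time, which is presumably why relocating 
the Avro/Parquet classes after compilation can make it fail with "Invalid 
lambda deserialization".

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaRoundTrip {
  public static void main(String[] args) throws Exception {
    // A serializable lambda: serialized as a SerializedLambda that records
    // the capturing class (LambdaRoundTrip) and the method signature.
    Function<String, Integer> f =
        (Function<String, Integer> & Serializable) String::length;

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
      out.writeObject(f);
    }

    // Deserialization goes through SerializedLambda.readResolve, which calls
    // LambdaRoundTrip.$deserializeLambda$ and matches on the recorded
    // signature. If the types in that signature had been rewritten (e.g. by
    // shading) the match would fail with "Invalid lambda deserialization".
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
      @SuppressWarnings("unchecked")
      Function<String, Integer> g = (Function<String, Integer>) in.readObject();
      System.out.println(g.apply("hello")); // prints 5
    }
  }
}
{code}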

 

 


was (Author: vho):
Updated: I tried this on Flink after shading the library (by default Flink 
wouldn't need shading, but I tried it out of curiosity), and it throws the 
same error. So I guess it's not only related to Spark.

 

 

> Pipeline fails with deserializing lambda function on Spark (MapElements)
> ------------------------------------------------------------------------
>
>                 Key: BEAM-6517
>                 URL: https://issues.apache.org/jira/browse/BEAM-6517
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.9.0
>            Reporter: Vu Ho
>            Assignee: Vu Ho
>            Priority: Minor
>
> I'm trying to read from Parquet using the Spark runner. The initial attempt failed 
> because of a version mismatch (Spark 2.3.2 uses an older Parquet). I tried shading 
> parquet and avro, and it successfully read the Parquet record. However, it fails when I 
> try to access a record field using a lambda function:
> {code:java}
> p.apply(FileIO.match().filepattern(options.getInputFile()))
>   .apply(FileIO.readMatches())
>   .apply(ParquetIO.readFiles(schema))
>   .apply(MapElements.into(TypeDescriptors.strings())
>       .via(it -> it.get("name").toString()))
>   .apply(TextIO.write().to(options.getOutput()));
> {code}
>  
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: unable to deserialize org.apache.beam.sdk.transforms.MapElements$1@1292071f
> at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:100)
> at org.apache.beam.runners.spark.translation.MultiDoFnFunction.<init>(MultiDoFnFunction.java:103)
> at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:374)
> at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:340)
> at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:438)
> at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:426)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
> at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
> at org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:223)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: unexpected exception type
> at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:71)
> ... 19 more
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
> ... 41 more
> Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
> at beamtest.examples.ParquetWordCount.$deserializeLambda$(ParquetWordCount.java:23)
> ... 51 more
> {code}
>  
> This doesn't happen if I use a SimpleFunction, but lambda functions are 
> often more convenient and readable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
