[jira] [Commented] (BEAM-6517) Pipeline fails with deserializing lambda function on Spark (MapElements)
[ https://issues.apache.org/jira/browse/BEAM-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755656#comment-16755656 ] Vu Ho commented on BEAM-6517:
---
Thanks everyone for looking into this. The problem I have is only with Spark (unfortunately we can't use Flink, which has been excellent during testing). For now we will continue with our shading strategy and use SimpleFunction. Hopefully once we upgrade to Spark 2.4.0 the problem will go away. I agree with [~kenn] that Beam should try to stay compatible with older versions of other libraries. This was a concern for us when choosing Spark: despite it being the most popular query engine, the support hasn't been great, as we keep running into all sorts of class loader issues.

> Pipeline fails with deserializing lambda function on Spark (MapElements)
> ------------------------------------------------------------------------
>
>              Key: BEAM-6517
>              URL: https://issues.apache.org/jira/browse/BEAM-6517
>          Project: Beam
>       Issue Type: Bug
>       Components: io-java-avro
> Affects Versions: 2.9.0
>         Reporter: Vu Ho
>         Priority: Minor
>
> I'm trying to read from Parquet using the Spark runner. The initial attempt failed because of a version mismatch (Spark 2.3.2 uses an older Parquet). I tried shading parquet and avro, and it successfully read the Parquet record. However, when I tried to access the record field using a lambda function:
> {code:java}
> p.apply(FileIO.match().filepattern(options.getInputFile()))
>  .apply(FileIO.readMatches())
>  .apply(ParquetIO.readFiles(schema))
>  .apply(MapElements.into(TypeDescriptors.strings()).via(it -> it.get("name").toString()))
>  .apply(TextIO.write().to(options.getOutput()));{code}
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: unable to deserialize org.apache.beam.sdk.transforms.MapElements$1@1292071f
> 	at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
> 	at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:100)
> 	at org.apache.beam.runners.spark.translation.MultiDoFnFunction.<init>(MultiDoFnFunction.java:103)
> 	at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:374)
> 	at org.apache.beam.runners.spark.translation.TransformTranslator$6.evaluate(TransformTranslator.java:340)
> 	at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:438)
> 	at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:426)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649)
> 	at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
> 	at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
> 	at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
> 	at org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:223)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: unexpected exception type
> 	at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736)
> 	at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> 	at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> 	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> 	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> 	at ...{code}
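The SimpleFunction workaround Vu Ho mentions sidesteps how Java serializes lambdas: a lambda is written as a SerializedLambda and resolved on read through a synthetic $deserializeLambda$ method on the class that defined it, which is exactly the lookup that breaks when shading or a mismatched classpath rewrites that class. A minimal, Beam-free sketch of the difference, using hypothetical stand-ins (`SerializableFn`, `SimpleFn`, `FnSerDemo`) rather than Beam's actual `SerializableFunction`/`SimpleFunction`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical stand-ins for Beam's SerializableFunction and SimpleFunction,
// so this example runs without the Beam SDK on the classpath.
interface SerializableFn<I, O> extends Function<I, O>, Serializable {}

abstract class SimpleFn<I, O> implements SerializableFn<I, O> {}

public class FnSerDemo {
    // Serialize any object to bytes, roughly what a runner does when shipping a fn.
    static byte[] toBytes(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T> T fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Lambda: round-trips through SerializedLambda and the capturing
        // class's synthetic $deserializeLambda$ method. If shading rewrites
        // that class, readResolve fails with "unexpected exception type".
        SerializableFn<String, String> lambda = s -> s.toUpperCase();

        // Anonymous subclass: serialized as an ordinary named class, so it
        // only needs its class file to be present at deserialization time.
        SimpleFn<String, String> simple = new SimpleFn<String, String>() {
            @Override
            public String apply(String s) {
                return s.toUpperCase();
            }
        };

        SerializableFn<String, String> lambdaCopy = fromBytes(toBytes(lambda));
        SimpleFn<String, String> simpleCopy = fromBytes(toBytes(simple));
        System.out.println(lambdaCopy.apply("name"));  // NAME
        System.out.println(simpleCopy.apply("name"));  // NAME
    }
}
```

Both round-trips succeed here because serialization and deserialization happen on one unchanged classpath; the thread's failure appears only when the two sides disagree, which is why the named-subclass form is the more robust choice under shading.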
[jira] [Commented] (BEAM-6517) Pipeline fails with deserializing lambda function on Spark (MapElements)
[ https://issues.apache.org/jira/browse/BEAM-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755129#comment-16755129 ] Ismaël Mejía commented on BEAM-6517:
---
Mmm, I like the idea of having a separate module for Avro, and as you mention, Avro is almost impossible to shade/vendor. So far it seems Beam Schema has not created a strong link with Avro. But then what is the plan: deprecate the `sdks/java/core` one and eventually remove it? If we do that and Avro is provided in the extension, it could be a viable solution. I can give it a try, but I want to be sure you are all willing to change `sdks/java/core` to remove it; otherwise the fix will be useless, because core will leak it and clash with the runners' one. Btw, I think not getting Avro out of core was a rushed mistake we made before 2.0.0 (thinking about our discussion a long time ago, [~dhalp...@google.com] :P).
[jira] [Commented] (BEAM-6517) Pipeline fails with deserializing lambda function on Spark (MapElements)
[ https://issues.apache.org/jira/browse/BEAM-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755072#comment-16755072 ] Kenneth Knowles commented on BEAM-6517:
---
As a workaround we could have a separate jar for "OtherAvroIO" that supports 1.7.7.
[jira] [Commented] (BEAM-6517) Pipeline fails with deserializing lambda function on Spark (MapElements)
[ https://issues.apache.org/jira/browse/BEAM-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16755070#comment-16755070 ] Kenneth Knowles commented on BEAM-6517:
---
I haven't looked into vendoring Avro. Does Beam core depend on Avro, or just AvroIO? I know it ships with the core, but maybe that is the problem. AvroIO has Avro classes on its surface that the user will need to use, so it cannot use a vendored Avro library. But we should aspire to support the most-used versions of the library out there. What changes would be necessary to also support 1.7.7?
[jira] [Commented] (BEAM-6517) Pipeline fails with deserializing lambda function on Spark (MapElements)
[ https://issues.apache.org/jira/browse/BEAM-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754887#comment-16754887 ] Ismaël Mejía commented on BEAM-6517:
---
Thanks for the details [~vho]. This looks more like you have been bitten by an incompatibility between the different versions of Avro: Beam core depends on Avro 1.8.2, but Spark core depends on Avro 1.7.7 for versions 2.3.2 and older. If you can, my suggestion would be to bump the provided Spark dependency to version 2.4.0 so it is aligned with Beam. The real long-term fix would be to vendor Avro, but this seems not easy. [~kenn], have you or someone else already looked into this?
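Ismaël's suggestion above amounts to aligning the Spark dependency with the Avro 1.8.x line that Beam core uses. A sketch of what that could look like in a Maven build, assuming the Scala 2.11 artifact and that Spark is provided by the cluster (the coordinates and versions here are illustrative, not taken from this issue):

{code:xml}
<!-- Spark 2.4.x depends on Avro 1.8.x, matching Beam core's Avro 1.8.2,
     so pinning it here avoids the 1.7.7 that Spark 2.3.2 pulls in. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.4.0</version>
  <scope>provided</scope>
</dependency>
{code}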
[jira] [Commented] (BEAM-6517) Pipeline fails with deserializing lambda function on Spark (MapElements)
[ https://issues.apache.org/jira/browse/BEAM-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754873#comment-16754873 ] Etienne Chauchot commented on BEAM-6517:
---
[~vho] what happens if you test on Flink and without the shade (as it is not needed)? Be careful: the fat jar shades libraries, so test it in local mode. If it is related to the shade and not to Spark, [~iemejia] is better suited than me, as he knows shading a lot better than I do.
[jira] [Commented] (BEAM-6517) Pipeline fails with deserializing lambda function on Spark (MapElements)
[ https://issues.apache.org/jira/browse/BEAM-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754591#comment-16754591 ] Vu Ho commented on BEAM-6517:
---
Updated: I tried this on Flink after shading the library (by default it wouldn't need shading, but I tried just out of curiosity). It throws the same error, so I guess it's not only related to Spark.
org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform(SparkRunner.java:438) > at > org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform(SparkRunner.java:426) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:649) > at > org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311) > at > org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245) > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458) > at > org.apache.beam.runners.spark.SparkRunner.lambda$run$1(SparkRunner.java:223) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.io.IOException: unexpected exception type > at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1736) > at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1266) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) > at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) > at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) > at
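The interaction with shading can be seen in miniature: a serialized Java lambda records the names of its capturing class and functional interface inside the byte stream, so if relocation rewrites those names on one side (the submitting JVM) but not the other (the workers), deserialization cannot resolve them. A minimal sketch, with illustrative names rather than Beam's API:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class ShadeDemo {
    // A serializable functional interface, similar in spirit to Beam's
    // SerializableFunction (illustrative, not Beam's actual API).
    public interface SerFn<I, O> extends Function<I, O>, Serializable {}

    // Serialize any object to bytes with plain Java serialization.
    public static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        SerFn<String, String> fn = s -> s.toUpperCase();
        // The lambda serializes as a SerializedLambda that names the
        // capturing class and functional interface as strings. If a shade
        // plugin relocates those classes on only one side of the wire, the
        // name lookup during readResolve fails, which ObjectInputStream
        // reports as "java.io.IOException: unexpected exception type".
        String payload = new String(serialize(fn), StandardCharsets.ISO_8859_1);
        System.out.println(payload.contains("ShadeDemo")); // true: the class name is baked into the bytes
    }
}
```

Because the relocated names are baked into the serialized payload at runtime, shading only works here when every JVM that touches the bytes uses the same relocated jar — which would explain seeing the same failure on a shaded Flink run.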
[ https://issues.apache.org/jira/browse/BEAM-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754475#comment-16754475 ] Kenneth Knowles commented on BEAM-6517: --- Please pass the bug back with a comment. Also pinging [~iemejia] and [~echauchot], who both know the Spark runner much better than I do. I don't necessarily think this is a Spark issue, but it is worth having experts (not me) look at your exception.
[ https://issues.apache.org/jira/browse/BEAM-6517?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16754473#comment-16754473 ] Kenneth Knowles commented on BEAM-6517: --- This is usually caused by a serialization/deserialization mismatch between the bytecode launching the job and the bytecode running the job. Do you perhaps have a fat jar with a slightly different version of the class?
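The mismatch mechanism can be sketched as follows: deserializing a lambda replays `SerializedLambda.readResolve` against the capturing class, so it succeeds only when both sides agree on that class's bytecode, whereas a named function class (such as a `SimpleFunction` subclass) serializes as an ordinary class with no such indirection. A minimal round-trip sketch, with illustrative names rather than Beam's API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaSerDemo {
    // A serializable functional interface, similar in spirit to Beam's
    // SerializableFunction (illustrative, not Beam's actual API).
    public interface SerFn<I, O> extends Function<I, O>, Serializable {}

    // A named function class, analogous to extending Beam's SimpleFunction:
    // it serializes as a plain class, with no SerializedLambda indirection.
    public static class GetName implements SerFn<String, String> {
        @Override
        public String apply(String s) {
            return s.toUpperCase();
        }
    }

    // Round-trip an object through Java serialization, as
    // SerializableUtils.clone does inside the Spark runner.
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Within a single JVM both forms round-trip fine. The failure in
        // this issue appears when the deserializing side resolves the
        // capturing class to different bytecode (e.g. shaded vs unshaded),
        // so the lambda's readResolve throws and surfaces as
        // "unable to deserialize ... MapElements$1".
        SerFn<String, String> lambda = s -> s.toUpperCase();
        System.out.println(roundTrip(lambda).apply("name"));        // NAME
        System.out.println(roundTrip(new GetName()).apply("name")); // NAME
    }
}
```

This is why swapping the lambda for a `SimpleFunction` subclass, as done above, sidesteps the problem: the named class either loads or it doesn't, rather than failing inside `readResolve` at clone time.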