Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Rui Wang
I searched the Beam repo but couldn't find the exact line of code that
throws your exception.

However, I believe that Java classes used in primitives (ParDo, CombineFn)
and coders very likely need to be serializable (i.e. implement
Serializable).


-Rui

On Wed, Jul 8, 2020 at 6:23 AM Kirill Zhdanovich  wrote:
>
> Hi!
> I'm using Apache Beam Java (2.19.0) with Dataflow. I created a class and
> annotated it with DefaultCoder:
>
> @DefaultCoder(AvroCoder.class)
> public class ProductCatalog {
>
> When I try to submit it to the cluster, I get an error:
>
> Caused by: java.io.NotSerializableException: ...common.ProductCatalog
>
> If I add `implements Serializable` to the class definition, everything works
> fine. In the Apache Beam guide, I don't see anything about using `implements
> Serializable`. What am I doing wrong? Thank you in advance for your help.


Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Luke Cwik
Can you provide the full stacktrace?

On Wed, Jul 8, 2020 at 12:33 PM Rui Wang  wrote:

> I searched the Beam repo but couldn't find the exact line of code that
> throws your exception.
>
> However, I believe that Java classes used in primitives (ParDo, CombineFn)
> and coders very likely need to be serializable (i.e. implement
> Serializable).
>
>
> -Rui


Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Kirill Zhdanovich
Hello Luke,
I will try to get the "22 more" lines soon; I'm not sure how to do it, though.

23:11:11.339 [ERROR] [system.err] Exception in thread "main"
java.lang.IllegalArgumentException: unable to serialize
DoFnWithExecutionInformation{doFn=com.ikea.search.ab.bootstrap.Job$CreateSessionMetrics@444f44c5,
mainOutputTag=Tag:400#6929f09b03d242ca>,
sideInputMapping={},
schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
23:11:11.339 [ERROR] [system.err] at
org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
23:11:11.339 [ERROR] [system.err] at
org.apache.beam.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:606)
23:11:11.339 [ERROR] [system.err] at
org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator$1.translateDoFn(PrimitiveParDoSingleFactory.java:197)
23:11:11.339 [ERROR] [system.err] at
org.apache.beam.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:738)
23:11:11.339 [ERROR] [system.err] at
org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle(PrimitiveParDoSingleFactory.java:193)
23:11:11.340 [ERROR] [system.err] at
org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate(PrimitiveParDoSingleFactory.java:154)
23:11:11.340 [ERROR] [system.err] at
org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate(PTransformTranslation.java:417)
23:11:11.340 [ERROR] [system.err] at
org.apache.beam.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:225)
23:11:11.340 [ERROR] [system.err] at
org.apache.beam.runners.core.construction.SdkComponents.registerPTransform(SdkComponents.java:157)
23:11:11.340 [ERROR] [system.err] at
org.apache.beam.runners.core.construction.PipelineTranslation$1.visitPrimitiveTransform(PipelineTranslation.java:87)
23:11:11.340 [ERROR] [system.err] at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
23:11:11.340 [ERROR] [system.err] at
org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
23:11:11.340 [ERROR] [system.err] at
org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
23:11:11.340 [ERROR] [system.err] at
org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
23:11:11.341 [ERROR] [system.err] at
org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:460)
23:11:11.341 [ERROR] [system.err] at
org.apache.beam.runners.core.construction.PipelineTranslation.toProto(PipelineTranslation.java:59)
23:11:11.341 [ERROR] [system.err] at
org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:187)
23:11:11.341 [ERROR] [system.err] at
org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:795)
23:11:11.341 [ERROR] [system.err] at
org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:186)
23:11:11.341 [ERROR] [system.err] at
org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
23:11:11.341 [ERROR] [system.err] at
org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
23:11:11.341 [ERROR] [system.err] at
com.ikea.search.ab.bootstrap.Job.testPipeline(Job.java:185)
23:11:11.341 [ERROR] [system.err] at
com.ikea.search.ab.bootstrap.Job.main(Job.java:196)
23:11:11.342 [ERROR] [system.err] Caused by:
java.io.NotSerializableException:
com.ikea.search.ab.common.ProductCatalog$Product
23:11:11.342 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
23:11:11.342 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
23:11:11.342 [ERROR] [system.err] at
java.util.HashMap.internalWriteEntries(HashMap.java:1790)
23:11:11.342 [ERROR] [system.err] at
java.util.HashMap.writeObject(HashMap.java:1363)
23:11:11.342 [ERROR] [system.err] at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
23:11:11.342 [ERROR] [system.err] at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
23:11:11.342 [ERROR] [system.err] at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
23:11:11.342 [ERROR] [system.err] at
java.lang.reflect.Method.invoke(Method.java:498)
23:11:11.342 [ERROR] [system.err] at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
23:11:11.343 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
23:11:11.343 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
23:11:11.343 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
23:11:11.343 [ERROR] [system.err] at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
23:11:11.343 [ERROR] [system.err] at
java.io.ObjectOutputStream.writeSerialData(ObjectOutpu

Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Luke Cwik
The problem is that CreateSessionMetrics has an instance of
com.ikea.search.ab.common.ProductCatalog$Product as part of its
serialization closure. Does it have a member variable of that type, or
refer to one that belongs to its parent class, or ...? Should that field be
marked transient?

Marking Product as Serializable is necessary only if you want to keep it in
that class's closure; it is unrelated to the usage of @DefaultCoder.
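
To make the closure point concrete, here is a minimal sketch that uses only
the JDK (no Beam dependency). The names ClosureDemo, CapturingFn, TransientFn
and firstNonSerializable are made up for illustration, and Product is only a
stand-in for ProductCatalog$Product; how the real CreateSessionMetrics
captures its map may well differ. The HashMap field is an assumption taken
from the HashMap.writeObject frames in the stack trace.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class ClosureDemo {

    // Hypothetical stand-in for ProductCatalog.Product: NOT Serializable.
    static class Product {
        final String name;

        Product(String name) {
            this.name = name;
        }
    }

    // Stand-in for a DoFn. Beam's DoFn implements Serializable, so every
    // non-transient field is part of its serialization closure.
    static class CapturingFn implements Serializable {
        private final Map<String, Product> products = new HashMap<>();

        CapturingFn() {
            products.put("p1", new Product("chair"));
        }
    }

    // Same field, but transient: Java serialization skips it entirely.
    static class TransientFn implements Serializable {
        private transient Map<String, Product> products = new HashMap<>();

        TransientFn() {
            products.put("p1", new Product("chair"));
        }
    }

    // Returns the name of the class that blocked Java serialization,
    // or null if the object serialized without problems.
    static String firstNonSerializable(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            // The exception message is the name of the offending class.
            return e.getMessage();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Reports the Product class as the blocker.
        System.out.println(firstNonSerializable(new CapturingFn()));
        // Serializes fine because the map is skipped.
        System.out.println(firstNonSerializable(new TransientFn()));
    }
}
```

Keep in mind that a transient field comes back as null after
deserialization, so in a real DoFn it would have to be rebuilt on the worker,
for example in a method annotated with @Setup. If the map has to travel with
the DoFn instead, making Product implement Serializable is the other option.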

Here are some articles[1, 2, 3] about Java serialization and what is
captured that could help if you want additional background.

1: https://www.cs.unh.edu/~charpov/programming-serial-lambda.html
2: http://www.lifeisafile.com/Serialization-in-spark/
3: https://stackoverflow.com/questions/40259196/understanding-sparks-closures-and-their-serialization


On Wed, Jul 8, 2020 at 1:22 PM Kirill Zhdanovich 
wrote:

> Hello Luke,
> I will try to get the "22 more" lines soon; I'm not sure how to do it, though.

Re: Error when using @DefaultCoder(AvroCoder.class)

2020-07-08 Thread Kirill Zhdanovich
Cool! Thanks a lot for your help, Luke.

On Wed, 8 Jul 2020 at 23:45, Luke Cwik  wrote:

> The problem is that CreateSessionMetrics has an instance of
> com.ikea.search.ab.common.ProductCatalog$Product as part of its
> serialization closure. Does it have a member variable of that type, or
> refer to one that belongs to its parent class, or ...? Should that field be
> marked transient?
>
> Marking Product as Serializable is necessary only if you want to keep it in
> that class's closure; it is unrelated to the usage of @DefaultCoder.
>
> Here are some articles[1, 2, 3] about Java serialization and what is
> captured that could help if you want additional background.
>
> 1: https://www.cs.unh.edu/~charpov/programming-serial-lambda.html
> 2: http://www.lifeisafile.com/Serialization-in-spark/
> 3: https://stackoverflow.com/questions/40259196/understanding-sparks-closures-and-their-serialization