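FYI, the JVM option -Dsun.io.serialization.extendedDebugInfo=true can be handy for debugging serialization errors like this. It makes NotSerializableException messages include the chain of fields and objects that led to the offending one.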
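
In case it helps, here is a minimal sketch of the failure mode discussed below, using plain Java serialization instead of Beam. All names in it (SerializationDemo, Fn, MyFn, canSerialize, label) are hypothetical stand-ins I made up for illustration; Fn only plays the role of DoFn<String, String>. The anonymous class reads a field of the enclosing instance, so it holds a reference to that instance, and the enclosing class is deliberately not Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the pipeline class; deliberately NOT Serializable.
class SerializationDemo {

    // Hypothetical stand-in for DoFn<String, String>: a serializable function type.
    interface Fn extends Serializable {
        String apply(String input);
    }

    private final String label = "from kafka: ";

    // Static nested class: has no reference to an enclosing instance.
    static class MyFn implements Fn {
        private static final long serialVersionUID = 1L;

        @Override
        public String apply(String input) {
            return input;
        }
    }

    // Anonymous class that reads a field of the enclosing instance, so the
    // enclosing SerializationDemo object becomes part of its serialized state.
    Fn anonymousFn() {
        return new Fn() {
            @Override
            public String apply(String input) {
                return label + input;
            }
        };
    }

    // Returns true if Java serialization can write the object, false if some
    // object in its graph is not Serializable.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            System.out.println("Not serializable: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("static nested: " + canSerialize(new MyFn()));
        System.out.println("anonymous:     " + canSerialize(new SerializationDemo().anonymousFn()));
    }
}
```

Running it with the flag above (for example `java -Dsun.io.serialization.extendedDebugInfo=true SerializationDemo.java` on a JDK that supports single-file launch) should make the "Not serializable" message spell out which field dragged in the enclosing object. Without the flag you only get the class name.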

On Mon, May 9, 2016 at 5:20 PM, Lukasz Cwik <[email protected]> wrote:
> You can't just mark ReadFromKafka2 as "implements Serializable" and expect
> it to work; all the fields used with ReadFromKafka2 have to be serializable
> as well.
>
> Instead of using an anonymous inner class, define a static inner class:
> class ReadFromKafka2 {
>   static class MyDoFn extends DoFn<String, String> {
>     @Override
>     public void processElement(ProcessContext ctx) throws Exception {
>       System.out.printf("\n from kafka: '%s' ", ctx.element());
>       ctx.output(ctx.element());
>     }
>   }
>
>   ...
> }
>
> You can instantiate it with "new MyDoFn()".
>
> On Mon, May 9, 2016 at 5:07 PM, amir bahmanyari <[email protected]> wrote:
>>
>> Hi Lukasz,
>> Thanks for your reply.
>>
>> I changed it to: public class ReadFromKafka2 implements Serializable{
>>
>> & ran it once with the same anonymous inner class as below, and again
>> with it made explicit as:
>> kafkarecords.apply(ParDo.named("ReadInput").of(...etc
>> providing the same implementation as below.
>>
>> Same throw! :(
>>
>> Any sample code that works for deployment to a Flink cluster pls?
>> Not sure if I have my pom.xml straightened out either..
>> Thanks for your help.
>> ________________________________
>> From: Lukasz Cwik <[email protected]>
>> To: [email protected]; amir bahmanyari <[email protected]>
>> Sent: Monday, May 9, 2016 4:13 PM
>> Subject: Re: DoFn Deserialize issue
>>
>> Is ReadFromKafka2 serializable?
>>
>> Your code snippet uses an anonymous inner class, which means it will
>> attempt to serialize the outer class as well. In this case ReadFromKafka2
>>
>> On Mon, May 9, 2016 at 4:06 PM, amir bahmanyari <[email protected]>
>> wrote:
>>
>> Hi Colleagues,
>> Sorry I have been bombarding this forum.
>> Not sure what's going on... Resolved a lot of runtime issues, now I get
>> the following exception on my DoFn:
>>
>> kafkarecords.apply(ParDo.of(
>>     new DoFn<String, String>() {
>>
>>       @Override
>>       public void processElement(ProcessContext ctx) throws Exception {
>>         System.out.printf("\n from kafka: '%s' ", ctx.element());
>>         ctx.output(ctx.element());
>>       }
>>     })).apply(TextIO.Write.to("c:\\temp\\KafkaOut\\outputKafka.txt"));
>>
>> Any idea? I really appreciate your help.
>> Thanks.
>>
>> java.lang.IllegalArgumentException: unable to deserialize com.myco.tech.arc.ReadFromKafka2$1@62dae245
>>   at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
>>   at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:93)
>>   at org.apache.beam.sdk.transforms.ParDo$Bound.<init>(ParDo.java:722)
>>   at org.apache.beam.sdk.transforms.ParDo$Unbound.of(ParDo.java:680)
>>   at org.apache.beam.sdk.transforms.ParDo$Unbound.access$000(ParDo.java:598)
>>   at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:565)
>>   at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:560)
>>   at com.myco.tech.arc.ReadFromKafka2.main(ReadFromKafka2.java:245)
