Is ReadFromKafka2 serializable? Your code snippet uses an anonymous inner class, which means Java will attempt to serialize the enclosing (outer) class as well. In this case, that is ReadFromKafka2.
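To illustrate the point, here is a minimal plain-JDK sketch (no Beam involved). The names SerializationSketch, Fn, and EchoFn are hypothetical stand-ins, not Beam classes, and the sketch assumes the anonymous class is created where it can reach an instance of the outer class. It only shows the general Java behavior: an anonymous class that uses outer-instance state drags the outer object into serialization, while a static nested class does not.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical outer class; note that it does NOT implement Serializable.
public class SerializationSketch {

    /** Stand-in for a function object that a framework wants to serialize (not Beam's DoFn). */
    public interface Fn extends Serializable {
        String apply(String input);
    }

    /** Static nested class: carries no hidden reference to an outer instance. */
    public static class EchoFn implements Fn {
        private static final long serialVersionUID = 1L;

        @Override
        public String apply(String input) {
            return input;
        }
    }

    // Assigned in the constructor so it is real instance state, not a compile-time constant.
    private final String prefix;

    public SerializationSketch(String prefix) {
        this.prefix = prefix;
    }

    /** The anonymous class reads `prefix`, so it holds a reference to the outer instance. */
    public Fn anonymousFn() {
        return new Fn() {
            @Override
            public String apply(String input) {
                return prefix + input;
            }
        };
    }

    /** Returns true if Java serialization accepts the object, false if some part is not serializable. */
    public static boolean canSerialize(Object o) {
        try {
            ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
            out.writeObject(o);
            out.close();
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Anonymous class: fails, because the non-serializable outer instance comes along.
        System.out.println(canSerialize(new SerializationSketch("from kafka: ").anonymousFn()));
        // Static nested class: serializes on its own.
        System.out.println(canSerialize(new EchoFn()));
    }
}
```

If this is what is happening in your pipeline, the usual ways out are to move the DoFn into a static nested (or top-level) class, or to make the outer class implement Serializable.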
On Mon, May 9, 2016 at 4:06 PM, amir bahmanyari <[email protected]> wrote:
>
> Hi Colleagues,
> Sorry I have been bombarding this forum.
> Not sure whats going on...Resolved a lot of runtime issues , now get the
> following exception on my DoFn:
>
> kafkarecords.apply(ParDo.of(
>     new DoFn <String, String>() {
>
>       @Override
>       public void processElement(ProcessContext ctx) throws Exception {
>         System.out.printf("\n from kafka: '%s' ", ctx.element());
>         ctx.output(ctx.element());
>       }
>     })).apply(TextIO.Write.to("c:\\temp\\KafkaOut\\outputKafka.txt"));
>
> Any idea? I really appreciate your help.
> Thanks.
>
>
> java.lang.IllegalArgumentException: unable to deserialize com.myco.tech.arc.ReadFromKafka2$1@62dae245
>     at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
>     at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:93)
>     at org.apache.beam.sdk.transforms.ParDo$Bound.<init>(ParDo.java:722)
>     at org.apache.beam.sdk.transforms.ParDo$Unbound.of(ParDo.java:680)
>     at org.apache.beam.sdk.transforms.ParDo$Unbound.access$000(ParDo.java:598)
>     at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:565)
>     at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:560)
>     at com.myco.tech.arc.ReadFromKafka2.main(ReadFromKafka2.java:245)
