You can't just mark ReadFromKafka2 as implementing Serializable and expect it
to work; all the fields of ReadFromKafka2 have to be serializable as well.

Instead of using an anonymous inner class, define a static inner class:
class ReadFromKafka2 {
  static class MyDoFn extends DoFn<String, String> {
    @Override
    public void processElement(ProcessContext ctx) throws Exception {
      System.out.printf("\n from kafka: '%s'  ", ctx.element());
      ctx.output(ctx.element());
    }
  }

  ...
}

You can then instantiate it with "new MyDoFn()".
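As a rough sketch based on your original snippet (assuming the PCollection is
still called kafkarecords and the output path is unchanged), the pipeline would
then look something like:

kafkarecords
    .apply(ParDo.of(new MyDoFn()))  // static MyDoFn: no enclosing ReadFromKafka2 instance is captured
    .apply(TextIO.Write.to("c:\\temp\\KafkaOut\\outputKafka.txt"));

Since MyDoFn is static and has no fields, only the DoFn itself needs to be
serializable.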

On Mon, May 9, 2016 at 5:07 PM, amir bahmanyari <[email protected]> wrote:

> Hi Lukasz,
> Thanks for your reply.
>
> I changed it to: public class ReadFromKafka2 implements Serializable{
>
> and ran it once with the same anonymous inner class as below, and again
> with the DoFn made explicit as:
> kafkarecords.apply(ParDo.named("ReadInput").of(...etc
> providing the same implementation as below.
>
> Same throw! :(
>
> Any sample code that works for deployment to a Flink cluster pls?
> Not sure if I have my pom.xml straightened out either.
> Thanks for your help.
> ------------------------------
> *From:* Lukasz Cwik <[email protected]>
> *To:* [email protected]; amir bahmanyari <[email protected]>
>
> *Sent:* Monday, May 9, 2016 4:13 PM
> *Subject:* Re: DoFn Deserialize issue
>
> Is ReadFromKafka2 serializable?
>
> Your code snippet uses an anonymous inner class, which means it will
> attempt to serialize the outer class as well. In this case, the outer
> class is ReadFromKafka2.
>
> On Mon, May 9, 2016 at 4:06 PM, amir bahmanyari <[email protected]>
> wrote:
>
>
> Hi Colleagues,
> Sorry I have been bombarding this forum.
> Not sure what's going on... Resolved a lot of runtime issues; now I get the
> following exception on my DoFn:
>
> kafkarecords.apply(ParDo.of(
>     new DoFn<String, String>() {
>       @Override
>       public void processElement(ProcessContext ctx) throws Exception {
>         System.out.printf("\n from kafka: '%s'  ", ctx.element());
>         ctx.output(ctx.element());
>       }
>     })).apply(TextIO.Write.to("c:\\temp\\KafkaOut\\outputKafka.txt"));
>
> Any idea? I really appreciate your help.
> Thanks.
>
>
> java.lang.IllegalArgumentException: unable to deserialize com.myco.tech.arc.ReadFromKafka2$1@62dae245
>         at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
>         at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:93)
>         at org.apache.beam.sdk.transforms.ParDo$Bound.<init>(ParDo.java:722)
>         at org.apache.beam.sdk.transforms.ParDo$Unbound.of(ParDo.java:680)
>         at org.apache.beam.sdk.transforms.ParDo$Unbound.access$000(ParDo.java:598)
>         at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:565)
>         at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:560)
>         at com.myco.tech.arc.ReadFromKafka2.main(ReadFromKafka2.java:245)
>
>
