FYI, the JVM option -Dsun.io.serialization.extendedDebugInfo=true can
be handy for debugging serialization errors like this.
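
For anyone who wants to reproduce the underlying mechanism outside of Beam,
here is a minimal sketch using plain java.io serialization. It is not the
code from this thread: SerializationSketch, Task, StaticTask, anonymousTask
and canSerialize are all hypothetical names, and Task is only a stand-in for
a DoFn. The sketch assumes the anonymous class is created in an instance
context and uses a field of the outer class, so it has to hold a reference to
the outer instance.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical outer class; deliberately NOT Serializable.
public class SerializationSketch {

    private final String prefix = "from kafka: ";

    /** Hypothetical stand-in for a DoFn: a serializable unit of work. */
    interface Task extends Serializable {
        String apply(String input);
    }

    /** Static nested class: carries no reference to an outer instance. */
    static class StaticTask implements Task {
        private static final long serialVersionUID = 1L;

        @Override
        public String apply(String input) {
            return "from kafka: " + input;
        }
    }

    /**
     * Anonymous class created in an instance method. It reads the outer
     * field "prefix", so it keeps a reference to the enclosing instance,
     * which then has to be serialized along with it.
     */
    Task anonymousTask() {
        return new Task() {
            @Override
            public String apply(String input) {
                return prefix + input;
            }
        };
    }

    /** Returns true if Java serialization can write the object. */
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            System.out.println("Not serializable: " + e.getMessage());
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("static nested: "
            + canSerialize(new StaticTask()));
        System.out.println("anonymous:     "
            + canSerialize(new SerializationSketch().anonymousTask()));
    }
}
```

Running it as "java -Dsun.io.serialization.extendedDebugInfo=true
SerializationSketch" should extend the exception message with the chain of
fields that led to the non-serializable object, which is usually the quickest
way to see what is being dragged in.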

On Mon, May 9, 2016 at 5:20 PM, Lukasz Cwik <[email protected]> wrote:
> You can't just mark ReadFromKafka2 as "implements Serializable" and expect
> it to work; all the fields used with ReadFromKafka2 have to be serializable
> as well.
>
> Instead of using an anonymous inner class, define a static nested class:
> class ReadFromKafka2 {
>   static class MyDoFn extends DoFn<String, String> {
>     @Override
>     public void processElement(ProcessContext ctx) throws Exception {
>       System.out.printf("\n from kafka: '%s'  ", ctx.element());
>       ctx.output(ctx.element());
>     }
>   }
>
>   ...
> }
>
> You can then instantiate it with "new MyDoFn()".
>
> On Mon, May 9, 2016 at 5:07 PM, amir bahmanyari <[email protected]> wrote:
>>
>> Hi Lukasz,
>> Thanks for your reply.
>>
>> I changed it to: public class ReadFromKafka2 implements Serializable{
>>
>> and ran it once with the same anonymous inner class as below, and again
>> with the transform named explicitly:
>> kafkarecords.apply(ParDo.named("ReadInput").of(...etc
>> providing the same implementation as below.
>>
>> Same exception! :(
>>
>> Is there any sample code that works for deployment to a Flink cluster,
>> please? I'm not sure my pom.xml is straightened out either.
>> Thanks for your help.
>> ________________________________
>> From: Lukasz Cwik <[email protected]>
>> To: [email protected]; amir bahmanyari <[email protected]>
>> Sent: Monday, May 9, 2016 4:13 PM
>> Subject: Re: DoFn Deserialize issue
>>
>> Is ReadFromKafka2 serializable?
>>
>> Your code snippet uses an anonymous inner class, which means it will
>> attempt to serialize the outer class as well. In this case, that is
>> ReadFromKafka2.
>>
>> On Mon, May 9, 2016 at 4:06 PM, amir bahmanyari <[email protected]>
>> wrote:
>>
>>
>> Hi Colleagues,
>> Sorry for bombarding this forum.
>> I'm not sure what's going on... I resolved a lot of runtime issues, but
>> now get the following exception on my DoFn:
>>
>> kafkarecords.apply(ParDo.of(
>>     new DoFn<String, String>() {
>>       @Override
>>       public void processElement(ProcessContext ctx) throws Exception {
>>         System.out.printf("\n from kafka: '%s'  ", ctx.element());
>>         ctx.output(ctx.element());
>>       }
>>     })).apply(TextIO.Write.to("c:\\temp\\KafkaOut\\outputKafka.txt"));
>>
>> Any idea? I really appreciate your help.
>> Thanks.
>>
>>
>> java.lang.IllegalArgumentException: unable to deserialize com.myco.tech.arc.ReadFromKafka2$1@62dae245
>>         at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
>>         at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:93)
>>         at org.apache.beam.sdk.transforms.ParDo$Bound.<init>(ParDo.java:722)
>>         at org.apache.beam.sdk.transforms.ParDo$Unbound.of(ParDo.java:680)
>>         at org.apache.beam.sdk.transforms.ParDo$Unbound.access$000(ParDo.java:598)
>>         at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:565)
>>         at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:560)
>>         at com.myco.tech.arc.ReadFromKafka2.main(ReadFromKafka2.java:245)
>>
>>
>>
>>
>
