This is at high level what I am doing: Serialize:
String s = tuple.getPos(0) + "," + tuple.getPos(1); return s.getBytes() Deserialize: String s = new String(message); String [] sarr = s.split(","); Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]), Integer.valueOf(sarr[1])); return tuple; On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote: > Hi Mohit, > > As 刘彪 pointed out in his reply, the problem is that your > `Tuple2Serializer` contains fields that are not serializable, so > `Tuple2Serializer` itself is not serializable. > Could you perhaps share your `Tuple2Serializer` implementation with us so > we can pinpoint the problem? > > A snippet of the class fields and constructor will do, so you don’t have > to provide the whole `serialize` / `deserialize` implementation if you > don’t want to. > > Cheers, > Gordon > > > On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com) > wrote: > > I am using String inside to convert into bytes. > > On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote: > >> Hi Mohit >> As you did not give the whole codes of Tuple2Serializerr. I guess the >> reason is some fields of Tuple2Serializerr do not implement Serializable. >> >> 2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>: >> >>> I wrote a key serialization class to write to kafka however I am getting >>> this error. Not sure why as I've already implemented the interfaces. >>> >>> Caused by: java.io.NotSerializableException: >>> com.sy.flink.test.Tuple2Serializerr$1 >>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.j >>> ava:1184) >>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputSt >>> ream.java:1548) >>> >>> And the class implements the following: >>> >>> *public* *class* *Tuple2Serializerr* *implements* >>> >>> DeserializationSchema<Tuple2<Integer, Integer>>, >>> >>> SerializationSchema<Tuple2<Integer, Integer>> { >>> >>> And called like this: >>> >>> >>> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = *new* >>> FlinkKafkaProducer010<Tuple2<Integer, Integer>>( >>> >>> "10.22.4.15:9092", // broker list >>> >>> "my-topic", // target topic >>> >>> *new* Tuple2Serializerr()); // serialization schema >>> >>> >>> >>> >> >