Thanks for clarifying. 

From the looks of your exception:

Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

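The class name `com.sy.flink.test.Tuple2Serializerr$1` refers to the first 
anonymous inner class declared inside `Tuple2Serializerr` (the compiler names 
anonymous classes `Outer$1`, `Outer$2`, and so on), and that class is not 
serializable.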

Could you check if that’s the case?
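
To illustrate how a `$1` class can end up in such an exception, here is a minimal sketch in plain Java. The class `AnonymousFieldSchema` and its `helper` field are hypothetical and are not taken from the code in this thread; the sketch only assumes that the schema keeps an anonymous class instance in a field.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a schema class; not the code from this thread.
class AnonymousFieldSchema implements Serializable {
    private static final long serialVersionUID = 1L;

    // First anonymous class in this class: compiled as AnonymousFieldSchema$1.
    // Runnable is not Serializable, so neither is this anonymous class.
    private final Runnable helper = new Runnable() {
        @Override
        public void run() { }
    };

    // Returns the name of the class that blocked serialization, or null on success.
    static String firstNonSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage(); // the message is the offending class name
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Reports the anonymous class held in the field, not the schema itself.
        System.out.println(firstNonSerializable(new AnonymousFieldSchema()));
    }
}
```

The outer class implements `Serializable`, yet serialization still fails because default serialization walks every non-transient field, including the one holding the anonymous class instance.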


On February 24, 2017 at 3:10:58 PM, Mohit Anchlia (mohitanch...@gmail.com) 
wrote:

But it is not an inner class.

On Thu, Feb 23, 2017 at 11:09 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Since I don’t have your complete code, I’m guessing this is the problem:
Is your `Tuple2Serializerr` an inner class? If yes, you should be able to solve 
the problem by declaring `Tuple2Serializerr` to be `static`.

This is more of a Java problem than a Flink one:
a non-static inner class holds an implicit reference to the instance of its 
enclosing outer class, so serializing it also tries to serialize the outer 
class instance. If the outer class isn’t serializable, serialization fails.
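
The difference between an inner class and a static nested class can be sketched as follows. All names here (`EnclosingJob`, `InnerSchema`, `StaticSchema`) are hypothetical, and the inner class deliberately reads a field of the outer instance so that the hidden reference is actually kept by the compiler.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical enclosing class; deliberately NOT Serializable.
class EnclosingJob {
    private final String separator;

    EnclosingJob(String separator) {
        this.separator = separator;
    }

    // Non-static inner class: it reads a field of the enclosing instance,
    // so it carries a hidden reference to that EnclosingJob.
    class InnerSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        String join(int a, int b) { return a + separator + b; }
    }

    // Static nested class: no reference to any EnclosingJob instance.
    static class StaticSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        String join(int a, int b) { return a + "," + b; }
    }

    // Returns the name of the class that blocked serialization, or null on success.
    static String firstNonSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        EnclosingJob job = new EnclosingJob(",");
        // The inner class drags the non-serializable outer instance along.
        System.out.println(firstNonSerializable(job.new InnerSchema()));
        // The static nested class serializes on its own.
        System.out.println(firstNonSerializable(new StaticSchema()));
    }
}
```

Declaring the schema as a top-level class has the same effect as making it `static`: in both cases there is no enclosing instance to serialize.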


On February 24, 2017 at 2:43:38 PM, Mohit Anchlia (mohitanch...@gmail.com) 
wrote:

This is at high level what I am doing:

Serialize:

String s = tuple.f0 + "," + tuple.f1;
return s.getBytes();

Deserialize:

String s = new String(message);
String[] sarr = s.split(",");
Tuple2<Integer, Integer> tuple =
        new Tuple2<>(Integer.valueOf(sarr[0]), Integer.valueOf(sarr[1]));

return tuple;
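
For reference, the round trip described above can be sketched in plain Java without any Flink dependency. This is only an illustration under assumptions: the class `PairCodec` is hypothetical, an `int[]` of length 2 stands in for `Tuple2<Integer, Integer>`, and an explicit UTF-8 charset is used so that both sides agree on the encoding regardless of the platform default.

```java
import java.nio.charset.StandardCharsets;

// Plain-Java sketch; an int[] of length 2 stands in for Tuple2<Integer, Integer>.
class PairCodec {

    // Encodes the pair as "first,second" in UTF-8.
    static byte[] serialize(int first, int second) {
        return (first + "," + second).getBytes(StandardCharsets.UTF_8);
    }

    // Decodes "first,second" back into a two-element array.
    static int[] deserialize(byte[] message) {
        String[] parts = new String(message, StandardCharsets.UTF_8).split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException(
                    "expected two comma-separated fields but got " + parts.length);
        }
        return new int[] {
            Integer.parseInt(parts[0].trim()),
            Integer.parseInt(parts[1].trim())
        };
    }

    public static void main(String[] args) {
        int[] pair = deserialize(serialize(3, -7));
        System.out.println(pair[0] + " " + pair[1]);
    }
}
```

Nothing in this logic holds state, which suggests that the serialization failure comes from how the schema class is declared rather than from the encoding itself.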


On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> 
wrote:
Hi Mohit,

As 刘彪 pointed out in his reply, the likely problem is that your 
`Tuple2Serializerr` contains fields that are not serializable, which makes 
`Tuple2Serializerr` itself non-serializable.
Could you share your `Tuple2Serializerr` implementation with us so we can 
pinpoint the problem?
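
As an illustration of how a single field can make an otherwise serializable class fail, here is a minimal sketch. The classes `BrokenSchema` and `FixedSchema` are hypothetical; the example assumes a schema that stores a `java.nio.charset.Charset`, which does not implement `Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical classes for illustration only.
class CharsetFieldDemo {

    // Fails: Charset does not implement Serializable.
    static class BrokenSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Charset charset = StandardCharsets.UTF_8;
        byte[] encode(String s) { return s.getBytes(charset); }
    }

    // Works: only the charset name (a String) is kept as serialized state.
    static class FixedSchema implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String charsetName = StandardCharsets.UTF_8.name();
        byte[] encode(String s) { return s.getBytes(Charset.forName(charsetName)); }
    }

    // Returns the name of the class that blocked serialization, or null on success.
    static String firstNonSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstNonSerializable(new BrokenSchema()));
        System.out.println(firstNonSerializable(new FixedSchema()));
    }
}
```

Another common option is to mark such a field `transient` and rebuild it after deserialization; storing a serializable substitute, as above, is simply the shorter of the two to show.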

A snippet of the class fields and constructor will do, so you don’t have to 
provide the whole `serialize` / `deserialize` implementation if you don’t want 
to.

Cheers,
Gordon


On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com) 
wrote:

I am using String inside to convert into bytes.

On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote:
Hi Mohit
Since you did not share the complete code of Tuple2Serializerr, my guess is 
that some fields of Tuple2Serializerr do not implement Serializable.

2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:
I wrote a key serialization class to write to Kafka, but I am getting this 
error. I'm not sure why, since I've already implemented the interfaces.

Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

And the class implements the following:

public class Tuple2Serializerr implements
        DeserializationSchema<Tuple2<Integer, Integer>>,
        SerializationSchema<Tuple2<Integer, Integer>> {

And called like this:



FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer =
        new FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
                "10.22.4.15:9092",         // broker list
                "my-topic",                // target topic
                new Tuple2Serializerr());  // serialization schema








