Re: Example for Jackson JsonNode Kafka serialization schema

2022-01-28 Thread Robert Metzger
Hi Oran,

As you've already suggested, you could just use a (flat)map function that
takes an ObjectNode and outputs a string.
In the mapper you can do whatever you want with an invalid object:
log it, discard it, write an "error json string", write it to
a side output stream, ...
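
A minimal sketch of that idea, assuming Flink's DataStream API (a
ProcessFunction is used since side outputs require one; the class name, tag
name, and error handling below are illustrative, not from the thread):

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class JsonStringifier extends ProcessFunction<ObjectNode, String> {

    // Hypothetical tag; consume it later via getSideOutput(INVALID).
    public static final OutputTag<ObjectNode> INVALID =
            new OutputTag<ObjectNode>("invalid-json") {};

    // Created in open() so the function itself stays trivially serializable.
    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        mapper = new ObjectMapper();
    }

    @Override
    public void processElement(ObjectNode value, Context ctx, Collector<String> out) {
        try {
            out.collect(mapper.writeValueAsString(value));
        } catch (JsonProcessingException e) {
            // Any of the options above works here: log, drop, emit an error string, ...
            ctx.output(INVALID, value);
        }
    }
}

The invalid records can then be read with
stream.process(new JsonStringifier()).getSideOutput(JsonStringifier.INVALID).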


On Tue, Jan 25, 2022 at 12:38 PM Oran Shuster 
wrote:

> In the documentation we have an example of how to implement
> deserialization from bytes to Jackson ObjectNode objects
> - JSONKeyValueDeserializationSchema
>
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/
>
> However, there is no example for the other direction: taking an
> ObjectNode/JsonNode (or just any POJO class) and using Jackson to serialize
> it to a string
>
> You can write a simple schema like so
>
>
> public class JSONKafkaSerializationSchema
>         implements KafkaSerializationSchema<JsonNode> {
>
>     private final ObjectMapper objectMapper = new ObjectMapper();
>
>     @Override
>     public ProducerRecord<byte[], byte[]> serialize(JsonNode element,
>                                                     @Nullable Long timestamp) {
>         String topic = getTargetTopic(element);
>
>         try {
>             byte[] value = objectMapper.writeValueAsBytes(element);
>             return new ProducerRecord<>(topic, value);
>         } catch (JsonProcessingException e) {
>             return null;
>         }
>     }
>
>     private String getTargetTopic(JsonNode jsonNode) {
>         return jsonNode.get("topic").asText();
>     }
> }
>
> But this raises a question - what to do when serialization fails?
> If the input class is a simple POJO then Jackson should always succeed in
> converting it to bytes, but that's not 100% guaranteed.
> In case of failure, can we return null so the record will be discarded?
> Null values are discarded in the case of the deserialization schema; from
> the documentation - "Returns: The deserialized message as an object (null
> if the message cannot be deserialized)."
> If this is not possible, what is the proper way to serialize Jackson
> objects into bytes in Flink? It's possible to convert everything to String
> before the Kafka producer, but then any logic to determine the target topic
> will need to deserialize the string again.
>


Example for Jackson JsonNode Kafka serialization schema

2022-01-25 Thread Oran Shuster
In the documentation we have an example of how to implement deserialization
from bytes to Jackson ObjectNode objects - JSONKeyValueDeserializationSchema
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

However, there is no example for the other direction: taking an
ObjectNode/JsonNode (or just any POJO class) and using Jackson to serialize
it to a string

You can write a simple schema like so


public class JSONKafkaSerializationSchema
        implements KafkaSerializationSchema<JsonNode> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public ProducerRecord<byte[], byte[]> serialize(JsonNode element,
                                                    @Nullable Long timestamp) {
        String topic = getTargetTopic(element);

        try {
            byte[] value = objectMapper.writeValueAsBytes(element);
            return new ProducerRecord<>(topic, value);
        } catch (JsonProcessingException e) {
            return null;
        }
    }

    private String getTargetTopic(JsonNode jsonNode) {
        return jsonNode.get("topic").asText();
    }
}

But this raises a question - what to do when serialization fails?
If the input class is a simple POJO then Jackson should always succeed in
converting it to bytes, but that's not 100% guaranteed.
In case of failure, can we return null so the record will be discarded?
Null values are discarded in the case of the deserialization schema; from
the documentation - "Returns: The deserialized message as an object (null
if the message cannot be deserialized)."
If this is not possible, what is the proper way to serialize Jackson objects
into bytes in Flink? It's possible to convert everything to String before
the Kafka producer, but then any logic to determine the target topic will
need to deserialize the string again.
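
One defensive sketch for the failure case (the "invalid-records" dead-letter
topic is hypothetical, and this is not an official recommendation): never
return null, and instead give the producer a valid record in every case, e.g.
by replacing the serialize() method above with:

@Override
public ProducerRecord<byte[], byte[]> serialize(JsonNode element,
                                                @Nullable Long timestamp) {
    try {
        return new ProducerRecord<>(getTargetTopic(element),
                objectMapper.writeValueAsBytes(element));
    } catch (JsonProcessingException e) {
        // Hypothetical dead-letter topic; alternatives: rethrow as a
        // RuntimeException to fail the job, or filter invalid nodes upstream.
        return new ProducerRecord<>("invalid-records",
                String.valueOf(element).getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
}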


Re: KafkaProducer with generic (Avro) serialization schema

2018-05-02 Thread Wouter Zorgdrager
Hi,

Thanks for the suggestions.

Unfortunately I cannot make FromRecord/ToRecord/SchemaFor serializable,
since those classes are out of my control. I use them from the avro4s
library (https://github.com/sksamuel/avro4s). The problem here, especially
with the deserializer, is that I need to convert an Avro 'GenericRecord' to
a Scala case class. Avro is written in Java, so that's a bit problematic and
therefore I need the avro4s library. Avro4s tries to verify at compile time
whether the generic type is actually convertible from/to a generic record;
that is why I need those context bounds.

As for @Aljoscha's workaround, I don't understand how this would solve it.
Doesn't that just move the problem? If I create a factory, I still need the
generic type (with context bounds) that I specify at my
KafkaConsumer/deserialization schema.

@Fabian I'm not sure I understand your proposal. I still need the
context bounds for those compile-time macros of Avro4s.

Once again, thanks for your help so far!

Regards,
Wouter



On Wed, May 2, 2018 at 16:48, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Wouter,
>
> you can try to make the SerializationSchema serializable by overriding
> Java's serialization methods writeObject() and readObject(), similar to
> what Flink's AvroRowSerializationSchema [1] does.
>
> Best, Fabian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
>
> 2018-05-02 16:34 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:
>
>> Hi,
>>
>> My Scala knowledge is very limited (and my Scala serialization
>> knowledge is non-existent), but one way or another you have to make your
>> SerializationSchema serialisable. If indeed this is the problem, maybe a
>> better place to ask this question is on Stack Overflow or some
>> Scala-specific mailing list/board (unless someone else from the Flink
>> community can provide an answer to this problem)?
>>
>> Piotrek
>>
>> On 1 May 2018, at 16:30, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>>
>> So, I'm still struggling with this issue. I dived a bit more into the
>> problem and I'm pretty sure that the problem is that I have to (implicitly)
>> pass the SchemaFor and ToRecord classes to my serialization schema
>> (otherwise I can't make it generic). However, those classes aren't
>> serializable, but of course I can't annotate them transient nor make them
>> a lazy val, which gives me the current issue.
>>
>> I hope someone has some leads for me.
>>
>> Thanks!
>>
>> On Thu, Apr 26, 2018 at 23:03, Wouter Zorgdrager <
>> zorgdrag...@gmail.com> wrote:
>>
>>> Hi Bill,
>>>
>>> Thanks for your answer. However this proposal isn't going to solve my
>>> issue, since the problem here is that the context bounds I need to give in
>>> order to serialize it to Avro (SchemaFor, ToRecord and FromRecord) aren't
>>> serializable classes. This results in Flink not being able to serialize the
>>> KafkaProducer, failing the whole job.
>>>
>>> Thanks,
>>> Wouter
>>>
>>> On Thu, Apr 26, 2018 at 00:42, Nortman, Bill <
>>> william.nort...@pimco.com> wrote:
>>>
>>>> The first things I would try: check that your classes Person and
>>>> Address have getters and setters and a no-argument constructor.
>>>>
>>>>
>>>>
>>>> From: Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
>>>> Sent: Wednesday, April 25, 2018 7:17 AM
>>>> To: user@flink.apache.org
>>>> Subject: KafkaProducer with generic (Avro) serialization schema
>>>>
>>>>
>>>>
>>>> Dear reader,
>>>>
>>>>
>>>>
>>>> I'm currently working on writing a KafkaProducer which is able to
>>>> serialize a generic type using avro4s.
>>>>
>>>> However this serialization schema is not serializable itself. Here is
>>>> my code for this:
>>>>
>>>>
>>>>
>>>> The serialization schema:
>>>>
>>>> class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord]
>>>> extends SerializationSchema[IN] {
>>>>
>>>>
>>>>
>>>>   override def serialize(element: IN): Array[Byte] = {
>>>>
>>>> val byteArray = new ByteArrayOutputStream()
>>>>
>>>> val avroSer = AvroOutputStream.binary[IN](byteArray)
>>>>
>>>> avroSer.write(element)

Re: KafkaProducer with generic (Avro) serialization schema

2018-05-02 Thread Fabian Hueske
Hi Wouter,

you can try to make the SerializationSchema serializable by overriding
Java's serialization methods writeObject() and readObject(), similar to
what Flink's AvroRowSerializationSchema [1] does.

Best, Fabian

[1]
https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java
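
A minimal sketch of that pattern (the holder class below is hypothetical;
AvroRowSerializationSchema in [1] applies the same trick to its schema and
the writers built from it): keep only a serializable string form of the
non-serializable object and rebuild it in readObject():

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.avro.Schema;

public class AvroSchemaHolder implements Serializable {

    private final String schemaString;   // serializable string form
    private transient Schema schema;     // rebuilt after deserialization

    public AvroSchemaHolder(Schema schema) {
        this.schema = schema;
        this.schemaString = schema.toString();
    }

    public Schema getSchema() {
        return schema;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();                               // ships schemaString only
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();                                 // restores schemaString
        this.schema = new Schema.Parser().parse(schemaString);  // rebuild transient field
    }
}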

2018-05-02 16:34 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:

> Hi,
>
> My Scala knowledge is very limited (and my Scala serialization knowledge
> is non-existent), but one way or another you have to make your
> SerializationSchema serialisable. If indeed this is the problem, maybe a
> better place to ask this question is on Stack Overflow or some
> Scala-specific mailing list/board (unless someone else from the Flink
> community can provide an answer to this problem)?
>
> Piotrek
>
> On 1 May 2018, at 16:30, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>
> So, I'm still struggling with this issue. I dived a bit more into the
> problem and I'm pretty sure that the problem is that I have to (implicitly)
> pass the SchemaFor and ToRecord classes to my serialization schema
> (otherwise I can't make it generic). However, those classes aren't
> serializable, but of course I can't annotate them transient nor make them
> a lazy val, which gives me the current issue.
>
> I hope someone has some leads for me.
>
> Thanks!
>
> On Thu, Apr 26, 2018 at 23:03, Wouter Zorgdrager <
> zorgdrag...@gmail.com> wrote:
>
>> Hi Bill,
>>
>> Thanks for your answer. However this proposal isn't going to solve my
>> issue, since the problem here is that the context bounds I need to give in
>> order to serialize it to Avro (SchemaFor, ToRecord and FromRecord) aren't
>> serializable classes. This results in Flink not being able to serialize the
>> KafkaProducer, failing the whole job.
>>
>> Thanks,
>> Wouter
>>
>> On Thu, Apr 26, 2018 at 00:42, Nortman, Bill <
>> william.nort...@pimco.com> wrote:
>>
>>> The first things I would try: check that your classes Person and
>>> Address have getters and setters and a no-argument constructor.
>>>
>>>
>>>
>>> From: Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
>>> Sent: Wednesday, April 25, 2018 7:17 AM
>>> To: user@flink.apache.org
>>> Subject: KafkaProducer with generic (Avro) serialization schema
>>>
>>>
>>>
>>> Dear reader,
>>>
>>>
>>>
>>> I'm currently working on writing a KafkaProducer which is able to
>>> serialize a generic type using avro4s.
>>>
>>> However this serialization schema is not serializable itself. Here is my
>>> code for this:
>>>
>>>
>>>
>>> The serialization schema:
>>>
>>> class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord]
>>> extends SerializationSchema[IN] {
>>>
>>>
>>>
>>>   override def serialize(element: IN): Array[Byte] = {
>>>
>>> val byteArray = new ByteArrayOutputStream()
>>>
>>> val avroSer = AvroOutputStream.binary[IN](byteArray)
>>>
>>> avroSer.write(element)
>>>
>>> avroSer.flush()
>>>
>>> avroSer.close()
>>>
>>>
>>>
>>> return byteArray.toByteArray
>>>
>>>   }
>>>
>>> }
>>>
>>>
>>>
>>> The job code:
>>>
>>> case class Person(name : String, age : Int, address : Address)
>>>
>>> case class Address(city : String, street : String)
>>>
>>>
>>>
>>> class SimpleJob {
>>>
>>>
>>>
>>>   @transient
>>>
>>>   private lazy val serSchema : AvroSerializationSchema[Person] = new
>>> AvroSerializationSchema[Person]()
>>>
>>>
>>>
>>>   def start() = {
>>>
>>> val testPerson = Person("Test", 100, Address("Test", "Test"))
>>>
>>>
>>>
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>>
>>>
>>> env.
>>>
>>>   fromCollection(Seq(testPerson)).
>>>
>>>   addSink(createKafkaSink())
>>>
>>>
>>>
>>> env.execute("Flink sample job")
>>>
>>>   }
>>>
>>>
>>>
>>>
>>>
>>>   def createKafkaSink() : RichSinkFunction[Pe

Re: KafkaProducer with generic (Avro) serialization schema

2018-05-02 Thread Aljoscha Krettek
Hi,

Piotr is right, the SerializationSchema has to be serializable, which means 
that the implicit values passed on for SchemaFor[IN], FromRecord[IN], and 
ToRecord[IN] need to be serializable. Is there no way of making those 
serializable? As a workaround you could think about having a factory for those 
types that is serializable and have the factory as the context bound and then 
lazily create the SchemaFor, FromRecord, and ToRecord from that.

What do you think?

Aljoscha
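
In Java terms the shape of that workaround would be roughly the following
(a sketch only; the Converter/ConverterFactory interfaces are made up to
stand in for the avro4s type classes): the schema captures a Serializable
factory instead of the non-serializable converter, and builds the converter
lazily on first use, i.e. after the schema has been shipped to the task:

import java.io.Serializable;

public class LazySchema<T> implements Serializable {

    public interface Converter<U> {                  // may be non-serializable
        byte[] toBytes(U element);
    }

    public interface ConverterFactory<U> extends Serializable {
        Converter<U> create();
    }

    private final ConverterFactory<T> factory;       // serializable, shipped with the job
    private transient Converter<T> converter;        // created lazily on the task manager

    public LazySchema(ConverterFactory<T> factory) {
        this.factory = factory;
    }

    public byte[] serialize(T element) {
        if (converter == null) {
            converter = factory.create();            // lazy creation, as suggested above
        }
        return converter.toBytes(element);
    }
}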

> On 2. May 2018, at 16:34, Piotr Nowojski <pi...@data-artisans.com> wrote:
> 
> Hi,
> 
> My Scala knowledge is very limited (and my Scala serialization knowledge is
> non-existent), but one way or another you have to make your
> SerializationSchema serialisable. If indeed this is the problem, maybe a
> better place to ask this question is on Stack Overflow or some
> Scala-specific mailing list/board (unless someone else from the Flink
> community can provide an answer to this problem)?
> 
> Piotrek
> 
>> On 1 May 2018, at 16:30, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>> 
>> So, I'm still struggling with this issue. I dived a bit more into the
>> problem and I'm pretty sure that the problem is that I have to (implicitly)
>> pass the SchemaFor and ToRecord classes to my serialization schema
>> (otherwise I can't make it generic). However, those classes aren't
>> serializable, but of course I can't annotate them transient nor make them
>> a lazy val, which gives me the current issue.
>> 
>> I hope someone has some leads for me. 
>> 
>> Thanks!
>> 
>> On Thu, Apr 26, 2018 at 23:03, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>> Hi Bill,
>> 
>> Thanks for your answer. However this proposal isn't going to solve my issue, 
>> since the problem here is that the context bounds I need to give in order to 
>> serialize it to Avro (SchemaFor, ToRecord and FromRecord) aren't 
>> serializable classes. This results in Flink not being able to serialize the 
>> KafkaProducer, failing the whole job.
>> 
>> Thanks,
>> Wouter
>> 
>> On Thu, Apr 26, 2018 at 00:42, Nortman, Bill <william.nort...@pimco.com> wrote:
>> The first things I would try: check that your classes Person and Address
>> have getters and setters and a no-argument constructor.
>> 
>>  
>> 
>> From: Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
>> Sent: Wednesday, April 25, 2018 7:17 AM
>> To: user@flink.apache.org
>> Subject: KafkaProducer with generic (Avro) serialization schema
>> 
>>  
>> 
>> Dear reader,
>> 
>>  
>> 
>> I'm currently working on writing a KafkaProducer which is able to serialize 
>> a generic type using avro4s.
>> 
>> However this serialization schema is not serializable itself. Here is my 
>> code for this:
>> 
>>  
>> 
>> The serialization schema:
>> 
>> class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord] extends 
>> SerializationSchema[IN] {
>> 
>>  
>> 
>>   override def serialize(element: IN): Array[Byte] = {
>> 
>> val byteArray = new ByteArrayOutputStream()
>> 
>> val avroSer = AvroOutputStream.binary[IN](byteArray)
>> 
>> avroSer.write(element)
>> 
>> avroSer.flush()
>> 
>> avroSer.close()
>> 
>>  
>> 
>> return byteArray.toByteArray
>> 
>>   }
>> 
>> }
>> 
>>  
>> 
>> The job code:
>> 
>> case class Person(name : String, age : Int, address : Address)
>> 
>> case class Address(city : String, street : String)
>> 
>>  
>> 
>> class SimpleJob {
>> 
>>  
>> 
>>   @transient
>> 
>>   private lazy val serSchema : AvroSerializationSchema[Person] = new 
>> AvroSerializationSchema[Person]()
>> 
>>  
>> 
>>   def start() = {
>> 
>> val testPerson = Person("Test", 100, Address("Test", "Test"))
>> 
>>  
>> 
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> 
>>  
>> 
>> env.
>> 
>>   fromCollection(Seq(testPerson)).
>> 
>>   addSink(createKafkaSink())
>> 
>>  
>> 
>> env.execute("Flink sample job")
>

Re: KafkaProducer with generic (Avro) serialization schema

2018-05-02 Thread Piotr Nowojski
Hi,

My Scala knowledge is very limited (and my Scala serialization knowledge is
non-existent), but one way or another you have to make your SerializationSchema
serialisable. If indeed this is the problem, maybe a better place to ask this
question is on Stack Overflow or some Scala-specific mailing list/board (unless
someone else from the Flink community can provide an answer to this problem)?

Piotrek

> On 1 May 2018, at 16:30, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
> 
> So, I'm still struggling with this issue. I dived a bit more into the problem
> and I'm pretty sure that the problem is that I have to (implicitly) pass the
> SchemaFor and ToRecord classes to my serialization schema (otherwise I can't
> make it generic). However, those classes aren't serializable, but of course I
> can't annotate them transient nor make them a lazy val, which gives me the
> current issue.
> 
> I hope someone has some leads for me. 
> 
> Thanks!
> 
> On Thu, Apr 26, 2018 at 23:03, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
> Hi Bill,
> 
> Thanks for your answer. However this proposal isn't going to solve my issue, 
> since the problem here is that the context bounds I need to give in order to 
> serialize it to Avro (SchemaFor, ToRecord and FromRecord) aren't serializable 
> classes. This results in Flink not being able to serialize the KafkaProducer,
> failing the whole job.
> 
> Thanks,
> Wouter
> 
> On Thu, Apr 26, 2018 at 00:42, Nortman, Bill <william.nort...@pimco.com> wrote:
> The first things I would try: check that your classes Person and Address
> have getters and setters and a no-argument constructor.
> 
>  
> 
> From: Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
> Sent: Wednesday, April 25, 2018 7:17 AM
> To: user@flink.apache.org
> Subject: KafkaProducer with generic (Avro) serialization schema
> 
>  
> 
> Dear reader,
> 
>  
> 
> I'm currently working on writing a KafkaProducer which is able to serialize a 
> generic type using avro4s.
> 
> However this serialization schema is not serializable itself. Here is my code 
> for this:
> 
>  
> 
> The serialization schema:
> 
> class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord] extends 
> SerializationSchema[IN] {
> 
>  
> 
>   override def serialize(element: IN): Array[Byte] = {
> 
> val byteArray = new ByteArrayOutputStream()
> 
> val avroSer = AvroOutputStream.binary[IN](byteArray)
> 
> avroSer.write(element)
> 
> avroSer.flush()
> 
> avroSer.close()
> 
>  
> 
> return byteArray.toByteArray
> 
>   }
> 
> }
> 
>  
> 
> The job code:
> 
> case class Person(name : String, age : Int, address : Address)
> 
> case class Address(city : String, street : String)
> 
>  
> 
> class SimpleJob {
> 
>  
> 
>   @transient
> 
>   private lazy val serSchema : AvroSerializationSchema[Person] = new 
> AvroSerializationSchema[Person]()
> 
>  
> 
>   def start() = {
> 
> val testPerson = Person("Test", 100, Address("Test", "Test"))
> 
>  
> 
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> 
>  
> 
> env.
> 
>   fromCollection(Seq(testPerson)).
> 
>   addSink(createKafkaSink())
> 
>  
> 
> env.execute("Flink sample job")
> 
>   }
> 
>  
> 
>  
> 
>   def createKafkaSink() : RichSinkFunction[Person] = {
> 
> //set some properties
> 
> val properties = new Properties()
> 
> properties.put("bootstrap.servers", "127.0.0.01:9092 
> <https://urldefense.proofpoint.com/v2/url?u=http-3A__127.0.0.01-3A9092=DwMFaQ=91HTncUBNS9Yv-Uuv2IlCA=uIPe81JDQPedACYiDVt5c_onxyyylzQ6P_yssgrLiLA=r7xfZoirhywTOErLvFFTTOxHs8dZeMd6-JtIaFdDtb0=GR3YuCSPimKhPq1hcics55VX6yef8lIsMEyTmEGFRSc=>")
> 
> properties.put("zookeeper.connect", "127.0.0.1:2181 
> <https://urldefense.proofpoint.com/v2/url?u=http-3A__127.0.0.1-3A2181=DwMFaQ=91HTncUBNS9Yv-Uuv2IlCA=uIPe81JDQPedACYiDVt5c_onxyyylzQ6P_yssgrLiLA=r7xfZoirhywTOErLvFFTTOxHs8dZeMd6-JtIaFdDtb0=zkbyqz0oyZOwyBZ9Hy7PpuGlTyPPB639vVkkFc6FlpQ=>")
> 
>  
> 
> new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
> 
>   }
> 
>  
> 
> }
> 
>  
> 
> The code does compile, however it gives the following error at runtime:
> InvalidProgramException:
> Object org.apache.flink.streaming.util.serialization.KeyedSe

Re: KafkaProducer with generic (Avro) serialization schema

2018-05-01 Thread Wouter Zorgdrager
So, I'm still struggling with this issue. I dived a bit more into the
problem and I'm pretty sure that the problem is that I have to (implicitly)
pass the SchemaFor and ToRecord classes to my serialization schema
(otherwise I can't make it generic). However, those classes aren't
serializable, but of course I can't annotate them transient nor make them a
lazy val, which gives me the current issue.

I hope someone has some leads for me.

Thanks!

On Thu, Apr 26, 2018 at 23:03, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:

> Hi Bill,
>
> Thanks for your answer. However this proposal isn't going to solve my
> issue, since the problem here is that the context bounds I need to give in
> order to serialize it to Avro (SchemaFor, ToRecord and FromRecord) aren't
> serializable classes. This results in Flink not being able to serialize the
> KafkaProducer, failing the whole job.
>
> Thanks,
> Wouter
>
> On Thu, Apr 26, 2018 at 00:42, Nortman, Bill <
> william.nort...@pimco.com> wrote:
>
>> The first things I would try: check that your classes Person and
>> Address have getters and setters and a no-argument constructor.
>>
>>
>>
>> From: Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
>> Sent: Wednesday, April 25, 2018 7:17 AM
>> To: user@flink.apache.org
>> Subject: KafkaProducer with generic (Avro) serialization schema
>>
>>
>>
>> Dear reader,
>>
>>
>>
>> I'm currently working on writing a KafkaProducer which is able to
>> serialize a generic type using avro4s.
>>
>> However this serialization schema is not serializable itself. Here is my
>> code for this:
>>
>>
>>
>> The serialization schema:
>>
>> class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord]
>> extends SerializationSchema[IN] {
>>
>>
>>
>>   override def serialize(element: IN): Array[Byte] = {
>>
>> val byteArray = new ByteArrayOutputStream()
>>
>> val avroSer = AvroOutputStream.binary[IN](byteArray)
>>
>> avroSer.write(element)
>>
>> avroSer.flush()
>>
>> avroSer.close()
>>
>>
>>
>> return byteArray.toByteArray
>>
>>   }
>>
>> }
>>
>>
>>
>> The job code:
>>
>> case class Person(name : String, age : Int, address : Address)
>>
>> case class Address(city : String, street : String)
>>
>>
>>
>> class SimpleJob {
>>
>>
>>
>>   @transient
>>
>>   private lazy val serSchema : AvroSerializationSchema[Person] = new
>> AvroSerializationSchema[Person]()
>>
>>
>>
>>   def start() = {
>>
>> val testPerson = Person("Test", 100, Address("Test", "Test"))
>>
>>
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>>
>>
>> env.
>>
>>   fromCollection(Seq(testPerson)).
>>
>>   addSink(createKafkaSink())
>>
>>
>>
>> env.execute("Flink sample job")
>>
>>   }
>>
>>
>>
>>
>>
>>   def createKafkaSink() : RichSinkFunction[Person] = {
>>
>> //set some properties
>>
>> val properties = new Properties()
>>
>> properties.put("bootstrap.servers", "127.0.0.01:9092
>> <https://urldefense.proofpoint.com/v2/url?u=http-3A__127.0.0.01-3A9092=DwMFaQ=91HTncUBNS9Yv-Uuv2IlCA=uIPe81JDQPedACYiDVt5c_onxyyylzQ6P_yssgrLiLA=r7xfZoirhywTOErLvFFTTOxHs8dZeMd6-JtIaFdDtb0=GR3YuCSPimKhPq1hcics55VX6yef8lIsMEyTmEGFRSc=>
>> ")
>>
>> properties.put("zookeeper.connect", "127.0.0.1:2181
>> <https://urldefense.proofpoint.com/v2/url?u=http-3A__127.0.0.1-3A2181=DwMFaQ=91HTncUBNS9Yv-Uuv2IlCA=uIPe81JDQPedACYiDVt5c_onxyyylzQ6P_yssgrLiLA=r7xfZoirhywTOErLvFFTTOxHs8dZeMd6-JtIaFdDtb0=zkbyqz0oyZOwyBZ9Hy7PpuGlTyPPB639vVkkFc6FlpQ=>
>> ")
>>
>>
>>
>> new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
>>
>>   }
>>
>>
>>
>> }
>>
>>
>>
>> The code does compile, however it gives the following error at
>> runtime: InvalidProgramException: Object
>> org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d
>> is not serializable.
>>
>>
>>
>> I assume this means that my custom SerializationSchema is not
>> serializable due to the use of SchemaFor, FromRecord and ToRecord.
>>
>> Anyone knows a solution or workaround?
>>
&

Re: KafkaProducer with generic (Avro) serialization schema

2018-04-26 Thread Wouter Zorgdrager
Hi Bill,

Thanks for your answer. However this proposal isn't going to solve my
issue, since the problem here is that the context bounds I need to give in
order to serialize it to Avro (SchemaFor, ToRecord and FromRecord) aren't
serializable classes. This results in Flink not being able to serialize the
KafkaProducer, failing the whole job.

Thanks,
Wouter

On Thu, Apr 26, 2018 at 00:42, Nortman, Bill <william.nort...@pimco.com> wrote:

> The first things I would try: check that your classes Person and Address
> have getters and setters and a no-argument constructor.
>
>
>
> From: Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
> Sent: Wednesday, April 25, 2018 7:17 AM
> To: user@flink.apache.org
> Subject: KafkaProducer with generic (Avro) serialization schema
>
>
>
> Dear reader,
>
>
>
> I'm currently working on writing a KafkaProducer which is able to
> serialize a generic type using avro4s.
>
> However this serialization schema is not serializable itself. Here is my
> code for this:
>
>
>
> The serialization schema:
>
> class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord]
> extends SerializationSchema[IN] {
>
>
>
>   override def serialize(element: IN): Array[Byte] = {
>
> val byteArray = new ByteArrayOutputStream()
>
> val avroSer = AvroOutputStream.binary[IN](byteArray)
>
> avroSer.write(element)
>
> avroSer.flush()
>
> avroSer.close()
>
>
>
> return byteArray.toByteArray
>
>   }
>
> }
>
>
>
> The job code:
>
> case class Person(name : String, age : Int, address : Address)
>
> case class Address(city : String, street : String)
>
>
>
> class SimpleJob {
>
>
>
>   @transient
>
>   private lazy val serSchema : AvroSerializationSchema[Person] = new
> AvroSerializationSchema[Person]()
>
>
>
>   def start() = {
>
> val testPerson = Person("Test", 100, Address("Test", "Test"))
>
>
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>
>
> env.
>
>   fromCollection(Seq(testPerson)).
>
>   addSink(createKafkaSink())
>
>
>
> env.execute("Flink sample job")
>
>   }
>
>
>
>
>
>   def createKafkaSink() : RichSinkFunction[Person] = {
>
> //set some properties
>
> val properties = new Properties()
>
> properties.put("bootstrap.servers", "127.0.0.01:9092
> <https://urldefense.proofpoint.com/v2/url?u=http-3A__127.0.0.01-3A9092=DwMFaQ=91HTncUBNS9Yv-Uuv2IlCA=uIPe81JDQPedACYiDVt5c_onxyyylzQ6P_yssgrLiLA=r7xfZoirhywTOErLvFFTTOxHs8dZeMd6-JtIaFdDtb0=GR3YuCSPimKhPq1hcics55VX6yef8lIsMEyTmEGFRSc=>
> ")
>
> properties.put("zookeeper.connect", "127.0.0.1:2181
> <https://urldefense.proofpoint.com/v2/url?u=http-3A__127.0.0.1-3A2181=DwMFaQ=91HTncUBNS9Yv-Uuv2IlCA=uIPe81JDQPedACYiDVt5c_onxyyylzQ6P_yssgrLiLA=r7xfZoirhywTOErLvFFTTOxHs8dZeMd6-JtIaFdDtb0=zkbyqz0oyZOwyBZ9Hy7PpuGlTyPPB639vVkkFc6FlpQ=>
> ")
>
>
>
> new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
>
>   }
>
>
>
> }
>
>
>
> The code does compile, however it gives the following error at
> runtime: InvalidProgramException: Object
> org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d
> is not serializable.
>
>
>
> I assume this means that my custom SerializationSchema is not serializable
> due to the use of SchemaFor, FromRecord and ToRecord.
>
> Anyone knows a solution or workaround?
>
>
>
> Thanks in advance!
>
> Wouter
>

RE: KafkaProducer with generic (Avro) serialization schema

2018-04-25 Thread Nortman, Bill
The first things I would try: check that your classes Person and Address have
getters and setters and a no-argument constructor.

From: Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
Sent: Wednesday, April 25, 2018 7:17 AM
To: user@flink.apache.org
Subject: KafkaProducer with generic (Avro) serialization schema

Dear reader,

I'm currently working on writing a KafkaProducer which is able to serialize a 
generic type using avro4s.
However this serialization schema is not serializable itself. Here is my code 
for this:

The serialization schema:
class AvroSerializationSchema[IN : SchemaFor : FromRecord: ToRecord] extends 
SerializationSchema[IN] {

  override def serialize(element: IN): Array[Byte] = {
val byteArray = new ByteArrayOutputStream()
val avroSer = AvroOutputStream.binary[IN](byteArray)
avroSer.write(element)
avroSer.flush()
avroSer.close()

return byteArray.toByteArray
  }
}

The job code:
case class Person(name : String, age : Int, address : Address)
case class Address(city : String, street : String)

class SimpleJob {

  @transient
  private lazy val serSchema : AvroSerializationSchema[Person] = new 
AvroSerializationSchema[Person]()

  def start() = {
val testPerson = Person("Test", 100, Address("Test", "Test"))

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.
  fromCollection(Seq(testPerson)).
  addSink(createKafkaSink())

env.execute("Flink sample job")
  }


  def createKafkaSink() : RichSinkFunction[Person] = {
//set some properties
val properties = new Properties()
properties.put("bootstrap.servers", 
"127.0.0.01:9092<https://urldefense.proofpoint.com/v2/url?u=http-3A__127.0.0.01-3A9092=DwMFaQ=91HTncUBNS9Yv-Uuv2IlCA=uIPe81JDQPedACYiDVt5c_onxyyylzQ6P_yssgrLiLA=r7xfZoirhywTOErLvFFTTOxHs8dZeMd6-JtIaFdDtb0=GR3YuCSPimKhPq1hcics55VX6yef8lIsMEyTmEGFRSc=>")
properties.put("zookeeper.connect", 
"127.0.0.1:2181<https://urldefense.proofpoint.com/v2/url?u=http-3A__127.0.0.1-3A2181=DwMFaQ=91HTncUBNS9Yv-Uuv2IlCA=uIPe81JDQPedACYiDVt5c_onxyyylzQ6P_yssgrLiLA=r7xfZoirhywTOErLvFFTTOxHs8dZeMd6-JtIaFdDtb0=zkbyqz0oyZOwyBZ9Hy7PpuGlTyPPB639vVkkFc6FlpQ=>")

new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
  }

}

The code does compile, however it gives the following error at runtime:
InvalidProgramException: Object
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d
is not serializable.

I assume this means that my custom SerializationSchema is not serializable due 
to the use of SchemaFor, FromRecord and ToRecord.
Anyone knows a solution or workaround?

Thanks in advance!
Wouter


Re: Serialization schema

2017-02-26 Thread Mohit Anchlia
There was a private member variable that was not serializable and was not
marked transient. Thanks for the pointer.
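
For completeness, a sketch of a schema that avoids both pitfalls discussed in
this thread: it is a top-level class (so it carries no hidden reference to an
enclosing instance) and holds no non-transient, non-serializable fields. The
import paths are from recent Flink versions, and the CSV wire format mirrors
the snippets quoted below:

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

public class Tuple2CsvSchema implements
        DeserializationSchema<Tuple2<Integer, Integer>>,
        SerializationSchema<Tuple2<Integer, Integer>> {

    @Override
    public byte[] serialize(Tuple2<Integer, Integer> t) {
        return (t.f0 + "," + t.f1).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Tuple2<Integer, Integer> deserialize(byte[] message) {
        String[] parts = new String(message, StandardCharsets.UTF_8).split(",");
        return new Tuple2<>(Integer.valueOf(parts[0]), Integer.valueOf(parts[1]));
    }

    @Override
    public boolean isEndOfStream(Tuple2<Integer, Integer> nextElement) {
        return false;   // the Kafka stream never ends
    }

    @Override
    public TypeInformation<Tuple2<Integer, Integer>> getProducedType() {
        // The anonymous TypeHint is created on demand and never stored in a
        // field, so it is not dragged into Java serialization of this schema.
        return TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {});
    }
}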

On Thu, Feb 23, 2017 at 11:44 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Thanks for clarifying.
>
> From the looks of your exception:
>
> Caused by: java.io.NotSerializableException:
> com.sy.flink.test.Tuple2Serializerr$1
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> com.sy.flink.test.Tuple2Serializerr$1: this states that an anonymous
> inner class in `Tuple2Serializerr` is not serializable.
>
> Could you check if that’s the case?
>
>
>
> On February 24, 2017 at 3:10:58 PM, Mohit Anchlia (mohitanch...@gmail.com)
> wrote:
>
> But it is not an inner class.
>
> On Thu, Feb 23, 2017 at 11:09 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org
> > wrote:
>
>> Since I don’t have your complete code, I’m guessing this is the problem:
>> Is your `Tuple2Serializer` an inner class? If yes, you should be able to
>> solve the problem by declaring `Tuple2Serializer` to be `static`.
>>
>> This is more of a Java problem -
>> It isn’t serializable if it isn’t static, because it will contain an
>> implicit reference to the enclosing outer class, and therefore serializing
>> it will result in serializing the outer class instance as well.
>>
>>
>> On February 24, 2017 at 2:43:38 PM, Mohit Anchlia (mohitanch...@gmail.com)
>> wrote:
>>
>> This is at high level what I am doing:
>>
>> Serialize:
>>
>> String s = tuple.getField(0) + "," + tuple.getField(1);
>> return s.getBytes();
>>
>> Deserialize:
>>
>> String s = new String(message);
>> String [] sarr = s.split(",");
>> Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]),
>> Integer.valueOf(sarr[1]));
>>
>> return tuple;
>>
>>
>> On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai <
>> tzuli...@apache.org> wrote:
>>
>>> Hi Mohit,
>>>
>>> As 刘彪 pointed out in his reply, the problem is that your
>>> `Tuple2Serializer` contains fields that are not serializable, so
>>> `Tuple2Serializer` itself is not serializable.
>>> Could you perhaps share your `Tuple2Serializer` implementation with us
>>> so we can pinpoint the problem?
>>>
>>> A snippet of the class fields and constructor will do, so you don’t have
>>> to provide the whole `serialize` / `deserialize` implementation if you
>>> don’t want to.
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (
>>> mohitanch...@gmail.com) wrote:
>>>
>>> I am using String inside to convert into bytes.
>>>
>>> On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote:
>>>
>>>> Hi Mohit
>>>> As you did not give the whole code of Tuple2Serializerr, I guess the
>>>> reason is that some fields of Tuple2Serializerr do not implement
>>>> Serializable.
>>>>
>>>> 2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>>>
>>>>> I wrote a key serialization class to write to Kafka, however I am
>>>>> getting this error. Not sure why, as I've already implemented the
>>>>> interfaces.
>>>>>
>>>>> Caused by: java.io.NotSerializableException:
>>>>> com.sy.flink.test.Tuple2Serializerr$1
>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>
>>>>> And the class implements the following:
>>>>>
>>>>> public class Tuple2Serializerr implements
>>>>>
>>>>> DeserializationSchema<Tuple2<Integer, Integer>>,
>>>>>
>>>>> SerializationSchema<Tuple2<Integer, Integer>> {
>>>>>
>>>>> And called like this:
>>>>>
>>>>>
>>>>> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
>>>>> FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
>>>>>
>>>>> "10.22.4.15:9092", // broker list
>>>>>
>>>>> "my-topic", // target topic
>>>>>
>>>>> new Tuple2Serializerr()); // serialization schema
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Serialization schema

2017-02-23 Thread Mohit Anchlia
But it is not an inner class.

On Thu, Feb 23, 2017 at 11:09 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Since I don’t have your complete code, I’m guessing this is the problem:
> Is your `Tuple2Serializer` an inner class? If yes, you should be able to
> solve the problem by declaring `Tuple2Serializer` to be `static`.
>
> This is more of a Java problem -
> It isn’t serializable if it isn’t static, because it will contain an
> implicit reference to the enclosing outer class, and therefore serializing
> it will result in serializing the outer class instance as well.
>
>
> On February 24, 2017 at 2:43:38 PM, Mohit Anchlia (mohitanch...@gmail.com)
> wrote:
>
> This is at high level what I am doing:
>
> Serialize:
>
> String s = tuple.getField(0) + "," + tuple.getField(1);
> return s.getBytes();
>
> Deserialize:
>
> String s = new String(message);
> String [] sarr = s.split(",");
> Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]),
> Integer.valueOf(sarr[1]));
>
> return tuple;
>
>
> On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org
> > wrote:
>
>> Hi Mohit,
>>
>> As 刘彪 pointed out in his reply, the problem is that your
>> `Tuple2Serializer` contains fields that are not serializable, so
>> `Tuple2Serializer` itself is not serializable.
>> Could you perhaps share your `Tuple2Serializer` implementation with us so
>> we can pinpoint the problem?
>>
>> A snippet of the class fields and constructor will do, so you don’t have
>> to provide the whole `serialize` / `deserialize` implementation if you
>> don’t want to.
>>
>> Cheers,
>> Gordon
>>
>>
>> On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (
>> mohitanch...@gmail.com) wrote:
>>
>> I am using String inside to convert into bytes.
>>
>> On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote:
>>
>>> Hi Mohit
>>> As you did not give the whole code of Tuple2Serializerr, I guess the
>>> reason is that some fields of Tuple2Serializerr do not implement
>>> Serializable.
>>>
>>> 2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>>
>>>> I wrote a key serialization class to write to Kafka, however I am
>>>> getting this error. Not sure why, as I've already implemented the
>>>> interfaces.
>>>>
>>>> Caused by: java.io.NotSerializableException:
>>>> com.sy.flink.test.Tuple2Serializerr$1
>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>
>>>> And the class implements the following:
>>>>
>>>> public class Tuple2Serializerr implements
>>>>
>>>> DeserializationSchema<Tuple2<Integer, Integer>>,
>>>>
>>>> SerializationSchema<Tuple2<Integer, Integer>> {
>>>>
>>>> And called like this:
>>>>
>>>>
>>>> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
>>>> FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
>>>>
>>>> "10.22.4.15:9092", // broker list
>>>>
>>>> "my-topic", // target topic
>>>>
>>>> new Tuple2Serializerr()); // serialization schema
>>>>
>>>>
>>>>
>>>>
>>>
>>
>


Re: Serialization schema

2017-02-23 Thread Mohit Anchlia
This is at high level what I am doing:

Serialize:

String s = tuple.getField(0) + "," + tuple.getField(1);
return s.getBytes();

Deserialize:

String s = new String(message);
String [] sarr = s.split(",");
Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]),
Integer.valueOf(sarr[1]));

return tuple;


On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Mohit,
>
> As 刘彪 pointed out in his reply, the problem is that your
> `Tuple2Serializer` contains fields that are not serializable, so
> `Tuple2Serializer` itself is not serializable.
> Could you perhaps share your `Tuple2Serializer` implementation with us so
> we can pinpoint the problem?
>
> A snippet of the class fields and constructor will do, so you don’t have
> to provide the whole `serialize` / `deserialize` implementation if you
> don’t want to.
>
> Cheers,
> Gordon
>
>
> On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com)
> wrote:
>
> I am using String inside to convert into bytes.
>
> On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote:
>
>> Hi Mohit
>> As you did not give the whole code of Tuple2Serializerr, I guess the
>> reason is that some fields of Tuple2Serializerr do not implement
>> Serializable.
>>
>> 2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>
>>> I wrote a key serialization class to write to Kafka, however I am getting
>>> this error. Not sure why, as I've already implemented the interfaces.
>>>
>>> Caused by: java.io.NotSerializableException:
>>> com.sy.flink.test.Tuple2Serializerr$1
>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>
>>> And the class implements the following:
>>>
>>> public class Tuple2Serializerr implements
>>>
>>> DeserializationSchema<Tuple2<Integer, Integer>>,
>>>
>>> SerializationSchema<Tuple2<Integer, Integer>> {
>>>
>>> And called like this:
>>>
>>>
>>> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
>>> FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
>>>
>>> "10.22.4.15:9092", // broker list
>>>
>>> "my-topic", // target topic
>>>
>>> new Tuple2Serializerr()); // serialization schema
>>>
>>>
>>>
>>>
>>
>


Re: Serialization schema

2017-02-23 Thread Tzu-Li (Gordon) Tai
Hi Mohit,

As 刘彪 pointed out in his reply, the problem is that your `Tuple2Serializer` 
contains fields that are not serializable, so `Tuple2Serializer` itself is not 
serializable.
Could you perhaps share your `Tuple2Serializer` implementation with us so we 
can pinpoint the problem?

A snippet of the class fields and constructor will do, so you don’t have to 
provide the whole `serialize` / `deserialize` implementation if you don’t want 
to.

Cheers,
Gordon


On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com) 
wrote:

I am using String inside to convert into bytes.

On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote:
Hi Mohit
As you did not give the whole code of Tuple2Serializerr, I guess the reason is
that some fields of Tuple2Serializerr do not implement Serializable.

2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:
I wrote a key serialization class to write to Kafka, however I am getting this
error. Not sure why, as I've already implemented the interfaces.

Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

And the class implements the following:

public class Tuple2Serializerr implements

DeserializationSchema<Tuple2<Integer, Integer>>,

SerializationSchema<Tuple2<Integer, Integer>> {

And called like this:



FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new 
FlinkKafkaProducer010<Tuple2<Integer, Integer>>(

"10.22.4.15:9092", // broker list

"my-topic", // target topic

new Tuple2Serializerr()); // serialization schema









Re: Serialization schema

2017-02-23 Thread Mohit Anchlia
I am using String inside to convert into bytes.

On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote:

> Hi Mohit
> As you did not give the whole code of Tuple2Serializerr, I guess the
> reason is that some fields of Tuple2Serializerr do not implement
> Serializable.
>
> 2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:
>
>> I wrote a key serialization class to write to Kafka, however I am getting
>> this error. Not sure why, as I've already implemented the interfaces.
>>
>> Caused by: java.io.NotSerializableException:
>> com.sy.flink.test.Tuple2Serializerr$1
>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>
>> And the class implements the following:
>>
>> public class Tuple2Serializerr implements
>>
>> DeserializationSchema<Tuple2<Integer, Integer>>,
>>
>> SerializationSchema<Tuple2<Integer, Integer>> {
>>
>> And called like this:
>>
>>
>> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
>> FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
>>
>> "10.22.4.15:9092", // broker list
>>
>> "my-topic", // target topic
>>
>> new Tuple2Serializerr()); // serialization schema
>>
>>
>>
>>
>


Re: Serialization schema

2017-02-23 Thread 刘彪
Hi Mohit
As you did not give the whole code of Tuple2Serializerr, I guess the
reason is that some fields of Tuple2Serializerr do not implement Serializable.

2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:

> I wrote a key serialization class to write to Kafka, however I am getting
> this error. Not sure why, as I've already implemented the interfaces.
>
> Caused by: java.io.NotSerializableException:
> com.sy.flink.test.Tuple2Serializerr$1
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> And the class implements the following:
>
> public class Tuple2Serializerr implements
>
> DeserializationSchema<Tuple2<Integer, Integer>>,
>
> SerializationSchema<Tuple2<Integer, Integer>> {
>
> And called like this:
>
>
> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
> FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
>
> "10.22.4.15:9092", // broker list
>
> "my-topic", // target topic
>
> new Tuple2Serializerr()); // serialization schema
>
>
>
>


Serialization schema

2017-02-23 Thread Mohit Anchlia
I wrote a key serialization class to write to Kafka, however I am getting
this error. Not sure why, as I've already implemented the interfaces.

Caused by: java.io.NotSerializableException:
com.sy.flink.test.Tuple2Serializerr$1
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

And the class implements the following:

public class Tuple2Serializerr implements

DeserializationSchema<Tuple2<Integer, Integer>>,

SerializationSchema<Tuple2<Integer, Integer>> {

And called like this:


FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
FlinkKafkaProducer010<Tuple2<Integer, Integer>>(

"10.22.4.15:9092", // broker list

"my-topic", // target topic

new Tuple2Serializerr()); // serialization schema