Re: Example for Jackson JsonNode Kafka serialization schema
Hi Oran, as you've already suggested, you could just use a (flat)map function that takes an ObjectNode and outputs a string. In the mapper you can do whatever you want with an invalid object: log it, discard it, write an "error" JSON string, write it to a side output stream, and so on.

On Tue, Jan 25, 2022 at 12:38 PM Oran Shuster wrote:
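The (flat)map idea suggested above can be sketched in plain Java (all class and method names here are invented for illustration; in a real Flink job this logic would live in a FlatMapFunction and failures would go to a side output or a log):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Sketch: try to serialize each record; records that fail are kept out of
// the main output and collected separately (stand-in for a side output).
class SerializingMapper<T> {
    private final Function<T, String> serializer;
    final List<T> failed = new ArrayList<>();   // stand-in for a side output

    SerializingMapper(Function<T, String> serializer) {
        this.serializer = serializer;
    }

    // Mirrors FlatMapFunction<T, String>: zero or one output per input record.
    List<String> flatMap(List<T> input) {
        List<String> out = new ArrayList<>();
        for (T record : input) {
            try {
                out.add(serializer.apply(record));
            } catch (RuntimeException e) {
                failed.add(record);             // discard, log, or side-output
            }
        }
        return out;
    }
}
```

The point is that the error-handling policy lives in the mapper, before the Kafka sink, so the serialization schema itself never has to return null.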
Example for Jackson JsonNode Kafka serialization schema
In the documentation we have an example of how to implement deserialization from bytes to Jackson ObjectNode objects - JSONKeyValueDeserializationSchema:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

However, there is no example for the other direction: taking an ObjectNode/JsonNode (or any POJO class) and using Jackson to serialize it to a string.

You can write a simple schema like so:

    public class JSONKafkaSerializationSchema implements KafkaSerializationSchema<JsonNode> {
        private final ObjectMapper objectMapper = new ObjectMapper();

        @Override
        public ProducerRecord<byte[], byte[]> serialize(JsonNode element, @Nullable Long timestamp) {
            String topic = getTargetTopic(element);
            try {
                byte[] value = objectMapper.writeValueAsBytes(element);
                return new ProducerRecord<>(topic, value);
            } catch (JsonProcessingException e) {
                return null;
            }
        }

        private String getTargetTopic(JsonNode jsonNode) {
            return jsonNode.get("topic").asText();
        }
    }

But this raises a question - what to do when serialization fails? If the input class is a simple POJO then Jackson should always succeed in converting it to bytes, but that's not 100% guaranteed. In case of failure, can we return null so that the record is discarded? Null values are discarded in the case of the deserialization schema, per the documentation: "Returns: The deserialized message as an object (null if the message cannot be deserialized)." If this is not possible, what is the proper way to serialize Jackson objects into bytes in Flink? It's possible to convert everything to String before the Kafka producer, but then any logic to determine the target topic would need to deserialize the string again.
Re: KafkaProducer with generic (Avro) serialization schema
Hi,

Thanks for the suggestions. Unfortunately I cannot make FromRecord/ToRecord/SchemaFor serializable, since those classes are out of my control. I use them from the avro4s library (https://github.com/sksamuel/avro4s). The problem, especially with the deserializer, is that I need to convert an Avro GenericRecord to a Scala case class. Avro is written in Java, so that's a bit problematic, and therefore I need the avro4s library. avro4s tries to verify at compile time whether the generic type is actually convertible from/to a generic record, which is why I need those context bounds.

As for @Aljoscha's workaround, I don't understand how this would solve it - doesn't that just move the problem? If I create a factory, I still need the generic (with context bounds) that I specify at my KafkaConsumer/deserialization schema.

@Fabian I'm not sure I understand your proposal. I still need the context bounds for those compile-time macros of avro4s.

Once again, thanks for your help so far!

Regards,
Wouter

Op wo 2 mei 2018 om 16:48 schreef Fabian Hueske <fhue...@gmail.com>:
Re: KafkaProducer with generic (Avro) serialization schema
Hi Wouter,

you can try to make the SerializationSchema serializable by overriding Java's serialization methods writeObject() and readObject(), similar to what Flink's AvroRowSerializationSchema [1] does.

Best, Fabian

[1] https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java

2018-05-02 16:34 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:
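The writeObject()/readObject() suggestion above can be sketched with plain JDK classes (all names here are invented; the real AvroRowSerializationSchema applies the same trick to its Avro Schema field):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// A helper that is not Serializable, playing the role of avro4s' SchemaFor.
class Helper {
    final String config;
    Helper(String config) { this.config = config; }
}

// The schema keeps the helper transient, serializes only a plain
// "recipe" for it, and rebuilds the helper in readObject().
class MySchema implements Serializable {
    private final String config;            // serializable description of the helper
    private transient Helper helper;        // skipped by default serialization

    MySchema(String config) {
        this.config = config;
        this.helper = new Helper(config);
    }

    Helper helper() { return helper; }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();            // writes only `config`
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.helper = new Helper(config);    // recreate the transient helper
    }
}
```

The schema instance survives Java serialization because only the recipe crosses the wire; the non-serializable helper is reconstructed on the other side.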
Re: KafkaProducer with generic (Avro) serialization schema
Hi,

Piotr is right, the SerializationSchema has to be serializable, which means that the implicit values passed on for SchemaFor[IN], FromRecord[IN], and ToRecord[IN] need to be serializable. Is there no way of making those serializable?

As a workaround you could think about having a factory for those types that is serializable, have the factory as the context bound, and then lazily create the SchemaFor, FromRecord, and ToRecord from that. What do you think?

Aljoscha

> On 2. May 2018, at 16:34, Piotr Nowojski <pi...@data-artisans.com> wrote:
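The mechanics of the factory workaround can be illustrated in plain Java (names invented for the sketch; whether it resolves the Scala context-bound issue is a separate question): the schema holds a Serializable factory instead of the non-serializable value, and creates the value lazily on first use, i.e. after the schema has been shipped to the workers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Plays the role of SchemaFor/ToRecord: deliberately NOT Serializable.
class Codec {
    byte[] encode(String s) { return s.getBytes(StandardCharsets.UTF_8); }
}

// The factory is Serializable; a lambda implementing it is too.
interface CodecFactory extends Serializable {
    Codec create();
}

class LazySchema implements Serializable {
    private final CodecFactory factory;      // serializable recipe
    private transient Codec codec;           // never serialized

    LazySchema(CodecFactory factory) { this.factory = factory; }

    byte[] serialize(String element) {
        if (codec == null) {
            codec = factory.create();        // lazy init, also after deserialization
        }
        return codec.encode(element);
    }
}
```

Only the factory travels through Java serialization; the non-serializable Codec is built fresh wherever serialize() is first called.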
Re: KafkaProducer with generic (Avro) serialization schema
Hi,

My Scala knowledge is very limited (and my Scala serialization knowledge is non-existent), but one way or another you have to make your SerializationSchema serializable. If indeed this is the problem, maybe a better place to ask this question is on Stack Overflow or some Scala-specific mailing list/board (unless someone else from the Flink community can provide an answer to this problem)?

Piotrek

> On 1 May 2018, at 16:30, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
Re: KafkaProducer with generic (Avro) serialization schema
So, I'm still struggling with this issue. I dived a bit more into the problem and I'm pretty sure the problem is that I have to (implicitly) pass the SchemaFor and ToRecord classes to my serialization schema (otherwise I can't make it generic). However, those classes aren't serializable, and of course I can't annotate them transient nor make them a lazy val, which gives me the current issue.

I hope someone has some leads for me.

Thanks!

Op do 26 apr. 2018 om 23:03 schreef Wouter Zorgdrager <zorgdrag...@gmail.com>:
Re: KafkaProducer with generic (Avro) serialization schema
Hi Bill,

Thanks for your answer. However, this proposal isn't going to solve my issue, since the problem here is that the context bounds I need in order to serialize to Avro (SchemaFor, ToRecord and FromRecord) aren't serializable classes. This results in Flink not being able to serialize the KafkaProducer, failing the whole job.

Thanks,
Wouter

Op do 26 apr. 2018 om 00:42 schreef Nortman, Bill <william.nort...@pimco.com>:
RE: KafkaProducer with generic (Avro) serialization schema
The things I would try first: do your classes Person and Address have getters and setters and a no-argument constructor?

From: Wouter Zorgdrager [mailto:zorgdrag...@gmail.com]
Sent: Wednesday, April 25, 2018 7:17 AM
To: user@flink.apache.org
Subject: KafkaProducer with generic (Avro) serialization schema

Dear reader,

I'm currently working on writing a KafkaProducer which is able to serialize a generic type using avro4s. However, this serialization schema is not serializable itself. Here is my code for this:

The serialization schema:

    class AvroSerializationSchema[IN : SchemaFor : FromRecord : ToRecord] extends SerializationSchema[IN] {

      override def serialize(element: IN): Array[Byte] = {
        val byteArray = new ByteArrayOutputStream()
        val avroSer = AvroOutputStream.binary[IN](byteArray)
        avroSer.write(element)
        avroSer.flush()
        avroSer.close()
        byteArray.toByteArray
      }
    }

The job code:

    case class Person(name: String, age: Int, address: Address)
    case class Address(city: String, street: String)

    class SimpleJob {

      @transient
      private lazy val serSchema: AvroSerializationSchema[Person] = new AvroSerializationSchema[Person]()

      def start() = {
        val testPerson = Person("Test", 100, Address("Test", "Test"))

        val env = StreamExecutionEnvironment.getExecutionEnvironment

        env.
          fromCollection(Seq(testPerson)).
          addSink(createKafkaSink())

        env.execute("Flink sample job")
      }

      def createKafkaSink(): RichSinkFunction[Person] = {
        // set some properties
        val properties = new Properties()
        properties.put("bootstrap.servers", "127.0.0.01:9092")
        properties.put("zookeeper.connect", "127.0.0.1:2181")

        new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
      }
    }

The code does compile, however it gives the following error at runtime: InvalidProgramException: Object org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d is not serializable.

I assume this means that my custom SerializationSchema is not serializable due to the use of SchemaFor, FromRecord and ToRecord.

Anyone knows a solution or workaround?

Thanks in advance!
Wouter
Re: Serialization schema
There was a private member variable that was not serializable and was not marked transient. Thanks for the pointer.

On Thu, Feb 23, 2017 at 11:44 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Thanks for clarifying.
>
> From the looks of your exception:
>
>>>>> Caused by: java.io.NotSerializableException:
>>>>> com.sy.flink.test.Tuple2Serializerr$1
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> com.sy.flink.test.Tuple2Serializerr$1: this states that an anonymous
> inner class in `Tuple2Serializerr` is not serializable.
>
> Could you check if that’s the case?
>
> On February 24, 2017 at 3:10:58 PM, Mohit Anchlia (mohitanch...@gmail.com) wrote:
>
> But it is not an inner class.
>
> On Thu, Feb 23, 2017 at 11:09 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Since I don’t have your complete code, I’m guessing this is the problem:
>> Is your `Tuple2Serializer` an inner class? If yes, you should be able to
>> solve the problem by declaring `Tuple2Serializer` to be `static`.
>>
>> This is more of a Java problem -
>> it isn’t serializable if it isn’t static, because it will contain an
>> implicit reference to the enclosing outer class, and therefore serializing
>> it will result in serializing the outer class instance as well.
>>
>> On February 24, 2017 at 2:43:38 PM, Mohit Anchlia (mohitanch...@gmail.com) wrote:
>>
>> This is, at a high level, what I am doing:
>>
>> Serialize:
>>
>> String s = tuple.getPos(0) + "," + tuple.getPos(1);
>> return s.getBytes();
>>
>> Deserialize:
>>
>> String s = new String(message);
>> String[] sarr = s.split(",");
>> Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]), Integer.valueOf(sarr[1]));
>> return tuple;
>>
>> On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>
>>> Hi Mohit,
>>>
>>> As 刘彪 pointed out in his reply, the problem is that your `Tuple2Serializer`
>>> contains fields that are not serializable, so `Tuple2Serializer` itself is
>>> not serializable. Could you perhaps share your `Tuple2Serializer`
>>> implementation with us so we can pinpoint the problem?
>>>
>>> A snippet of the class fields and constructor will do, so you don’t have
>>> to provide the whole `serialize` / `deserialize` implementation if you
>>> don’t want to.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com) wrote:
>>>
>>> I am using String inside to convert into bytes.
>>>
>>> On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote:
>>>
>>>> Hi Mohit
>>>> As you did not give the whole code of Tuple2Serializerr, I guess the
>>>> reason is that some fields of Tuple2Serializerr do not implement Serializable.
>>>>
>>>> 2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>>>
>>>>> I wrote a key serialization class to write to Kafka, however I am
>>>>> getting this error. Not sure why, as I've already implemented the
>>>>> interfaces.
>>>>>
>>>>> Caused by: java.io.NotSerializableException:
>>>>> com.sy.flink.test.Tuple2Serializerr$1
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>
>>>>> And the class implements the following:
>>>>>
>>>>> public class Tuple2Serializerr implements
>>>>>     DeserializationSchema<Tuple2<Integer, Integer>>,
>>>>>     SerializationSchema<Tuple2<Integer, Integer>> {
>>>>>
>>>>> And it is called like this:
>>>>>
>>>>> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
>>>>>     FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
>>>>>         "10.22.4.15:9092",        // broker list
>>>>>         "my-topic",               // target topic
>>>>>         new Tuple2Serializerr()); // serialization schema
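[Editor's note] The resolution above, marking the offending field transient and recreating it lazily, can be sketched with plain Java serialization. All names here (`HeavyFormatter`, `Schema`, `roundTrip`) are hypothetical stand-ins, not Flink classes; the round trip mimics what Flink does when it ships a schema instance to the task managers.

```java
import java.io.*;

public class TransientFieldDemo {
    // Stand-in for a non-serializable helper (a formatter, client, etc.).
    static class HeavyFormatter { /* deliberately NOT Serializable */ }

    static class Schema implements Serializable {
        // transient: skipped by Java serialization, recreated on first use.
        private transient HeavyFormatter formatter;

        private HeavyFormatter formatter() {
            if (formatter == null) {
                // Lazily re-initialized after deserialization, when the
                // field comes back as null.
                formatter = new HeavyFormatter();
            }
            return formatter;
        }

        byte[] serialize(String value) {
            formatter(); // ensure the helper exists
            return value.getBytes();
        }
    }

    // Serialize and deserialize an object, as Flink does when distributing
    // user functions to the cluster.
    static Schema roundTrip(Schema s) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(s);
        oos.flush();
        return (Schema) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
    }

    public static void main(String[] args) throws Exception {
        Schema copy = roundTrip(new Schema()); // would throw without `transient`
        System.out.println(new String(copy.serialize("ok"))); // prints "ok"
    }
}
```

Without the `transient` keyword, the same round trip fails with a NotSerializableException naming `HeavyFormatter`.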
Re: Serialization schema
But it is not an inner class.

On Thu, Feb 23, 2017 at 11:09 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Since I don’t have your complete code, I’m guessing this is the problem:
> Is your `Tuple2Serializer` an inner class? If yes, you should be able to
> solve the problem by declaring `Tuple2Serializer` to be `static`.
>
> This is more of a Java problem -
> it isn’t serializable if it isn’t static, because it will contain an
> implicit reference to the enclosing outer class, and therefore serializing
> it will result in serializing the outer class instance as well.
>
> On February 24, 2017 at 2:43:38 PM, Mohit Anchlia (mohitanch...@gmail.com) wrote:
>
> This is, at a high level, what I am doing:
>
> Serialize:
>
> String s = tuple.getPos(0) + "," + tuple.getPos(1);
> return s.getBytes();
>
> Deserialize:
>
> String s = new String(message);
> String[] sarr = s.split(",");
> Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]), Integer.valueOf(sarr[1]));
> return tuple;
>
> On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Hi Mohit,
>>
>> As 刘彪 pointed out in his reply, the problem is that your `Tuple2Serializer`
>> contains fields that are not serializable, so `Tuple2Serializer` itself is
>> not serializable. Could you perhaps share your `Tuple2Serializer`
>> implementation with us so we can pinpoint the problem?
>>
>> A snippet of the class fields and constructor will do, so you don’t have
>> to provide the whole `serialize` / `deserialize` implementation if you
>> don’t want to.
>>
>> Cheers,
>> Gordon
>>
>> On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com) wrote:
>>
>> I am using String inside to convert into bytes.
>>
>> On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote:
>>
>>> Hi Mohit
>>> As you did not give the whole code of Tuple2Serializerr, I guess the
>>> reason is that some fields of Tuple2Serializerr do not implement Serializable.
>>>
>>> 2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>>
>>>> I wrote a key serialization class to write to Kafka, however I am
>>>> getting this error. Not sure why, as I've already implemented the
>>>> interfaces.
>>>>
>>>> Caused by: java.io.NotSerializableException:
>>>> com.sy.flink.test.Tuple2Serializerr$1
>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>
>>>> And the class implements the following:
>>>>
>>>> public class Tuple2Serializerr implements
>>>>     DeserializationSchema<Tuple2<Integer, Integer>>,
>>>>     SerializationSchema<Tuple2<Integer, Integer>> {
>>>>
>>>> And it is called like this:
>>>>
>>>> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
>>>>     FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
>>>>         "10.22.4.15:9092",        // broker list
>>>>         "my-topic",               // target topic
>>>>         new Tuple2Serializerr()); // serialization schema
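[Editor's note] Gordon's diagnosis, that a `$1` suffix in a NotSerializableException points at an anonymous inner class, can be reproduced with a few lines of plain Java. The class below is a hypothetical illustration, not the poster's code: the anonymous `Comparator` is compiled to `InnerClassSerializationDemo$1`, and because it does not implement Serializable, serializing the enclosing object fails with exactly that name in the message.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

public class InnerClassSerializationDemo implements Serializable {
    // Anonymous inner class: compiled to InnerClassSerializationDemo$1.
    // It does not implement Serializable, so serializing the enclosing
    // object fails, naming the "$1" class -- the same shape as
    // `com.sy.flink.test.Tuple2Serializerr$1` in the stack trace above.
    // The fix: mark the field transient, or use a Serializable comparator.
    private final Comparator<String> cmp = new Comparator<String>() {
        public int compare(String a, String b) { return a.compareTo(b); }
    };

    // Attempt Java serialization; return the failure message, if any.
    static String trySerialize(Object o) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o);
            return null; // serialized fine
        } catch (NotSerializableException e) {
            return e.getMessage(); // the name of the offending class
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Prints the offending class name, ending in "$1".
        System.out.println(trySerialize(new InnerClassSerializationDemo()));
    }
}
```

The same mechanism explains Gordon's earlier point about non-static inner classes: a non-static nested class carries a hidden `this$0` reference to its enclosing instance, so the enclosing object gets dragged into serialization as well.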
Re: Serialization schema
This is, at a high level, what I am doing:

Serialize:

String s = tuple.getPos(0) + "," + tuple.getPos(1);
return s.getBytes();

Deserialize:

String s = new String(message);
String[] sarr = s.split(",");
Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]), Integer.valueOf(sarr[1]));
return tuple;

On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Mohit,
>
> As 刘彪 pointed out in his reply, the problem is that your `Tuple2Serializer`
> contains fields that are not serializable, so `Tuple2Serializer` itself is
> not serializable. Could you perhaps share your `Tuple2Serializer`
> implementation with us so we can pinpoint the problem?
>
> A snippet of the class fields and constructor will do, so you don’t have
> to provide the whole `serialize` / `deserialize` implementation if you
> don’t want to.
>
> Cheers,
> Gordon
>
> On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com) wrote:
>
> I am using String inside to convert into bytes.
>
> On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote:
>
>> Hi Mohit
>> As you did not give the whole code of Tuple2Serializerr, I guess the
>> reason is that some fields of Tuple2Serializerr do not implement Serializable.
>>
>> 2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>
>>> I wrote a key serialization class to write to Kafka, however I am
>>> getting this error. Not sure why, as I've already implemented the
>>> interfaces.
>>>
>>> Caused by: java.io.NotSerializableException:
>>> com.sy.flink.test.Tuple2Serializerr$1
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>
>>> And the class implements the following:
>>>
>>> public class Tuple2Serializerr implements
>>>     DeserializationSchema<Tuple2<Integer, Integer>>,
>>>     SerializationSchema<Tuple2<Integer, Integer>> {
>>>
>>> And it is called like this:
>>>
>>> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
>>>     FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
>>>         "10.22.4.15:9092",        // broker list
>>>         "my-topic",               // target topic
>>>         new Tuple2Serializerr()); // serialization schema
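[Editor's note] The comma-separated round trip Mohit sketches above can be exercised outside Flink. This version uses a plain int array in place of Flink's Tuple2 so it is self-contained, and adds two details the snippet omits: an explicit charset, and a length check so malformed input fails with a clear message instead of an ArrayIndexOutOfBoundsException later.

```java
import java.nio.charset.StandardCharsets;

public class CsvPairCodec {
    // Serialize: join the two fields with a comma, using an explicit charset
    // rather than the platform default.
    public static byte[] toBytes(int f0, int f1) {
        return (f0 + "," + f1).getBytes(StandardCharsets.UTF_8);
    }

    // Deserialize: split and parse, validating the field count up front.
    public static int[] fromBytes(byte[] message) {
        String s = new String(message, StandardCharsets.UTF_8);
        String[] parts = s.split(",", -1);
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected 2 fields, got: " + s);
        }
        return new int[] { Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) };
    }

    public static void main(String[] args) {
        int[] pair = fromBytes(toBytes(3, 7));
        System.out.println(pair[0] + "," + pair[1]); // prints "3,7"
    }
}
```

Note that a codec like this has no non-serializable state at all, which is why Gordon's questions below focus on the class fields and constructor rather than on the serialize/deserialize logic itself.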
Re: Serialization schema
Hi Mohit,

As 刘彪 pointed out in his reply, the problem is that your `Tuple2Serializer` contains fields that are not serializable, so `Tuple2Serializer` itself is not serializable. Could you perhaps share your `Tuple2Serializer` implementation with us so we can pinpoint the problem?

A snippet of the class fields and constructor will do, so you don’t have to provide the whole `serialize` / `deserialize` implementation if you don’t want to.

Cheers,
Gordon

On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com) wrote:

I am using String inside to convert into bytes.

On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote:

Hi Mohit
As you did not give the whole codes of Tuple2Serializerr. I guess the reason is some fields of Tuple2Serializerr do not implement Serializable.

2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:

I wrote a key serialization class to write to kafka however I am getting this error. Not sure why as I've already implemented the interfaces.

Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

And the class implements the following:

public class Tuple2Serializerr implements
    DeserializationSchema<Tuple2<Integer, Integer>>,
    SerializationSchema<Tuple2<Integer, Integer>> {

And called like this:

FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
    FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
        "10.22.4.15:9092",        // broker list
        "my-topic",               // target topic
        new Tuple2Serializerr()); // serialization schema
Re: Serialization schema
I am using String inside to convert into bytes.

On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote:

> Hi Mohit
> As you did not give the whole code of Tuple2Serializerr, I guess the
> reason is that some fields of Tuple2Serializerr do not implement Serializable.
>
> 2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:
>
>> I wrote a key serialization class to write to Kafka, however I am
>> getting this error. Not sure why, as I've already implemented the
>> interfaces.
>>
>> Caused by: java.io.NotSerializableException:
>> com.sy.flink.test.Tuple2Serializerr$1
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>
>> And the class implements the following:
>>
>> public class Tuple2Serializerr implements
>>     DeserializationSchema<Tuple2<Integer, Integer>>,
>>     SerializationSchema<Tuple2<Integer, Integer>> {
>>
>> And it is called like this:
>>
>> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
>>     FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
>>         "10.22.4.15:9092",        // broker list
>>         "my-topic",               // target topic
>>         new Tuple2Serializerr()); // serialization schema
Re: Serialization schema
Hi Mohit
As you did not give the whole code of Tuple2Serializerr, I guess the reason is that some fields of Tuple2Serializerr do not implement Serializable.

2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:

> I wrote a key serialization class to write to Kafka, however I am getting
> this error. Not sure why, as I've already implemented the interfaces.
>
> Caused by: java.io.NotSerializableException:
> com.sy.flink.test.Tuple2Serializerr$1
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> And the class implements the following:
>
> public class Tuple2Serializerr implements
>     DeserializationSchema<Tuple2<Integer, Integer>>,
>     SerializationSchema<Tuple2<Integer, Integer>> {
>
> And it is called like this:
>
> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
>     FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
>         "10.22.4.15:9092",        // broker list
>         "my-topic",               // target topic
>         new Tuple2Serializerr()); // serialization schema
Serialization schema
I wrote a key serialization class to write to Kafka, however I am getting this error. Not sure why, as I've already implemented the interfaces.

Caused by: java.io.NotSerializableException: com.sy.flink.test.Tuple2Serializerr$1
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)

And the class implements the following:

public class Tuple2Serializerr implements
    DeserializationSchema<Tuple2<Integer, Integer>>,
    SerializationSchema<Tuple2<Integer, Integer>> {

And it is called like this:

FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
    FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
        "10.22.4.15:9092",        // broker list
        "my-topic",               // target topic
        new Tuple2Serializerr()); // serialization schema