Re: Kafka and Flink integration
Hi Jürgen, one easy way is to disable the Kryo fallback with env.getConfig().disableGenericTypes(); If it was using Kryo you should see an exception, which also states the class, for which it needs to fallback to Kryo. This fails on the first non-Kryo class though. So depending on the other classes, you might need a minimal job using the Class to test it this way. Best, Konstantin On 05.07.2017 10:35, Jürgen Thomann wrote: > Hi Stephan, > > do you know an easy way to find out if Kryo or POJO is used? I have an > Object that would be a POJO, but it has one field that uses an object > without a public no argument constructor. As I understood the > documentation, this should result in Kryo being used. > > Thanks, > Jürgen > > On 03.07.2017 17:18, Stephan Ewen wrote: >> Hi Urs! >> >> Inside Flink (between Flink operators) >> - Kryo is not a problem, but types must be registered up front for >> good performance >> - Tuples and POJOs are often faster than the types that fall back to >> Kryo >> >> Persistent-storage (HDFS, Kafka, ...) >> - Kryo is not recommended, because its binary data format is not >> stable. It changes between major Kryo versions and between Kryo setups >> with different type registrations. >> - A stable format with schema evolution support (Avro, Thrift, ...) >> is recommended here. >> >> >> >> On Thu, Jun 22, 2017 at 9:28 AM, Urs Schoenenberger >> <urs.schoenenber...@tngtech.com >> <mailto:urs.schoenenber...@tngtech.com>> wrote: >> >> Hi Greg, >> >> do you have a link where I could read up on the rationale behind >> avoiding Kryo? I'm currently facing a similar decision and would >> like to >> get some more background on this. >> >> Thank you very much, >> Urs >> >> On 21.06.2017 12:10, Greg Hogan wrote: >> > The recommendation has been to avoid Kryo where possible. >> > >> > General data exchange: avro or thrift. 
>> > >> > Flink internal data exchange: POJO (or Tuple, which are slightly >> faster though less readable, and there is an outstanding PR to >> narrow or close the performance gap). >> > >> > Kryo is useful for types which cannot be modified to be a POJO. >> There are also cases where Kryo must be used because Flink has >> insufficient TypeInformation, such as when returning an interface >> or abstract type when the actual concrete type can be known. >> > >> > >> > >> >> On Jun 21, 2017, at 3:19 AM, nragon >> <nuno.goncal...@wedotechnologies.com >> <mailto:nuno.goncal...@wedotechnologies.com>> wrote: >> >> >> >> So, serialization between producer application -> kafka -> >> flink kafka >> >> consumer will use avro, thrift or kryo right? From there, the >> remaining >> >> pipeline can just use standard pojo serialization, which would >> be better? >> >> >> >> >> >> >> >> -- >> >> View this message in context: >> >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html >> >> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html> >> >> Sent from the Apache Flink User Mailing List archive. mailing >> list archive at Nabble.com. >> > >> >> -- >> Urs Schönenberger - urs.schoenenber...@tngtech.com >> <mailto:urs.schoenenber...@tngtech.com> >> >> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring >> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. 
Robert Dahlke >> Sitz: Unterföhring * Amtsgericht München * HRB 135082 >> >> > > -- > Jürgen Thomann > System Administrator > > > InnoGames GmbH > Friesenstraße 13 - 20097 Hamburg - Germany > Tel +49 40 7889335-0 > > Managing Directors: Hendrik Klindworth, Michael Zillmer > VAT-ID: DE264068907 Amtsgericht Hamburg, HRB 108973 > > http://www.innogames.com – juergen.thom...@innogames.com > -- Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182 TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke Sitz: Unterföhring * Amtsgericht München * HRB 135082
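Jürgen's question — how to tell whether Flink will treat a type as a POJO — can also be illustrated outside Flink. The sketch below is a hypothetical, much-simplified approximation of the POJO conditions discussed in this thread (public class, public no-argument constructor, every field public or exposed via bean-style getters/setters); it is not Flink's actual TypeExtractor, but it shows why a class without a public no-argument constructor fails the check and would push Flink to the Kryo fallback:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class PojoCheck {
    // Simplified approximation of the POJO rules discussed in the thread:
    // public class, public no-argument constructor, and every non-static,
    // non-transient field either public or reachable via bean-style
    // getter/setter. This is NOT Flink's real TypeExtractor.
    static boolean looksLikePojo(Class<?> c) {
        if (!Modifier.isPublic(c.getModifiers())) return false;
        try {
            c.getConstructor(); // public no-arg constructor required
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : c.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods)) continue;
            if (Modifier.isPublic(mods)) continue;
            String name = f.getName();
            String cap = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                c.getMethod("get" + cap);                 // public getter
                c.getMethod("set" + cap, f.getType());    // public setter
            } catch (NoSuchMethodException e) {
                return false; // field neither public nor bean-accessible
            }
        }
        return true;
    }

    // Passes the rules: public field plus a private field with accessors.
    public static class GoodPojo {
        public int id;
        private String name;
        public GoodPojo() {}
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    // Fails the rules: no public no-argument constructor (Jürgen's case).
    public static class NotAPojo {
        private final int id;
        public NotAPojo(int id) { this.id = id; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikePojo(GoodPojo.class));
        System.out.println(looksLikePojo(NotAPojo.class));
    }
}
```

A field whose declared type fails such a check is what the documentation means by the POJO analysis falling back to Kryo for that type.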
Re: Kafka and Flink integration
Hi Stephan, do you know an easy way to find out if Kryo or POJO is used? I have an Object that would be a POJO, but it has one field that uses an object without a public no argument constructor. As I understood the documentation, this should result in Kryo being used. Thanks, Jürgen On 03.07.2017 17:18, Stephan Ewen wrote: Hi Urs! Inside Flink (between Flink operators) - Kryo is not a problem, but types must be registered up front for good performance - Tuples and POJOs are often faster than the types that fall back to Kryo Persistent-storage (HDFS, Kafka, ...) - Kryo is not recommended, because its binary data format is not stable. It changes between major Kryo versions and between Kryo setups with different type registrations. - A stable format with schema evolution support (Avro, Thrift, ...) is recommended here. On Thu, Jun 22, 2017 at 9:28 AM, Urs Schoenenberger <urs.schoenenber...@tngtech.com <mailto:urs.schoenenber...@tngtech.com>> wrote: Hi Greg, do you have a link where I could read up on the rationale behind avoiding Kryo? I'm currently facing a similar decision and would like to get some more background on this. Thank you very much, Urs On 21.06.2017 12:10, Greg Hogan wrote: > The recommendation has been to avoid Kryo where possible. > > General data exchange: avro or thrift. > > Flink internal data exchange: POJO (or Tuple, which are slightly faster though less readable, and there is an outstanding PR to narrow or close the performance gap). > > Kryo is useful for types which cannot be modified to be a POJO. There are also cases where Kryo must be used because Flink has insufficient TypeInformation, such as when returning an interface or abstract type when the actual concrete type can be known. > > > >> On Jun 21, 2017, at 3:19 AM, nragon <nuno.goncal...@wedotechnologies.com <mailto:nuno.goncal...@wedotechnologies.com>> wrote: >> >> So, serialization between producer application -> kafka -> flink kafka >> consumer will use avro, thrift or kryo right? 
From there, the remaining >> pipeline can just use standard pojo serialization, which would be better? >> >> >> >> -- >> View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html> >> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com. > -- Urs Schönenberger - urs.schoenenber...@tngtech.com <mailto:urs.schoenenber...@tngtech.com> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke Sitz: Unterföhring * Amtsgericht München * HRB 135082 -- Jürgen Thomann System Administrator InnoGames GmbH Friesenstraße 13 - 20097 Hamburg - Germany Tel +49 40 7889335-0 Managing Directors: Hendrik Klindworth, Michael Zillmer VAT-ID: DE264068907 Amtsgericht Hamburg, HRB 108973 http://www.innogames.com – juergen.thom...@innogames.com
Re: Kafka and Flink integration
Hi Urs! Inside Flink (between Flink operators) - Kryo is not a problem, but types must be registered up front for good performance - Tuples and POJOs are often faster than the types that fall back to Kryo Persistent-storage (HDFS, Kafka, ...) - Kryo is not recommended, because its binary data format is not stable. It changes between major Kryo versions and between Kryo setups with different type registrations. - A stable format with schema evolution support (Avro, Thrift, ...) is recommended here. On Thu, Jun 22, 2017 at 9:28 AM, Urs Schoenenberger < urs.schoenenber...@tngtech.com> wrote: > Hi Greg, > > do you have a link where I could read up on the rationale behind > avoiding Kryo? I'm currently facing a similar decision and would like to > get some more background on this. > > Thank you very much, > Urs > > On 21.06.2017 12:10, Greg Hogan wrote: > > The recommendation has been to avoid Kryo where possible. > > > > General data exchange: avro or thrift. > > > > Flink internal data exchange: POJO (or Tuple, which are slightly faster > though less readable, and there is an outstanding PR to narrow or close the > performance gap). > > > > Kryo is useful for types which cannot be modified to be a POJO. There > are also cases where Kryo must be used because Flink has insufficient > TypeInformation, such as when returning an interface or abstract type when > the actual concrete type can be known. > > > > > > > >> On Jun 21, 2017, at 3:19 AM, nragon <nuno.goncalves@ > wedotechnologies.com> wrote: > >> > >> So, serialization between producer application -> kafka -> flink kafka > >> consumer will use avro, thrift or kryo right? From there, the remaining > >> pipeline can just use standard pojo serialization, which would be > better? > >> > >> > >> > >> -- > >> View this message in context: http://apache-flink-user- > mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink- > integration-tp13792p13885.html > >> Sent from the Apache Flink User Mailing List archive. 
mailing list > archive at Nabble.com. > > > > -- > Urs Schönenberger - urs.schoenenber...@tngtech.com > > TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring > Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke > Sitz: Unterföhring * Amtsgericht München * HRB 135082 >
Re: Kafka and Flink integration
Hi Greg, do you have a link where I could read up on the rationale behind avoiding Kryo? I'm currently facing a similar decision and would like to get some more background on this. Thank you very much, Urs On 21.06.2017 12:10, Greg Hogan wrote: > The recommendation has been to avoid Kryo where possible. > > General data exchange: avro or thrift. > > Flink internal data exchange: POJO (or Tuple, which are slightly faster > though less readable, and there is an outstanding PR to narrow or close the > performance gap). > > Kryo is useful for types which cannot be modified to be a POJO. There are > also cases where Kryo must be used because Flink has insufficient > TypeInformation, such as when returning an interface or abstract type when > the actual concrete type can be known. > > > >> On Jun 21, 2017, at 3:19 AM, nragon <nuno.goncal...@wedotechnologies.com> >> wrote: >> >> So, serialization between producer application -> kafka -> flink kafka >> consumer will use avro, thrift or kryo right? From there, the remaining >> pipeline can just use standard pojo serialization, which would be better? >> >> >> >> -- >> View this message in context: >> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html >> Sent from the Apache Flink User Mailing List archive. mailing list archive >> at Nabble.com. > -- Urs Schönenberger - urs.schoenenber...@tngtech.com TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke Sitz: Unterföhring * Amtsgericht München * HRB 135082
Re: Kafka and Flink integration
If the concrete type cannot be known then a proper TypeInformation cannot be created and Kryo must be used. There may be a few cases where the TypeInformation can be deduced by the developer but not by TypeExtractor and the returns TypeInformation must be explicitly given to prevent the use of Kryo. A recent example in Gelly was a Function with input and output types the same generic interface bound to different parameters. The implementation outputs the same concrete class as the input, but this programmatic structure cannot be deduced by the TypeExtractor so a returns TypeInformation was specified. Greg > On Jun 21, 2017, at 6:21 AM, Ted Yu <yuzhih...@gmail.com> wrote: > > Greg: > Can you clarify he last part? > Should it be: the concrete type cannot be known ? > > Original message > From: Greg Hogan <c...@greghogan.com> > Date: 6/21/17 3:10 AM (GMT-08:00) > To: nragon <nuno.goncal...@wedotechnologies.com> > Cc: user@flink.apache.org > Subject: Re: Kafka and Flink integration > > The recommendation has been to avoid Kryo where possible. > > General data exchange: avro or thrift. > > Flink internal data exchange: POJO (or Tuple, which are slightly faster > though less readable, and there is an outstanding PR to narrow or close the > performance gap). > > Kryo is useful for types which cannot be modified to be a POJO. There are > also cases where Kryo must be used because Flink has insufficient > TypeInformation, such as when returning an interface or abstract type when > the actual concrete type can be known. > > > > > On Jun 21, 2017, at 3:19 AM, nragon <nuno.goncal...@wedotechnologies.com> > > wrote: > > > > So, serialization between producer application -> kafka -> flink kafka > > consumer will use avro, thrift or kryo right? From there, the remaining > > pipeline can just use standard pojo serialization, which would be better? 
> > > > > > > > -- > > View this message in context: > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html > > Sent from the Apache Flink User Mailing List archive. mailing list archive > > at Nabble.com.
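Greg's point about insufficient TypeInformation comes down to Java type erasure. A plain-Java sketch (no Flink involved) of what reflection can and cannot see when a method is declared to return an interface — which is why an explicit, developer-supplied type hint is sometimes the only way to recover the concrete type:

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    // Declared against the interface: the signature never mentions ArrayList.
    static List<String> makeList() {
        return new ArrayList<>();
    }

    // The erased return type visible to any signature-based analysis.
    static String declaredReturnType() {
        return reflect().getReturnType().getName();
    }

    // The generic signature adds the element type, but still no ArrayList.
    static String genericReturnType() {
        return reflect().getGenericReturnType().getTypeName();
    }

    private static Method reflect() {
        try {
            return ErasureDemo.class.getDeclaredMethod("makeList");
        } catch (NoSuchMethodException e) {
            throw new AssertionError(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(declaredReturnType());                  // java.util.List
        System.out.println(genericReturnType());                   // java.util.List<java.lang.String>
        System.out.println(makeList().getClass().getSimpleName()); // ArrayList
    }
}
```

Only the runtime value knows its concrete class; the developer may know it too, which is why supplying the type information explicitly (as Greg describes for the Gelly case) works where automatic extraction cannot.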
Re: Kafka and Flink integration
Greg: Can you clarify the last part? Should it be: "the concrete type cannot be known"? Original message From: Greg Hogan <c...@greghogan.com> Date: 6/21/17 3:10 AM (GMT-08:00) To: nragon <nuno.goncal...@wedotechnologies.com> Cc: user@flink.apache.org Subject: Re: Kafka and Flink integration The recommendation has been to avoid Kryo where possible. General data exchange: avro or thrift. Flink internal data exchange: POJO (or Tuple, which are slightly faster though less readable, and there is an outstanding PR to narrow or close the performance gap). Kryo is useful for types which cannot be modified to be a POJO. There are also cases where Kryo must be used because Flink has insufficient TypeInformation, such as when returning an interface or abstract type when the actual concrete type can be known. > On Jun 21, 2017, at 3:19 AM, nragon <nuno.goncal...@wedotechnologies.com> > wrote: > > So, serialization between producer application -> kafka -> flink kafka > consumer will use avro, thrift or kryo right? From there, the remaining > pipeline can just use standard pojo serialization, which would be better? > > > > -- > View this message in context: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at > Nabble.com.
Re: Kafka and Flink integration
The recommendation has been to avoid Kryo where possible. General data exchange: avro or thrift. Flink internal data exchange: POJO (or Tuple, which are slightly faster though less readable, and there is an outstanding PR to narrow or close the performance gap). Kryo is useful for types which cannot be modified to be a POJO. There are also cases where Kryo must be used because Flink has insufficient TypeInformation, such as when returning an interface or abstract type when the actual concrete type can be known. > On Jun 21, 2017, at 3:19 AM, nragon <nuno.goncal...@wedotechnologies.com> > wrote: > > So, serialization between producer application -> kafka -> flink kafka > consumer will use avro, thrift or kryo right? From there, the remaining > pipeline can just use standard pojo serialization, which would be better? > > > > -- > View this message in context: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html > Sent from the Apache Flink User Mailing List archive. mailing list archive at > Nabble.com.
Re: Kafka and Flink integration
So, serialization between producer application -> kafka -> flink kafka consumer will use avro, thrift or kryo right? From there, the remaining pipeline can just use standard pojo serialization, which would be better? -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Kafka and Flink integration
Hi! For general data exchange between systems, it is often good to have a more standard format. Being able to evolve the schema of types is very helpful if you evolve the data pipeline (which almost always happens eventually). For that reason, Avro and Thrift are very popular for that type of data exchange. While they are not as fast as Kryo, they are more "robust" in the sense that the format is stable. Kryo is a good choice for intermediate data that is not persistent or at least not leaving one specific system. Greetings, Stephan On Tue, Jun 20, 2017 at 7:22 PM, nragon <nuno.goncal...@wedotechnologies.com > wrote: > Just one more question :). > Considering I'm producing into kafka with other application other than > flink, which serializer should i use in order to use pojo types when > consuming those same messages (now in flink)? > > > > -- > View this message in context: http://apache-flink-user- > mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink- > integration-tp13792p13882.html > Sent from the Apache Flink User Mailing List archive. mailing list archive > at Nabble.com. >
RE: Kafka and Flink integration
Just one more question :). Considering I'm producing into Kafka with an application other than Flink, which serializer should I use in order to use POJO types when consuming those same messages (now in Flink)? -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13882.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
RE: Kafka and Flink integration
Thanks, I'll try to refactor into POJOs. -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13879.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
RE: Kafka and Flink integration
Yes, POJOs can contain other nested POJO types. You just have to make sure that the nested field is either public, or has a corresponding public getter- and setter- method that follows the Java beans naming conventions. On 21 June 2017 at 12:20:31 AM, nragon (nuno.goncal...@wedotechnologies.com) wrote: Can i have pojo has composition of other pojo? My custom object has many dependencies and in order to refactor it I must also change another 5 classes as well. -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13874.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
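A minimal sketch of what Gordon describes — the class and field names below are made up for illustration: a POJO whose nested field is private but exposed through public accessors following the Java beans naming conventions:

```java
// Hypothetical nested POJOs: Order's nested Customer field is private,
// but the public getter/setter pair keeps both types POJO-eligible.
public class Order {
    public long id;                 // public field: fine as-is
    private Customer customer;      // private, but bean-accessible: also fine

    public Order() {}               // public no-argument constructor

    public Customer getCustomer() { return customer; }
    public void setCustomer(Customer customer) { this.customer = customer; }

    // The nested type must itself satisfy the POJO rules.
    public static class Customer {
        public String name;
        public Customer() {}
    }

    // Small demonstration: set the nested field through the setter,
    // read it back through the getter.
    public static String roundTrip() {
        Order o = new Order();
        Customer c = new Customer();
        c.name = "Alice";
        o.setCustomer(c);
        return o.getCustomer().name;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip());
    }
}
```

So refactoring the five dependent classes means applying the same pattern (public no-arg constructor, public or bean-accessible fields) to each of them, not flattening the composition.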
RE: Kafka and Flink integration
Can I have a POJO as a composition of other POJOs? My custom object has many dependencies, and in order to refactor it I must also change another 5 classes as well. -- View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13874.html Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
RE: Kafka and Flink integration
Nico, I'll try some different approaches and will be back here, hopefully with some results :) Thanks for this brainstorming :) -Original Message- From: Nico Kruber [mailto:n...@data-artisans.com] Sent: 20 de junho de 2017 16:44 To: Nuno Rafael Goncalves <nuno.goncal...@wedotechnologies.com> Cc: user@flink.apache.org Subject: Re: Kafka and Flink integration I can only repeat what Gordon wrote on Friday: "It’s usually always recommended to register your classes with Kryo [using registerKryoType()], to avoid the somewhat inefficient classname writing. Also, depending on the case, to decrease serialization overhead, nothing really beats specific custom serialization. So, you can also register specific serializers for Kryo to use for the type." I also guess, this highly depends on your actual use case and in particular the class you are trying to de/serialize. Unfortunately, your image is to small to read, but does your performance improve when registering the class as a Kryo type? Also, I don't think, mapping it to a tuple will improve performance since Kryo would have to do something similar anyway. Instead, you could really have your own de/serializer and go from "Class (<-> Tuple) <-> Kryo <-> bytes" directly to "Class <-> bytes". Nico On Tuesday, 20 June 2017 17:20:38 CEST Nuno Rafael Goncalves wrote: > I believe there are some performance impact while de/serializing, > which is "normal". What I'm trying to understand is if there are any > tips to improve this process. For instance, tuples vs general class > types. Do you know if it's worth it to map a custom object into tuple > just for de/serialization process? > > According to jfr analysis, kryo methods are hit a lot. 
> > [cid:image003.jpg@01D2E9E1.26D2D370] > > > > > > > > -Original Message- > From: Nico Kruber [mailto:n...@data-artisans.com] > Sent: 20 de junho de 2017 16:04 > To: user@flink.apache.org > Cc: Nuno Rafael Goncalves <nuno.goncal...@wedotechnologies.com> > Subject: Re: Kafka and Flink integration > > > > No, this is only necessary if you want to register a custom serializer > itself [1]. Also, in case you are wondering about registerKryoType() - > this is only needed as a performance optimisation. > > > > What exactly is your problem? What are you trying to solve? > > (I can't read JFR files here, and from what I read at Oracle's site, > this requires a commercial license, too...) > > > > > > Nico > > > > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/ > > custom_serializers.html > > On Tuesday, 20 June 2017 11:54:45 CEST nragon wrote: > > Do I need to use registerTypeWithKryoSerializer() in my execution > > > > environment? > > > > My serialization into kafka is done with the following snippet > > > > > > > > try (ByteArrayOutputStream byteArrayOutStream = new > > ByteArrayOutputStream(); > > > > Output output = new Output(byteArrayOutStream)) { > > > > Kryo kryo = new Kryo(); > > > > kryo.writeClassAndObject(output, event); > > > > output.flush(); > > > > return byteArrayOutStream.toByteArray(); > > > > } catch (IOException e) { > > > > return null; > > > > } > > > > "event" is my custom object. 
> > > > > > > > then i desirialize it in flink's kafka consumer > > > > try (ByteArrayInputStream byteArrayInStream = new > > > > ByteArrayInputStream(bytes); Input input = new > > Input(byteArrayInStream, > > > > bytes.length)) { > > > > Kryo kryo = new Kryo(); > > > > return kryo.readClassAndObject(input); > > > > } catch (IOException e) { > > > > return null; > > > > } > > > > Thanks > > > > > > > > > > > > > > > > -- > > > > View this message in context: > > > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ > > Kafka-> > a > > > > nd-Flink-integration-tp13792p13841.html Sent from the Apache Flink > > User > > > > Mailing List archive. mailing list archive at Nabble.com.
RE: Kafka and Flink integration
Hi Nuno, In general, if it is possible, it is recommended that you map your generic classes to Tuples / POJOs [1]. For Tuples / POJOs, Flink will create specialized serializers for them, whereas for generic classes (i.e. types which cannot be treated as POJOs) Flink simply fallbacks to using Kryo for them. The actual performance gain may depend a bit on what the original generic class type looked like. One other thing probably to look at is enabling object reuse for de-/serialization. However, be aware that the user code needs to be aware of this, otherwise it may lead to unexpected errors. Cheers, Gordon [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types On 20 June 2017 at 11:24:03 PM, Nuno Rafael Goncalves ( nuno.goncal...@wedotechnologies.com) wrote: I believe there are some performance impact while de/serializing, which is “normal”. What I’m trying to understand is if there are any tips to improve this process. For instance, tuples vs general class types. Do you know if it’s worth it to map a custom object into tuple just for de/serialization process? According to jfr analysis, kryo methods are hit a lot. -Original Message- From: Nico Kruber [mailto:n...@data-artisans.com] Sent: 20 de junho de 2017 16:04 To: user@flink.apache.org Cc: Nuno Rafael Goncalves <nuno.goncal...@wedotechnologies.com> Subject: Re: Kafka and Flink integration No, this is only necessary if you want to register a custom serializer itself [1]. Also, in case you are wondering about registerKryoType() - this is only needed as a performance optimisation. What exactly is your problem? What are you trying to solve? (I can't read JFR files here, and from what I read at Oracle's site, this requires a commercial license, too...) 
Nico [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/ custom_serializers.html On Tuesday, 20 June 2017 11:54:45 CEST nragon wrote: > Do I need to use registerTypeWithKryoSerializer() in my execution > environment? > My serialization into kafka is done with the following snippet > > try (ByteArrayOutputStream byteArrayOutStream = new ByteArrayOutputStream(); > Output output = new Output(byteArrayOutStream)) { > Kryo kryo = new Kryo(); > kryo.writeClassAndObject(output, event); > output.flush(); > return byteArrayOutStream.toByteArray(); > } catch (IOException e) { > return null; > } > > "event" is my custom object. > > then i desirialize it in flink's kafka consumer > try (ByteArrayInputStream byteArrayInStream = new > ByteArrayInputStream(bytes); Input input = new Input(byteArrayInStream, > bytes.length)) { > Kryo kryo = new Kryo(); > return kryo.readClassAndObject(input); > } catch (IOException e) { > return null; > } > > Thanks > > > > -- > View this message in context: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-a > nd-Flink-integration-tp13792p13841.html Sent from the Apache Flink User > Mailing List archive. mailing list archive at Nabble.com.
Re: Kafka and Flink integration
I can only repeat what Gordon wrote on Friday: "It’s usually always recommended to register your classes with Kryo [using registerKryoType()], to avoid the somewhat inefficient classname writing. Also, depending on the case, to decrease serialization overhead, nothing really beats specific custom serialization. So, you can also register specific serializers for Kryo to use for the type." I also guess this highly depends on your actual use case and in particular the class you are trying to de/serialize. Unfortunately, your image is too small to read, but does your performance improve when registering the class as a Kryo type? Also, I don't think mapping it to a tuple will improve performance since Kryo would have to do something similar anyway. Instead, you could really have your own de/serializer and go from "Class (<-> Tuple) <-> Kryo <-> bytes" directly to "Class <-> bytes". Nico On Tuesday, 20 June 2017 17:20:38 CEST Nuno Rafael Goncalves wrote: > I believe there are some performance impact while de/serializing, which is > "normal". What I'm trying to understand is if there are any tips to improve > this process. For instance, tuples vs general class types. Do you know if > it's worth it to map a custom object into tuple just for de/serialization > process? > > According to jfr analysis, kryo methods are hit a lot. > > [cid:image003.jpg@01D2E9E1.26D2D370] > > > > > > > > -Original Message- > From: Nico Kruber [mailto:n...@data-artisans.com] > Sent: 20 de junho de 2017 16:04 > To: user@flink.apache.org > Cc: Nuno Rafael Goncalves <nuno.goncal...@wedotechnologies.com> > Subject: Re: Kafka and Flink integration > > > > No, this is only necessary if you want to register a custom serializer > itself [1]. Also, in case you are wondering about registerKryoType() - this > is only needed as a performance optimisation. > > > > What exactly is your problem? What are you trying to solve?
> > (I can't read JFR files here, and from what I read at Oracle's site, this > requires a commercial license, too...) > > > > > > Nico > > > > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/ > > custom_serializers.html > > On Tuesday, 20 June 2017 11:54:45 CEST nragon wrote: > > Do I need to use registerTypeWithKryoSerializer() in my execution > > > > environment? > > > > My serialization into kafka is done with the following snippet > > > > > > > > try (ByteArrayOutputStream byteArrayOutStream = new > > ByteArrayOutputStream(); > > > > Output output = new Output(byteArrayOutStream)) { > > > > Kryo kryo = new Kryo(); > > > > kryo.writeClassAndObject(output, event); > > > > output.flush(); > > > > return byteArrayOutStream.toByteArray(); > > > > } catch (IOException e) { > > > > return null; > > > > } > > > > "event" is my custom object. > > > > > > > > then i desirialize it in flink's kafka consumer > > > > try (ByteArrayInputStream byteArrayInStream = new > > > > ByteArrayInputStream(bytes); Input input = new Input(byteArrayInStream, > > > > bytes.length)) { > > > > Kryo kryo = new Kryo(); > > > > return kryo.readClassAndObject(input); > > > > } catch (IOException e) { > > > > return null; > > > > } > > > > Thanks > > > > > > > > > > > > > > > > -- > > > > View this message in context: > > > > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-> > > > a > > > > nd-Flink-integration-tp13792p13841.html Sent from the Apache Flink User > > > > Mailing List archive. mailing list archive at Nabble.com.
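Nico's suggestion to go from "Class <-> bytes" directly can be sketched with plain JDK streams. Event and its fields below are hypothetical stand-ins for the custom class in this thread; in a real setup this logic would sit inside the respective Kafka serializer and Flink deserialization-schema interfaces rather than standalone methods:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class DirectEventSerde {
    // Hypothetical event type standing in for the thread's custom object.
    public static class Event {
        public final long timestamp;
        public final String payload;
        public Event(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Write the fields directly: no class name, no Kryo registration state
    // embedded in the bytes, just a fixed hand-specified layout.
    static byte[] serialize(Event e) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeLong(e.timestamp);
            byte[] p = e.payload.getBytes(StandardCharsets.UTF_8);
            out.writeInt(p.length);
            out.write(p);
        }
        return bos.toByteArray();
    }

    static Event deserialize(byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            long ts = in.readLong();
            byte[] p = new byte[in.readInt()];
            in.readFully(p);
            return new Event(ts, new String(p, StandardCharsets.UTF_8));
        }
    }

    // Convenience round trip for demonstration.
    static String roundTrip(long ts, String payload) {
        try {
            Event e = deserialize(serialize(new Event(ts, payload)));
            return e.timestamp + ":" + e.payload;
        } catch (IOException e) {
            throw new AssertionError(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(42L, "hello")); // 42:hello
    }
}
```

The trade-off versus Kryo is exactly the one discussed earlier in the thread: this is fast and compact, but the layout is fixed by hand, so evolving it safely needs an explicit versioning scheme (or a format like Avro/Thrift that handles schema evolution for you).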
RE: Kafka and Flink integration
I believe there is some performance impact while de/serializing, which is "normal". What I'm trying to understand is if there are any tips to improve this process. For instance, tuples vs general class types. Do you know if it's worth it to map a custom object into a tuple just for the de/serialization process? According to JFR analysis, Kryo methods are hit a lot. -Original Message- From: Nico Kruber [mailto:n...@data-artisans.com] Sent: 20 de junho de 2017 16:04 To: user@flink.apache.org Cc: Nuno Rafael Goncalves <nuno.goncal...@wedotechnologies.com> Subject: Re: Kafka and Flink integration No, this is only necessary if you want to register a custom serializer itself [1]. Also, in case you are wondering about registerKryoType() - this is only needed as a performance optimisation. What exactly is your problem? What are you trying to solve? (I can't read JFR files here, and from what I read at Oracle's site, this requires a commercial license, too...) Nico [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/ custom_serializers.html On Tuesday, 20 June 2017 11:54:45 CEST nragon wrote: > Do I need to use registerTypeWithKryoSerializer() in my execution > environment? > My serialization into kafka is done with the following snippet > > try (ByteArrayOutputStream byteArrayOutStream = new ByteArrayOutputStream(); > Output output = new Output(byteArrayOutStream)) { > Kryo kryo = new Kryo(); > kryo.writeClassAndObject(output, event); > output.flush(); > return byteArrayOutStream.toByteArray(); > } catch (IOException e) { > return null; > } > > "event" is my custom object.
> > then i desirialize it in flink's kafka consumer > try (ByteArrayInputStream byteArrayInStream = new > ByteArrayInputStream(bytes); Input input = new Input(byteArrayInStream, > bytes.length)) { > Kryo kryo = new Kryo(); > return kryo.readClassAndObject(input); > } catch (IOException e) { > return null; > } > > Thanks > > > > -- > View this message in context: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-a > nd-Flink-integration-tp13792p13841.html Sent from the Apache Flink User > Mailing List archive. mailing list archive at Nabble.com.
Re: Kafka and Flink integration
No, this is only necessary if you want to register a custom serializer itself [1]. Also, in case you are wondering about registerKryoType() - this is only needed as a performance optimisation.

What exactly is your problem? What are you trying to solve? (I can't read JFR files here, and from what I read on Oracle's site, this requires a commercial license, too...)

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/custom_serializers.html

On Tuesday, 20 June 2017 11:54:45 CEST nragon wrote:
> Do I need to use registerTypeWithKryoSerializer() in my execution
> environment?
> My serialization into Kafka is done with the following snippet:
>
> try (ByteArrayOutputStream byteArrayOutStream = new ByteArrayOutputStream();
>      Output output = new Output(byteArrayOutStream)) {
>     Kryo kryo = new Kryo();
>     kryo.writeClassAndObject(output, event);
>     output.flush();
>     return byteArrayOutStream.toByteArray();
> } catch (IOException e) {
>     return null;
> }
>
> "event" is my custom object.
>
> Then I deserialize it in Flink's Kafka consumer:
>
> try (ByteArrayInputStream byteArrayInStream = new ByteArrayInputStream(bytes);
>      Input input = new Input(byteArrayInStream, bytes.length)) {
>     Kryo kryo = new Kryo();
>     return kryo.readClassAndObject(input);
> } catch (IOException e) {
>     return null;
> }
>
> Thanks
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13841.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Kafka and Flink integration
Do I need to use registerTypeWithKryoSerializer() in my execution environment?

My serialization into Kafka is done with the following snippet:

try (ByteArrayOutputStream byteArrayOutStream = new ByteArrayOutputStream();
     Output output = new Output(byteArrayOutStream)) {
    Kryo kryo = new Kryo();
    kryo.writeClassAndObject(output, event);
    output.flush();
    return byteArrayOutStream.toByteArray();
} catch (IOException e) {
    return null;
}

"event" is my custom object.

Then I deserialize it in Flink's Kafka consumer:

try (ByteArrayInputStream byteArrayInStream = new ByteArrayInputStream(bytes);
     Input input = new Input(byteArrayInStream, bytes.length)) {
    Kryo kryo = new Kryo();
    return kryo.readClassAndObject(input);
} catch (IOException e) {
    return null;
}

Thanks

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13841.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
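[Editor's note] A cleaned-up version of this round-trip, sketched with JDK serialization as a dependency-free stand-in for Kryo's Output/Input (the structure is the same; swap ObjectOutputStream/ObjectInputStream for Kryo's classes in the real code). Rethrowing instead of returning null on failure keeps corrupt records from being silently dropped.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

public class RoundTrip {
    // Object -> byte[], same try-with-resources shape as the Kryo snippet above.
    static byte[] toBytes(Object event) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(event);
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e); // fail loudly, don't return null
        }
    }

    // byte[] -> Object, mirroring the consumer-side snippet.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(fromBytes(toBytes("hello"))); // round-trips intact
    }
}
```

Usage: `fromBytes(toBytes(event))` returns an equal copy of `event` for any Serializable type.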
Re: Kafka and Flink integration
My custom object is used across the whole job, so it will be part of checkpoints. Can you point me to some references with examples?

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13802.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: Kafka and Flink integration
Hi!

It's generally recommended to register your classes with Kryo, to avoid the somewhat inefficient classname writing. Also, depending on the case, nothing really beats specific custom serialization for decreasing serialization overhead, so you can also register specific serializers for Kryo to use for the type. If you need to store these custom objects as managed state for your operators, you can also have your own custom Flink TypeSerializer for that.

Best,
Gordon

On 16 June 2017 at 12:27:06 PM, nragon (nuno.goncal...@wedotechnologies.com) wrote:

I have to produce custom objects into Kafka and read them with Flink. Any tuning advice for using Kryo, such as class registration or something like that? Any examples?

Thanks

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
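[Editor's note] Gordon's point about "inefficient classname writing" can be seen directly: a stream written without up-front registration carries the fully-qualified class name in every serialized record. The sketch below demonstrates the effect with JDK serialization, used here only because it exhibits the same behavior without extra dependencies; registering a class up front lets Kryo write a small integer id instead of the name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

public class ClassNameOverhead {
    // Serialize an object; the resulting bytes embed the class descriptor,
    // including the fully-qualified class name, per stream.
    static byte[] serialize(Object o) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new ArrayList<String>());
        // Decode as Latin-1 so every byte maps to a char; the class name
        // appears verbatim inside the binary stream.
        String raw = new String(bytes, StandardCharsets.ISO_8859_1);
        System.out.println(raw.contains("java.util.ArrayList"));
        System.out.println(bytes.length + " bytes for an empty list");
    }
}
```

When thousands of small records per second each carry a class name, those bytes (and the string writes) add up; that is the overhead registration removes.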
Kafka and Flink integration
I have to produce custom objects into Kafka and read them with Flink. Any tuning advice for using Kryo, such as class registration or something like that? Any examples?

Thanks

--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
Re: kafka and flink integration issue
Good to hear. Thanks for letting us know! On Mon, Feb 29, 2016 at 8:14 PM, Pankaj Kumar wrote: > yes, versioning was the issue. Job is working fine on flink 0.10.2. > > On Mon, Feb 29, 2016 at 3:15 PM, Stephan Ewen wrote: > >> Hi! >> >> A "NoSuchMethodError" is always a sign of a version mixup. Please make >> sure both versions (cluster and client) are exactly the same. >> >> Stephan >> >> >> On Sat, Feb 27, 2016 at 11:05 AM, Pankaj Kumar >> wrote: >> >>> Yes Robert, >>> I was trying to start Flink on cluster 0.10.1. >>> >>> But after changing the flink version to 0.10.1, I am also getting the same >>> error. >>> >>> On Sat, Feb 27, 2016 at 2:47 PM, Robert Metzger >>> wrote: Hi Pankaj, I suspect you are trying to start Flink on a cluster with Flink 0.10.1 installed? On Sat, Feb 27, 2016 at 9:20 AM, Pankaj Kumar wrote: > I am trying to integrate kafka and flink. > my pom file is where {flink.version} is 0.10.2 > > > org.apache.flink > flink-java > ${flink.version} > > > org.apache.flink > flink-streaming-java > ${flink.version} > > > org.apache.flink > flink-clients > ${flink.version} > > > com.fasterxml.jackson.core > jackson-databind > 2.6.4 > > > org.projectlombok > lombok > 1.16.6 > provided > > > org.springframework > spring-web > 4.2.4.RELEASE > > > org.apache.flink > flink-connector-kafka > ${flink.version} > > > when i am running my jar with flink command line using command > > bin/flink run ~/apache_flink/target/Application-0.0.1.jar > > I am getting error > > Status of job d20986a31f2a8166986d7db354de0ff7 changed to FAILED.
> java.lang.NoSuchMethodError: > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:387) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55) > > > I am building final jar using command *mvn clean package -Pbuild-jar* > > >>> >> >
Re: kafka and flink integration issue
yes, versioning was the issue. Job is working fine on flink 0.10.2. On Mon, Feb 29, 2016 at 3:15 PM, Stephan Ewen wrote: > Hi! > > A "NoSuchMethodError" is always a sign of a version mixup. Please make > sure both versions (cluster and client) are exactly the same. > > Stephan > > > On Sat, Feb 27, 2016 at 11:05 AM, Pankaj Kumar > wrote: > >> Yes Robert, >> I was trying to start Flink on cluster 0.10.1. >> >> But after changing the flink version to 0.10.1, I am also getting the same >> error. >> >> On Sat, Feb 27, 2016 at 2:47 PM, Robert Metzger >> wrote: >> >>> Hi Pankaj, >>> >>> I suspect you are trying to start Flink on a cluster with Flink 0.10.1 >>> installed? >>> >>> On Sat, Feb 27, 2016 at 9:20 AM, Pankaj Kumar >>> wrote: I am trying to integrate kafka and flink. my pom file is where {flink.version} is 0.10.2 org.apache.flink flink-java ${flink.version} org.apache.flink flink-streaming-java ${flink.version} org.apache.flink flink-clients ${flink.version} com.fasterxml.jackson.core jackson-databind 2.6.4 org.projectlombok lombok 1.16.6 provided org.springframework spring-web 4.2.4.RELEASE org.apache.flink flink-connector-kafka ${flink.version} when i am running my jar with flink command line using command bin/flink run ~/apache_flink/target/Application-0.0.1.jar I am getting error Status of job d20986a31f2a8166986d7db354de0ff7 changed to FAILED. java.lang.NoSuchMethodError: org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:387) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55) I am building final jar using command *mvn clean package -Pbuild-jar* >>> >> >
Re: kafka and flink integration issue
Hi! A "NoSuchMethodError" is always a sign of a version mixup. Please make sure both versions (cluster and client) are exactly the same. Stephan On Sat, Feb 27, 2016 at 11:05 AM, Pankaj Kumar wrote: > Yes Robert, > I was trying to start Flink on cluster 0.10.1. > > But after changing the flink version to 0.10.1, I am also getting the same > error. > > On Sat, Feb 27, 2016 at 2:47 PM, Robert Metzger > wrote: > >> Hi Pankaj, >> >> I suspect you are trying to start Flink on a cluster with Flink 0.10.1 >> installed? >> >> On Sat, Feb 27, 2016 at 9:20 AM, Pankaj Kumar >> wrote: >> >>> I am trying to integrate kafka and flink. >>> my pom file is where {flink.version} is 0.10.2 >>> >>> >>> org.apache.flink >>> flink-java >>> ${flink.version} >>> >>> >>> org.apache.flink >>> flink-streaming-java >>> ${flink.version} >>> >>> >>> org.apache.flink >>> flink-clients >>> ${flink.version} >>> >>> >>> com.fasterxml.jackson.core >>> jackson-databind >>> 2.6.4 >>> >>> >>> org.projectlombok >>> lombok >>> 1.16.6 >>> provided >>> >>> >>> org.springframework >>> spring-web >>> 4.2.4.RELEASE >>> >>> >>> org.apache.flink >>> flink-connector-kafka >>> ${flink.version} >>> >>> >>> when i am running my jar with flink command line using command >>> >>> bin/flink run ~/apache_flink/target/Application-0.0.1.jar >>> >>> I am getting error >>> >>> Status of job d20986a31f2a8166986d7db354de0ff7 changed to FAILED. >>> java.lang.NoSuchMethodError: >>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z >>> at >>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:387) >>> at >>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) >>> at >>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55) >>> >>> >>> I am building final jar using command *mvn clean package -Pbuild-jar* >>> >>> >> >
Re: kafka and flink integration issue
Yes Robert, I was trying to start Flink on cluster 0.10.1. But after changing the flink version to 0.10.1, I am also getting the same error. On Sat, Feb 27, 2016 at 2:47 PM, Robert Metzger wrote: > Hi Pankaj, > > I suspect you are trying to start Flink on a cluster with Flink 0.10.1 > installed? > > On Sat, Feb 27, 2016 at 9:20 AM, Pankaj Kumar > wrote: > >> I am trying to integrate kafka and flink. >> my pom file is where {flink.version} is 0.10.2 >> >> >> org.apache.flink >> flink-java >> ${flink.version} >> >> >> org.apache.flink >> flink-streaming-java >> ${flink.version} >> >> >> org.apache.flink >> flink-clients >> ${flink.version} >> >> >> com.fasterxml.jackson.core >> jackson-databind >> 2.6.4 >> >> >> org.projectlombok >> lombok >> 1.16.6 >> provided >> >> >> org.springframework >> spring-web >> 4.2.4.RELEASE >> >> >> org.apache.flink >> flink-connector-kafka >> ${flink.version} >> >> >> when i am running my jar with flink command line using command >> >> bin/flink run ~/apache_flink/target/Application-0.0.1.jar >> >> I am getting error >> >> Status of job d20986a31f2a8166986d7db354de0ff7 changed to FAILED. >> java.lang.NoSuchMethodError: >> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z >> at >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:387) >> at >> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) >> at >> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55) >> >> >> I am building final jar using command *mvn clean package -Pbuild-jar* >> >> >
Re: kafka and flink integration issue
Hi Pankaj, I suspect you are trying to start Flink on a cluster with Flink 0.10.1 installed? On Sat, Feb 27, 2016 at 9:20 AM, Pankaj Kumar wrote: > I am trying to integrate kafka and flink. > my pom file is where {flink.version} is 0.10.2 > > > org.apache.flink > flink-java > ${flink.version} > > > org.apache.flink > flink-streaming-java > ${flink.version} > > > org.apache.flink > flink-clients > ${flink.version} > > > com.fasterxml.jackson.core > jackson-databind > 2.6.4 > > > org.projectlombok > lombok > 1.16.6 > provided > > > org.springframework > spring-web > 4.2.4.RELEASE > > > org.apache.flink > flink-connector-kafka > ${flink.version} > > > when i am running my jar with flink command line using command > > bin/flink run ~/apache_flink/target/Application-0.0.1.jar > > I am getting error > > Status of job d20986a31f2a8166986d7db354de0ff7 changed to FAILED. > java.lang.NoSuchMethodError: > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z > at > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:387) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) > at > org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55) > > > I am building final jar using command *mvn clean package -Pbuild-jar* > >
kafka and flink integration issue
I am trying to integrate Kafka and Flink. My pom file, where ${flink.version} is 0.10.2, contains (dependency tags restored; the archive stripped the XML):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.6.4</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>4.2.4.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>

When I run my jar with the Flink command line using

bin/flink run ~/apache_flink/target/Application-0.0.1.jar

I get this error:

Status of job d20986a31f2a8166986d7db354de0ff7 changed to FAILED.
java.lang.NoSuchMethodError: org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:387)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)

I am building the final jar using the command *mvn clean package -Pbuild-jar*
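[Editor's note] A quick way to debug this kind of NoSuchMethodError (the follow-ups above confirm it was a cluster/client version mixup) is to print which jar a class was actually loaded from at runtime. A small JDK-only sketch; in the failing job one would pass a Flink class such as StreamingRuntimeContext.class to see whether it comes from the cluster's lib directory or from the user jar.

```java
import java.security.CodeSource;

public class WhichJar {
    // Returns the jar/directory a class was loaded from - handy when a
    // NoSuchMethodError suggests the cluster and client run different
    // versions of the same library.
    static String where(Class<?> c) {
        CodeSource src = c.getProtectionDomain().getCodeSource();
        // JDK/bootstrap classes have no code source.
        return src == null ? "(bootstrap classpath)" : src.getLocation().toString();
    }

    public static void main(String[] args) {
        System.out.println(where(String.class));   // JDK class -> bootstrap
        System.out.println(where(WhichJar.class)); // your jar or classes dir
    }
}
```

If the printed location is an older Flink jar than the one you compiled against, that mismatch explains the missing method.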