Re: Kafka and Flink integration

2017-07-05 Thread Konstantin Knauf
Hi Jürgen,

one easy way is to disable the Kryo fallback with

env.getConfig().disableGenericTypes();

If it was using Kryo, you should see an exception that also states the
class for which it needs to fall back to Kryo. It fails on the first such
class, though, so depending on the other classes in the job you might need
a minimal job using only the class in question to test it this way.
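
For illustration, a minimal test job could look roughly like this (MyEvent
stands in for the class in question and is a hypothetical name):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PojoCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Fail fast on any type for which Flink would fall back to Kryo.
        env.getConfig().disableGenericTypes();

        // fromElements forces Flink's type extraction on MyEvent; if MyEvent (or one
        // of its fields) needs the Kryo fallback, the job fails with an exception
        // naming that class.
        env.fromElements(new MyEvent())
           .print();

        env.execute("POJO check for MyEvent");
    }
}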

Best,

Konstantin


On 05.07.2017 10:35, Jürgen Thomann wrote:
> Hi Stephan,
> 
> do you know an easy way to find out if Kryo or POJO is used? I have an
> Object that would be a POJO, but it has one field that uses an object
> without a public no argument constructor. As I understood the
> documentation, this should result in Kryo being used.
> 
> Thanks,
> Jürgen
> 
> On 03.07.2017 17:18, Stephan Ewen wrote:
>> Hi Urs!
>>
>> Inside Flink (between Flink operators)
>>   - Kryo is not a problem, but types must be registered up front for
>> good performance
>>   - Tuples and POJOs are often faster than the types that fall back to
>> Kryo
>>
>> Persistent-storage (HDFS, Kafka, ...)
>>   - Kryo is not recommended, because its binary data format is not
>> stable. It changes between major Kryo versions and between Kryo setups
>> with different type registrations.
>>   - A stable format with schema evolution support (Avro, Thrift, ...)
>> is recommended here.
>>
>>
>>
>> On Thu, Jun 22, 2017 at 9:28 AM, Urs Schoenenberger
>> <urs.schoenenber...@tngtech.com
>> <mailto:urs.schoenenber...@tngtech.com>> wrote:
>>
>> Hi Greg,
>>
>> do you have a link where I could read up on the rationale behind
>> avoiding Kryo? I'm currently facing a similar decision and would
>> like to
>> get some more background on this.
>>
>> Thank you very much,
>> Urs
>>
>> On 21.06.2017 12:10, Greg Hogan wrote:
>> > The recommendation has been to avoid Kryo where possible.
>> >
>> > General data exchange: avro or thrift.
>> >
>> > Flink internal data exchange: POJO (or Tuple, which are slightly
>> faster though less readable, and there is an outstanding PR to
>> narrow or close the performance gap).
>> >
>> > Kryo is useful for types which cannot be modified to be a POJO.
>> There are also cases where Kryo must be used because Flink has
>> insufficient TypeInformation, such as when returning an interface
>> or abstract type when the actual concrete type can be known.
>> >
>> >
>> >
>> >> On Jun 21, 2017, at 3:19 AM, nragon
>> <nuno.goncal...@wedotechnologies.com
>> <mailto:nuno.goncal...@wedotechnologies.com>> wrote:
>> >>
>> >> So, serialization between producer application -> kafka ->
>> flink kafka
>> >> consumer will use avro, thrift or kryo right? From there, the
>> remaining
>> >> pipeline can just use standard pojo serialization, which would
>> be better?
>> >>
>> >>
>> >>
>> >> --
>> >> View this message in context:
>> 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html
>> 
>> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html>
>> >> Sent from the Apache Flink User Mailing List archive. mailing
>> list archive at Nabble.com.
>> >
>>
>> --
>> Urs Schönenberger - urs.schoenenber...@tngtech.com
>> <mailto:urs.schoenenber...@tngtech.com>
>>
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>
>>
> 
> -- 
> Jürgen Thomann
> System Administrator
> 
> 
> InnoGames GmbH
> Friesenstraße 13 - 20097 Hamburg - Germany
> Tel +49 40 7889335-0
> 
> Managing Directors: Hendrik Klindworth, Michael Zillmer
> VAT-ID: DE264068907 Amtsgericht Hamburg, HRB 108973
> 
> http://www.innogames.com – juergen.thom...@innogames.com
> 

-- 
Konstantin Knauf * konstantin.kn...@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082





Re: Kafka and Flink integration

2017-07-05 Thread Jürgen Thomann

Hi Stephan,

do you know an easy way to find out whether Kryo or the POJO serializer is used? 
I have an object that would be a POJO, but it has one field whose type does not 
have a public no-argument constructor. As I understood the documentation, this 
should result in Kryo being used.
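
For concreteness, the situation described is roughly this (the names are made up):

public class MyPojo {
    public long id;

    // ThirdPartyThing has no public no-argument constructor, so this field cannot
    // be analyzed as a POJO field and would be handled by the Kryo fallback
    // (even if MyPojo itself otherwise follows the POJO rules).
    public ThirdPartyThing payload;

    public MyPojo() {}
}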


Thanks,
Jürgen

On 03.07.2017 17:18, Stephan Ewen wrote:

Hi Urs!

Inside Flink (between Flink operators)
  - Kryo is not a problem, but types must be registered up front for 
good performance
  - Tuples and POJOs are often faster than the types that fall back to 
Kryo


Persistent-storage (HDFS, Kafka, ...)
  - Kryo is not recommended, because its binary data format is not 
stable. It changes between major Kryo versions and between Kryo setups 
with different type registrations.
  - A stable format with schema evolution support (Avro, Thrift, ...) 
is recommended here.




On Thu, Jun 22, 2017 at 9:28 AM, Urs Schoenenberger 
<urs.schoenenber...@tngtech.com 
<mailto:urs.schoenenber...@tngtech.com>> wrote:


Hi Greg,

do you have a link where I could read up on the rationale behind
avoiding Kryo? I'm currently facing a similar decision and would
like to
get some more background on this.

Thank you very much,
Urs

On 21.06.2017 12:10, Greg Hogan wrote:
> The recommendation has been to avoid Kryo where possible.
>
> General data exchange: avro or thrift.
>
> Flink internal data exchange: POJO (or Tuple, which are slightly
faster though less readable, and there is an outstanding PR to
narrow or close the performance gap).
>
> Kryo is useful for types which cannot be modified to be a POJO.
There are also cases where Kryo must be used because Flink has
insufficient TypeInformation, such as when returning an interface
or abstract type when the actual concrete type can be known.
>
>
>
>> On Jun 21, 2017, at 3:19 AM, nragon
<nuno.goncal...@wedotechnologies.com
<mailto:nuno.goncal...@wedotechnologies.com>> wrote:
>>
>> So, serialization between producer application -> kafka ->
flink kafka
>> consumer will use avro, thrift or kryo right? From there, the
remaining
>> pipeline can just use standard pojo serialization, which would
be better?
>>
>>
>>
>> --
    >> View this message in context:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html>
>> Sent from the Apache Flink User Mailing List archive. mailing
list archive at Nabble.com.
>

--
Urs Schönenberger - urs.schoenenber...@tngtech.com
<mailto:urs.schoenenber...@tngtech.com>

TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082




--
Jürgen Thomann
System Administrator


InnoGames GmbH
Friesenstraße 13 - 20097 Hamburg - Germany
Tel +49 40 7889335-0

Managing Directors: Hendrik Klindworth, Michael Zillmer
VAT-ID: DE264068907 Amtsgericht Hamburg, HRB 108973

http://www.innogames.com – juergen.thom...@innogames.com



Re: Kafka and Flink integration

2017-07-03 Thread Stephan Ewen
Hi Urs!

Inside Flink (between Flink operators)
  - Kryo is not a problem, but types must be registered up front for good
performance
  - Tuples and POJOs are often faster than the types that fall back to Kryo

Persistent-storage (HDFS, Kafka, ...)
  - Kryo is not recommended, because its binary data format is not stable.
It changes between major Kryo versions and between Kryo setups with
different type registrations.
  - A stable format with schema evolution support (Avro, Thrift, ...) is
recommended here.
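
As a sketch of the first point, registering the types up front could look like
this (MyEvent is a hypothetical class name):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Registering the class lets Kryo write a small integer tag per record instead of
// the full class name, which is the main cost of the unregistered fallback.
env.getConfig().registerKryoType(MyEvent.class);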



On Thu, Jun 22, 2017 at 9:28 AM, Urs Schoenenberger <
urs.schoenenber...@tngtech.com> wrote:

> Hi Greg,
>
> do you have a link where I could read up on the rationale behind
> avoiding Kryo? I'm currently facing a similar decision and would like to
> get some more background on this.
>
> Thank you very much,
> Urs
>
> On 21.06.2017 12:10, Greg Hogan wrote:
> > The recommendation has been to avoid Kryo where possible.
> >
> > General data exchange: avro or thrift.
> >
> > Flink internal data exchange: POJO (or Tuple, which are slightly faster
> though less readable, and there is an outstanding PR to narrow or close the
> performance gap).
> >
> > Kryo is useful for types which cannot be modified to be a POJO. There
> are also cases where Kryo must be used because Flink has insufficient
> TypeInformation, such as when returning an interface or abstract type when
> the actual concrete type can be known.
> >
> >
> >
> >> On Jun 21, 2017, at 3:19 AM, nragon <nuno.goncalves@
> wedotechnologies.com> wrote:
> >>
> >> So, serialization between producer application -> kafka -> flink kafka
> >> consumer will use avro, thrift or kryo right? From there, the remaining
> >> pipeline can just use standard pojo serialization, which would be
> better?
> >>
> >>
> >>
> >> --
> >> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-
> integration-tp13792p13885.html
> >> Sent from the Apache Flink User Mailing List archive. mailing list
> archive at Nabble.com.
> >
>
> --
> Urs Schönenberger - urs.schoenenber...@tngtech.com
>
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>


Re: Kafka and Flink integration

2017-06-22 Thread Urs Schoenenberger
Hi Greg,

do you have a link where I could read up on the rationale behind
avoiding Kryo? I'm currently facing a similar decision and would like to
get some more background on this.

Thank you very much,
Urs

On 21.06.2017 12:10, Greg Hogan wrote:
> The recommendation has been to avoid Kryo where possible.
> 
> General data exchange: avro or thrift.
> 
> Flink internal data exchange: POJO (or Tuple, which are slightly faster 
> though less readable, and there is an outstanding PR to narrow or close the 
> performance gap).
> 
> Kryo is useful for types which cannot be modified to be a POJO. There are 
> also cases where Kryo must be used because Flink has insufficient 
> TypeInformation, such as when returning an interface or abstract type when 
> the actual concrete type can be known.
> 
> 
> 
>> On Jun 21, 2017, at 3:19 AM, nragon <nuno.goncal...@wedotechnologies.com> 
>> wrote:
>>
>> So, serialization between producer application -> kafka -> flink kafka
>> consumer will use avro, thrift or kryo right? From there, the remaining
>> pipeline can just use standard pojo serialization, which would be better?
>>
>>
>>
>> --
>> View this message in context: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html
>> Sent from the Apache Flink User Mailing List archive. mailing list archive 
>> at Nabble.com.
> 

-- 
Urs Schönenberger - urs.schoenenber...@tngtech.com

TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082


Re: Kafka and Flink integration

2017-06-21 Thread Greg Hogan
If the concrete type cannot be known, then a proper TypeInformation cannot be 
created and Kryo must be used.

There may be a few cases where the TypeInformation can be deduced by the 
developer but not by the TypeExtractor; there the TypeInformation must be given 
explicitly via returns() to prevent the use of Kryo.

A recent example in Gelly was a Function whose input and output types were the 
same generic interface bound to different type parameters. The implementation 
outputs the same concrete class as the input, but the TypeExtractor cannot 
deduce this programmatic structure, so a returns() TypeInformation was specified.
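
A rough sketch of supplying the TypeInformation explicitly (the type and
function names here are hypothetical, not the actual Gelly code):

DataStream<MyConcreteType> mapped = input
    .map(new MyGenericInterfaceFunction())
    // The TypeExtractor cannot deduce the concrete output type of the function,
    // so hand it the TypeInformation explicitly instead of letting it fall back to Kryo:
    .returns(TypeInformation.of(MyConcreteType.class));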

Greg


> On Jun 21, 2017, at 6:21 AM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Greg:
> Can you clarify the last part?
> Should it be: the concrete type cannot be known?
> 
>  Original message 
> From: Greg Hogan <c...@greghogan.com>
> Date: 6/21/17 3:10 AM (GMT-08:00)
> To: nragon <nuno.goncal...@wedotechnologies.com>
> Cc: user@flink.apache.org
> Subject: Re: Kafka and Flink integration
> 
> The recommendation has been to avoid Kryo where possible.
> 
> General data exchange: avro or thrift.
> 
> Flink internal data exchange: POJO (or Tuple, which are slightly faster 
> though less readable, and there is an outstanding PR to narrow or close the 
> performance gap).
> 
> Kryo is useful for types which cannot be modified to be a POJO. There are 
> also cases where Kryo must be used because Flink has insufficient 
> TypeInformation, such as when returning an interface or abstract type when 
> the actual concrete type can be known.
> 
> 
> 
> > On Jun 21, 2017, at 3:19 AM, nragon <nuno.goncal...@wedotechnologies.com> 
> > wrote:
> > 
> > So, serialization between producer application -> kafka -> flink kafka
> > consumer will use avro, thrift or kryo right? From there, the remaining
> > pipeline can just use standard pojo serialization, which would be better?
> > 
> > 
> > 
> > --
> > View this message in context: 
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html
> > Sent from the Apache Flink User Mailing List archive. mailing list archive 
> > at Nabble.com.


Re: Kafka and Flink integration

2017-06-21 Thread Ted Yu
Greg:
Can you clarify the last part?
Should it be: the concrete type cannot be known?

 Original message 
From: Greg Hogan <c...@greghogan.com>
Date: 6/21/17 3:10 AM (GMT-08:00)
To: nragon <nuno.goncal...@wedotechnologies.com>
Cc: user@flink.apache.org
Subject: Re: Kafka and Flink integration
The recommendation has been to avoid Kryo where possible.

General data exchange: avro or thrift.

Flink internal data exchange: POJO (or Tuple, which are slightly faster though 
less readable, and there is an outstanding PR to narrow or close the 
performance gap).

Kryo is useful for types which cannot be modified to be a POJO. There are also 
cases where Kryo must be used because Flink has insufficient TypeInformation, 
such as when returning an interface or abstract type when the actual concrete 
type can be known.



> On Jun 21, 2017, at 3:19 AM, nragon <nuno.goncal...@wedotechnologies.com> 
> wrote:
> 
> So, serialization between producer application -> kafka -> flink kafka
> consumer will use avro, thrift or kryo right? From there, the remaining
> pipeline can just use standard pojo serialization, which would be better?
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Kafka and Flink integration

2017-06-21 Thread Greg Hogan
The recommendation has been to avoid Kryo where possible.

General data exchange: avro or thrift.

Flink internal data exchange: POJO (or Tuple, which are slightly faster though 
less readable, and there is an outstanding PR to narrow or close the 
performance gap).

Kryo is useful for types which cannot be modified to be a POJO. There are also 
cases where Kryo must be used because Flink has insufficient TypeInformation, 
such as when returning an interface or abstract type when the actual concrete 
type can be known.



> On Jun 21, 2017, at 3:19 AM, nragon <nuno.goncal...@wedotechnologies.com> 
> wrote:
> 
> So, serialization between producer application -> kafka -> flink kafka
> consumer will use avro, thrift or kryo right? From there, the remaining
> pipeline can just use standard pojo serialization, which would be better?
> 
> 
> 
> --
> View this message in context: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at 
> Nabble.com.



Re: Kafka and Flink integration

2017-06-21 Thread nragon
So, serialization between producer application -> Kafka -> Flink Kafka
consumer will use Avro, Thrift or Kryo, right? From there, the remaining
pipeline can just use standard POJO serialization, which would be better?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13885.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Kafka and Flink integration

2017-06-20 Thread Stephan Ewen
Hi!

For general data exchange between systems, it is often good to have a more
standard format.
Being able to evolve the schema of types is very helpful if you evolve the
data pipeline (which almost always happens eventually).

For that reason, Avro and Thrift are very popular for that type of data
exchange.

While they are not as fast as Kryo, they are more "robust" in the sense
that the format is stable.
Kryo is a good choice for intermediate data that is not persistent or at
least not leaving one specific system.

Greetings,
Stephan



On Tue, Jun 20, 2017 at 7:22 PM, nragon <nuno.goncal...@wedotechnologies.com
> wrote:

> Just one more question :).
> Considering I'm producing into kafka with other application other than
> flink, which serializer should i use in order to use pojo types when
> consuming those same messages (now in flink)?
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-
> integration-tp13792p13882.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


RE: Kafka and Flink integration

2017-06-20 Thread nragon
Just one more question :).
Considering I'm producing into Kafka with an application other than Flink,
which serializer should I use in order to get POJO types when consuming those
same messages (now in Flink)?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13882.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


RE: Kafka and Flink integration

2017-06-20 Thread nragon
Thanks, I'll try to refactor into POJOs.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13879.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


RE: Kafka and Flink integration

2017-06-20 Thread Tzu-Li (Gordon) Tai
Yes, POJOs can contain other nested POJO types. You just have to make sure that 
the nested field is either public, or has corresponding public getter and setter 
methods that follow the JavaBeans naming conventions.
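
For illustration, a sketch of a POJO composed of another POJO (class and field
names are made up):

// Outer type: public no-argument constructor plus public fields -> POJO
public class Order {
    public long orderId;
    public Customer customer;   // nested POJO-typed field
    public Order() {}
}

// Nested type: private field with public getter/setter following the JavaBeans
// naming conventions -> also a POJO
public class Customer {
    private String name;
    public Customer() {}
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}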


On 21 June 2017 at 12:20:31 AM, nragon (nuno.goncal...@wedotechnologies.com) 
wrote:

Can i have pojo has composition of other pojo? 
My custom object has many dependencies and in order to refactor it I must 
also change another 5 classes as well. 



-- 
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13874.html
 
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com. 


RE: Kafka and Flink integration

2017-06-20 Thread nragon
Can a POJO be a composition of other POJOs?
My custom object has many dependencies, and in order to refactor it I would have
to change another five classes as well.



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13874.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


RE: Kafka and Flink integration

2017-06-20 Thread Nuno Rafael Goncalves
Nico,

I'll try some different approaches and will be back here, hopefully with some 
results :)
Thanks for this brainstorming :)

-Original Message-
From: Nico Kruber [mailto:n...@data-artisans.com] 
Sent: 20 de junho de 2017 16:44
To: Nuno Rafael Goncalves <nuno.goncal...@wedotechnologies.com>
Cc: user@flink.apache.org
Subject: Re: Kafka and Flink integration

I can only repeat what Gordon wrote on Friday: "It’s usually always recommended 
to register your classes with Kryo [using registerKryoType()], to avoid the 
somewhat inefficient classname writing.
Also, depending on the case, to decrease serialization overhead, nothing really 
beats specific custom serialization. So, you can also register specific 
serializers for Kryo to use for the type."

I also guess, this highly depends on your actual use case and in particular the 
class you are trying to de/serialize.
Unfortunately, your image is too small to read, but does your performance 
improve when registering the class as a Kryo type?

Also, I don't think, mapping it to a tuple will improve performance since Kryo 
would have to do something similar anyway. Instead, you could really have your 
own de/serializer and go from "Class (<-> Tuple) <-> Kryo <-> bytes" directly 
to "Class <-> bytes".

Nico

On Tuesday, 20 June 2017 17:20:38 CEST Nuno Rafael Goncalves wrote:
> I believe there are some performance impact while de/serializing, 
> which is "normal". What I'm trying to understand is if there are any 
> tips to improve this process. For instance, tuples vs general class 
> types. Do you know if it's worth it to map a custom object into tuple 
> just for de/serialization process?
> 
> According to jfr analysis, kryo methods are hit a lot.
> 
> -Original Message-
> From: Nico Kruber [mailto:n...@data-artisans.com]
> Sent: 20 de junho de 2017 16:04
> To: user@flink.apache.org
> Cc: Nuno Rafael Goncalves <nuno.goncal...@wedotechnologies.com>
> Subject: Re: Kafka and Flink integration
> 
> No, this is only necessary if you want to register a custom serializer 
> itself [1]. Also, in case you are wondering about registerKryoType() - 
> this is only needed as a performance optimisation.
> 
> What exactly is your problem? What are you trying to solve?
> (I can't read JFR files here, and from what I read at Oracle's site, 
> this requires a commercial license, too...)
> 
> Nico
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/
> custom_serializers.html
> 
> On Tuesday, 20 June 2017 11:54:45 CEST nragon wrote:
> > Do I need to use registerTypeWithKryoSerializer() in my execution
> > environment?
> > My serialization into kafka is done with the following snippet
> >
> > try (ByteArrayOutputStream byteArrayOutStream = new ByteArrayOutputStream();
> > Output output = new Output(byteArrayOutStream)) {
> >   Kryo kryo = new Kryo();
> >   kryo.writeClassAndObject(output, event);
> >   output.flush();
> >   return byteArrayOutStream.toByteArray();
> > } catch (IOException e) {
> >   return null;
> > }
> >
> > "event" is my custom object.
> >
> > then i desirialize it in flink's kafka consumer
> > try (ByteArrayInputStream byteArrayInStream = new
> > ByteArrayInputStream(bytes); Input input = new Input(byteArrayInStream,
> > bytes.length)) {
> >   Kryo kryo = new Kryo();
> >   return kryo.readClassAndObject(input);
> > } catch (IOException e) {
> >   return null;
> > }
> >
> > Thanks
> >
> > --
> > View this message in context:
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13841.html
> > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.



RE: Kafka and Flink integration

2017-06-20 Thread Tzu-Li (Gordon) Tai
Hi Nuno,

In general, if it is possible, it is recommended that you map your generic
classes to Tuples / POJOs [1].
For Tuples / POJOs, Flink will create specialized serializers for them,
whereas for generic classes (i.e. types which cannot be treated as POJOs)
Flink simply fallbacks to using Kryo for them.
The actual performance gain may depend a bit on what the original generic
class type looked like.

One other thing probably to look at is enabling object reuse for
de-/serialization. However, be aware that the user code needs to be aware
of this, otherwise it may lead to unexpected errors.
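
A minimal sketch of enabling it (only safe if the user functions neither cache
nor modify input objects across invocations):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Reuse object instances during de-/serialization instead of allocating new ones.
env.getConfig().enableObjectReuse();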

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/types_serialization.html#rules-for-pojo-types

On 20 June 2017 at 11:24:03 PM, Nuno Rafael Goncalves (
nuno.goncal...@wedotechnologies.com) wrote:

I believe there are some performance impact while de/serializing, which is
“normal”. What I’m trying to understand is if there are any tips to improve
this process. For instance, tuples vs general class types. Do you know if
it’s worth it to map a custom object into tuple just for de/serialization
process?

According to jfr analysis, kryo methods are hit a lot.







-Original Message-
From: Nico Kruber [mailto:n...@data-artisans.com]
Sent: 20 de junho de 2017 16:04
To: user@flink.apache.org
Cc: Nuno Rafael Goncalves <nuno.goncal...@wedotechnologies.com>
Subject: Re: Kafka and Flink integration

No, this is only necessary if you want to register a custom serializer
itself [1]. Also, in case you are wondering about registerKryoType() - this
is only needed as a performance optimisation.

What exactly is your problem? What are you trying to solve?
(I can't read JFR files here, and from what I read at Oracle's site, this
requires a commercial license, too...)


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/
custom_serializers.html

On Tuesday, 20 June 2017 11:54:45 CEST nragon wrote:
> Do I need to use registerTypeWithKryoSerializer() in my execution
> environment?
> My serialization into kafka is done with the following snippet
>
> try (ByteArrayOutputStream byteArrayOutStream = new ByteArrayOutputStream();
> Output output = new Output(byteArrayOutStream)) {
>   Kryo kryo = new Kryo();
>   kryo.writeClassAndObject(output, event);
>   output.flush();
>   return byteArrayOutStream.toByteArray();
> } catch (IOException e) {
>   return null;
> }
>
> "event" is my custom object.
>
> then i desirialize it in flink's kafka consumer
> try (ByteArrayInputStream byteArrayInStream = new
> ByteArrayInputStream(bytes); Input input = new Input(byteArrayInStream,
> bytes.length)) {
>   Kryo kryo = new Kryo();
>   return kryo.readClassAndObject(input);
> } catch (IOException e) {
>   return null;
> }
>
> Thanks
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13841.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.




Re: Kafka and Flink integration

2017-06-20 Thread Nico Kruber
I can only repeat what Gordon wrote on Friday: "It’s usually always 
recommended to register your classes with Kryo [using registerKryoType()], to 
avoid the somewhat inefficient classname writing.
Also, depending on the case, to decrease serialization overhead, nothing 
really beats specific custom serialization. So, you can also register specific 
serializers for Kryo to use for the type."

I also guess this highly depends on your actual use case and in particular 
the class you are trying to de/serialize.
Unfortunately, your image is too small to read, but does your performance 
improve when registering the class as a Kryo type?

Also, I don't think mapping it to a tuple will improve performance, since Kryo 
would have to do something similar anyway. Instead, you could really have your 
own de/serializer and go from "Class (<-> Tuple) <-> Kryo <-> bytes" directly 
to "Class <-> bytes".

Nico

On Tuesday, 20 June 2017 17:20:38 CEST Nuno Rafael Goncalves wrote:
> I believe there are some performance impact while de/serializing, which is
> "normal". What I'm trying to understand is if there are any tips to improve
> this process. For instance, tuples vs general class types. Do you know if
> it's worth it to map a custom object into tuple just for de/serialization
> process?
> 
> According to jfr analysis, kryo methods are hit a lot.
> 
> -Original Message-
> From: Nico Kruber [mailto:n...@data-artisans.com]
> Sent: 20 de junho de 2017 16:04
> To: user@flink.apache.org
> Cc: Nuno Rafael Goncalves <nuno.goncal...@wedotechnologies.com>
> Subject: Re: Kafka and Flink integration
> 
> No, this is only necessary if you want to register a custom serializer
> itself [1]. Also, in case you are wondering about registerKryoType() - this
> is only needed as a performance optimisation.
> 
> What exactly is your problem? What are you trying to solve?
> (I can't read JFR files here, and from what I read at Oracle's site, this
> requires a commercial license, too...)
> 
> Nico
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/
> custom_serializers.html
> 
> On Tuesday, 20 June 2017 11:54:45 CEST nragon wrote:
> > Do I need to use registerTypeWithKryoSerializer() in my execution
> > environment?
> > My serialization into kafka is done with the following snippet
> >
> > try (ByteArrayOutputStream byteArrayOutStream = new ByteArrayOutputStream();
> > Output output = new Output(byteArrayOutStream)) {
> >   Kryo kryo = new Kryo();
> >   kryo.writeClassAndObject(output, event);
> >   output.flush();
> >   return byteArrayOutStream.toByteArray();
> > } catch (IOException e) {
> >   return null;
> > }
> >
> > "event" is my custom object.
> >
> > then i desirialize it in flink's kafka consumer
> > try (ByteArrayInputStream byteArrayInStream = new
> > ByteArrayInputStream(bytes); Input input = new Input(byteArrayInStream,
> > bytes.length)) {
> >   Kryo kryo = new Kryo();
> >   return kryo.readClassAndObject(input);
> > } catch (IOException e) {
> >   return null;
> > }
> >
> > Thanks
> >
> > --
> > View this message in context:
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13841.html
> > Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.





RE: Kafka and Flink integration

2017-06-20 Thread Nuno Rafael Goncalves
I believe there is some performance impact while de/serializing, which is 
"normal". What I'm trying to understand is whether there are any tips to improve 
this process, for instance tuples vs. general class types. Do you know if it's 
worth it to map a custom object into a tuple just for the de/serialization process?

According to JFR analysis, Kryo methods are hit a lot.








-Original Message-
From: Nico Kruber [mailto:n...@data-artisans.com]
Sent: 20 de junho de 2017 16:04
To: user@flink.apache.org
Cc: Nuno Rafael Goncalves <nuno.goncal...@wedotechnologies.com>
Subject: Re: Kafka and Flink integration

No, this is only necessary if you want to register a custom serializer itself
[1]. Also, in case you are wondering about registerKryoType() - this is only
needed as a performance optimisation.

What exactly is your problem? What are you trying to solve?
(I can't read JFR files here, and from what I read at Oracle's site, this
requires a commercial license, too...)


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/
custom_serializers.html

On Tuesday, 20 June 2017 11:54:45 CEST nragon wrote:
> Do I need to use registerTypeWithKryoSerializer() in my execution
> environment?
> My serialization into kafka is done with the following snippet
>
> try (ByteArrayOutputStream byteArrayOutStream = new ByteArrayOutputStream();
> Output output = new Output(byteArrayOutStream)) {
>   Kryo kryo = new Kryo();
>   kryo.writeClassAndObject(output, event);
>   output.flush();
>   return byteArrayOutStream.toByteArray();
> } catch (IOException e) {
>   return null;
> }
>
> "event" is my custom object.
>
> then i desirialize it in flink's kafka consumer
> try (ByteArrayInputStream byteArrayInStream = new
> ByteArrayInputStream(bytes); Input input = new Input(byteArrayInStream,
> bytes.length)) {
>   Kryo kryo = new Kryo();
>   return kryo.readClassAndObject(input);
> } catch (IOException e) {
>   return null;
> }
>
> Thanks
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13841.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.




Re: Kafka and Flink integration

2017-06-20 Thread Nico Kruber
No, this is only necessary if you want to register a custom serializer itself 
[1]. Also, in case you are wondering about registerKryoType() - this is only 
needed as a performance optimisation.

What exactly is your problem? What are you trying to solve?
(I can't read JFR files here, and from what I read at Oracle's site, this 
requires a commercial license, too...)


Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/
custom_serializers.html

On Tuesday, 20 June 2017 11:54:45 CEST nragon wrote:
> Do I need to use registerTypeWithKryoSerializer() in my execution
> environment?
> My serialization into kafka is done with the following snippet
> 
> try (ByteArrayOutputStream byteArrayOutStream = new ByteArrayOutputStream();
> Output output = new Output(byteArrayOutStream)) {
>   Kryo kryo = new Kryo();
>   kryo.writeClassAndObject(output, event);
>   output.flush();
>   return byteArrayOutStream.toByteArray();
> } catch (IOException e) {
>   return null;
> }
> 
> "event" is my custom object.
> 
> then i desirialize it in flink's kafka consumer
> try (ByteArrayInputStream byteArrayInStream = new
> ByteArrayInputStream(bytes); Input input = new Input(byteArrayInStream,
> bytes.length)) {
>   Kryo kryo = new Kryo();
>   return kryo.readClassAndObject(input);
> } catch (IOException e) {
>   return null;
> }
> 
> Thanks
> 
> 
> 
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-a
> nd-Flink-integration-tp13792p13841.html Sent from the Apache Flink User
> Mailing List archive. mailing list archive at Nabble.com.





Re: Kafka and Flink integration

2017-06-20 Thread nragon
Do I need to use registerTypeWithKryoSerializer() in my execution
environment?
My serialization into Kafka is done with the following snippet:

try (ByteArrayOutputStream byteArrayOutStream = new ByteArrayOutputStream();
     Output output = new Output(byteArrayOutStream)) {
  Kryo kryo = new Kryo();
  // writes the class name followed by the object, so it can be read back
  // without knowing the concrete type up front
  kryo.writeClassAndObject(output, event);
  output.flush();
  return byteArrayOutStream.toByteArray();
} catch (IOException e) {
  return null;
}

"event" is my custom object.

Then I deserialize it in Flink's Kafka consumer:

try (ByteArrayInputStream byteArrayInStream = new ByteArrayInputStream(bytes);
     Input input = new Input(byteArrayInStream, bytes.length)) {
  Kryo kryo = new Kryo();
  // reads the class name first, then the object itself
  return kryo.readClassAndObject(input);
} catch (IOException e) {
  return null;
}

Thanks



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13841.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Kafka and Flink integration

2017-06-16 Thread nragon
My custom object is used across the whole job, so it'll be part of checkpoints. Can
you point me to some references with examples?



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792p13802.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Kafka and Flink integration

2017-06-16 Thread Tzu-Li (Gordon) Tai
Hi!

It’s usually always recommended to register your classes with Kryo, to avoid 
the somewhat inefficient classname writing.
Also, depending on the case, to decrease serialization overhead, nothing really 
beats specific custom serialization. So, you can also register specific 
serializers for Kryo to use for the type.
If you need to store these custom objects as managed state for your operators, 
you can also have your own custom Flink TypeSerializer for that.
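
As a sketch of registering a specific Kryo serializer for a type (MyEvent and
its fields are hypothetical):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class MyEventKryoSerializer extends Serializer<MyEvent> {

    @Override
    public void write(Kryo kryo, Output output, MyEvent event) {
        // Write only the needed fields, in a fixed order.
        output.writeString(event.getName());
        output.writeLong(event.getTimestamp());
    }

    @Override
    public MyEvent read(Kryo kryo, Input input, Class<MyEvent> type) {
        return new MyEvent(input.readString(), input.readLong());
    }
}

// Register it so Flink's internal Kryo instance uses it for MyEvent:
env.getConfig().registerTypeWithKryoSerializer(MyEvent.class, MyEventKryoSerializer.class);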

Best,
Gordon

On 16 June 2017 at 12:27:06 PM, nragon (nuno.goncal...@wedotechnologies.com) 
wrote:

I have to produce custom objects into kafka and read them with flink. Any  
tuning advices to use kryo? Such as class registration or something like  
that? Any examples?  

Thanks  



--  
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792.html
  
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.  


Kafka and Flink integration

2017-06-16 Thread nragon
I have to produce custom objects into Kafka and read them with Flink. Any
tuning advice for using Kryo, such as class registration or something like
that? Any examples?

Thanks



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-and-Flink-integration-tp13792.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: kafka and flink integration issue

2016-02-29 Thread Stephan Ewen
Good to hear. Thanks for letting us know!

On Mon, Feb 29, 2016 at 8:14 PM, Pankaj Kumar  wrote:

> yes versioning was issue . Job is working fine on flink 0.10.2.
>
> On Mon, Feb 29, 2016 at 3:15 PM, Stephan Ewen  wrote:
>
>> Hi!
>>
>> A "NoSuchMethodError" is always a sign of a version mixup. Please make
>> sure both versions (cluster and client) are exactly the same.
>>
>> Stephan
>>
>>
>> On Sat, Feb 27, 2016 at 11:05 AM, Pankaj Kumar 
>> wrote:
>>
>>> Yes Robert ,
>>> i was trying to start Flink on cluster 0.10.1.
>>>
>>> But after changing flink version to 0.10.1 , also i am getting the same
>>> error.
>>>
>>> On Sat, Feb 27, 2016 at 2:47 PM, Robert Metzger 
>>> wrote:
>>>
 Hi Pankaj,

 I suspect you are trying to start Flink on a cluster with Flink 0.10.1
 installed?

 On Sat, Feb 27, 2016 at 9:20 AM, Pankaj Kumar 
 wrote:

> I am trying to integrate kafka and flink.
> my pom file is where {flink.version} is 0.10.2
>
> 
> org.apache.flink
> flink-java
> ${flink.version}
> 
> 
> org.apache.flink
> flink-streaming-java
> ${flink.version}
> 
> 
> org.apache.flink
> flink-clients
> ${flink.version}
> 
> 
> com.fasterxml.jackson.core
> jackson-databind
> 2.6.4
> 
> 
> org.projectlombok
> lombok
> 1.16.6
> provided
> 
> 
> org.springframework
> spring-web
> 4.2.4.RELEASE
> 
> 
> org.apache.flink
> flink-connector-kafka
> ${flink.version}
> 
>
> when i am running my jar with flink command line using command
>
> bin/flink run ~/apache_flink/target/Application-0.0.1.jar
>
> I am getting error
>
> tatus of job d20986a31f2a8166986d7db354de0ff7 changed to FAILED.
> java.lang.NoSuchMethodError: 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:387)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>
>
> I am building final jar using command *mvn clean package -Pbuild-jar*
>
>

>>>
>>
>


Re: kafka and flink integration issue

2016-02-29 Thread Pankaj Kumar
Yes, versioning was the issue. The job is working fine on Flink 0.10.2.

On Mon, Feb 29, 2016 at 3:15 PM, Stephan Ewen  wrote:

> Hi!
>
> A "NoSuchMethodError" is always a sign of a version mixup. Please make
> sure both versions (cluster and client) are exactly the same.
>
> Stephan
>
>
> On Sat, Feb 27, 2016 at 11:05 AM, Pankaj Kumar 
> wrote:
>
>> Yes Robert ,
>> i was trying to start Flink on cluster 0.10.1.
>>
>> But after changing flink version to 0.10.1 , also i am getting the same
>> error.
>>
>> On Sat, Feb 27, 2016 at 2:47 PM, Robert Metzger 
>> wrote:
>>
>>> Hi Pankaj,
>>>
>>> I suspect you are trying to start Flink on a cluster with Flink 0.10.1
>>> installed?
>>>
>>> On Sat, Feb 27, 2016 at 9:20 AM, Pankaj Kumar 
>>> wrote:
>>>
 I am trying to integrate kafka and flink.
 my pom file is where {flink.version} is 0.10.2

 
 org.apache.flink
 flink-java
 ${flink.version}
 
 
 org.apache.flink
 flink-streaming-java
 ${flink.version}
 
 
 org.apache.flink
 flink-clients
 ${flink.version}
 
 
 com.fasterxml.jackson.core
 jackson-databind
 2.6.4
 
 
 org.projectlombok
 lombok
 1.16.6
 provided
 
 
 org.springframework
 spring-web
 4.2.4.RELEASE
 
 
 org.apache.flink
 flink-connector-kafka
 ${flink.version}
 

 when i am running my jar with flink command line using command

 bin/flink run ~/apache_flink/target/Application-0.0.1.jar

 I am getting error

 tatus of job d20986a31f2a8166986d7db354de0ff7 changed to FAILED.
 java.lang.NoSuchMethodError: 
 org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z
at 
 org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:387)
at 
 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at 
 org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)


 I am building final jar using command *mvn clean package -Pbuild-jar*


>>>
>>
>


Re: kafka and flink integration issue

2016-02-29 Thread Stephan Ewen
Hi!

A "NoSuchMethodError" is always a sign of a version mixup. Please make sure
both versions (cluster and client) are exactly the same.

Stephan


On Sat, Feb 27, 2016 at 11:05 AM, Pankaj Kumar  wrote:

> Yes Robert ,
> i was trying to start Flink on cluster 0.10.1.
>
> But after changing flink version to 0.10.1 , also i am getting the same
> error.
>
> On Sat, Feb 27, 2016 at 2:47 PM, Robert Metzger 
> wrote:
>
>> Hi Pankaj,
>>
>> I suspect you are trying to start Flink on a cluster with Flink 0.10.1
>> installed?
>>
>> On Sat, Feb 27, 2016 at 9:20 AM, Pankaj Kumar 
>> wrote:
>>
>>> I am trying to integrate kafka and flink.
>>> my pom file is where {flink.version} is 0.10.2
>>>
>>> 
>>> org.apache.flink
>>> flink-java
>>> ${flink.version}
>>> 
>>> 
>>> org.apache.flink
>>> flink-streaming-java
>>> ${flink.version}
>>> 
>>> 
>>> org.apache.flink
>>> flink-clients
>>> ${flink.version}
>>> 
>>> 
>>> com.fasterxml.jackson.core
>>> jackson-databind
>>> 2.6.4
>>> 
>>> 
>>> org.projectlombok
>>> lombok
>>> 1.16.6
>>> provided
>>> 
>>> 
>>> org.springframework
>>> spring-web
>>> 4.2.4.RELEASE
>>> 
>>> 
>>> org.apache.flink
>>> flink-connector-kafka
>>> ${flink.version}
>>> 
>>>
>>> when i am running my jar with flink command line using command
>>>
>>> bin/flink run ~/apache_flink/target/Application-0.0.1.jar
>>>
>>> I am getting error
>>>
>>> tatus of job d20986a31f2a8166986d7db354de0ff7 changed to FAILED.
>>> java.lang.NoSuchMethodError: 
>>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z
>>> at 
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:387)
>>> at 
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>> at 
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>>
>>>
>>> I am building final jar using command *mvn clean package -Pbuild-jar*
>>>
>>>
>>
>


Re: kafka and flink integration issue

2016-02-27 Thread Pankaj Kumar
Yes Robert,
I was trying to start Flink on a cluster with 0.10.1 installed.

But after changing the Flink version to 0.10.1, I am still getting the same
error.

On Sat, Feb 27, 2016 at 2:47 PM, Robert Metzger  wrote:

> Hi Pankaj,
>
> I suspect you are trying to start Flink on a cluster with Flink 0.10.1
> installed?
>
> On Sat, Feb 27, 2016 at 9:20 AM, Pankaj Kumar 
> wrote:
>
>> I am trying to integrate kafka and flink.
>> my pom file is where {flink.version} is 0.10.2
>>
>> 
>> org.apache.flink
>> flink-java
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-streaming-java
>> ${flink.version}
>> 
>> 
>> org.apache.flink
>> flink-clients
>> ${flink.version}
>> 
>> 
>> com.fasterxml.jackson.core
>> jackson-databind
>> 2.6.4
>> 
>> 
>> org.projectlombok
>> lombok
>> 1.16.6
>> provided
>> 
>> 
>> org.springframework
>> spring-web
>> 4.2.4.RELEASE
>> 
>> 
>> org.apache.flink
>> flink-connector-kafka
>> ${flink.version}
>> 
>>
>> when i am running my jar with flink command line using command
>>
>> bin/flink run ~/apache_flink/target/Application-0.0.1.jar
>>
>> I am getting error
>>
>> tatus of job d20986a31f2a8166986d7db354de0ff7 changed to FAILED.
>> java.lang.NoSuchMethodError: 
>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z
>>  at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:387)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>>
>>
>> I am building final jar using command *mvn clean package -Pbuild-jar*
>>
>>
>


Re: kafka and flink integration issue

2016-02-27 Thread Robert Metzger
Hi Pankaj,

I suspect you are trying to start Flink on a cluster with Flink 0.10.1
installed?

On Sat, Feb 27, 2016 at 9:20 AM, Pankaj Kumar  wrote:

> I am trying to integrate kafka and flink.
> my pom file is where {flink.version} is 0.10.2
>
> 
> org.apache.flink
> flink-java
> ${flink.version}
> 
> 
> org.apache.flink
> flink-streaming-java
> ${flink.version}
> 
> 
> org.apache.flink
> flink-clients
> ${flink.version}
> 
> 
> com.fasterxml.jackson.core
> jackson-databind
> 2.6.4
> 
> 
> org.projectlombok
> lombok
> 1.16.6
> provided
> 
> 
> org.springframework
> spring-web
> 4.2.4.RELEASE
> 
> 
> org.apache.flink
> flink-connector-kafka
> ${flink.version}
> 
>
> when i am running my jar with flink command line using command
>
> bin/flink run ~/apache_flink/target/Application-0.0.1.jar
>
> I am getting error
>
> tatus of job d20986a31f2a8166986d7db354de0ff7 changed to FAILED.
> java.lang.NoSuchMethodError: 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:387)
>   at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>   at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
>
>
> I am building final jar using command *mvn clean package -Pbuild-jar*
>
>


kafka and flink integration issue

2016-02-27 Thread Pankaj Kumar
I am trying to integrate Kafka and Flink.
My pom file is as follows, where {flink.version} is 0.10.2:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.6.4</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.6</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-web</artifactId>
    <version>4.2.4.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>

When I run my jar with the Flink command line, using the command

bin/flink run ~/apache_flink/target/Application-0.0.1.jar

I get the following error:

Status of job d20986a31f2a8166986d7db354de0ff7 changed to FAILED.
java.lang.NoSuchMethodError:
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.isCheckpointingEnabled()Z
  at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:387)
  at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
  at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)


I am building the final jar using the command *mvn clean package -Pbuild-jar*