Flink Kafka reads too many bytes .... Very rarely

2018-02-26 Thread Philip Doctor
Hello,
I’m using Flink 1.4.0 with FlinkKafkaConsumer010 and have been for almost a 
year.  Recently, I started getting messages of the wrong length in Flink 
causing my deserializer to fail.  Let me share what I’ve learned:


  1.  All of my messages are exactly 520 bytes when my producer places them in Kafka.
  2.  About 1% of these messages hit this deserialization issue in Flink.
  3.  When it happens, I read 10104 bytes in Flink.
  4.  When I write the bytes my producer creates to a file on disk (rather than Kafka), my code reads 520 bytes and consumes them without issue off of disk.
  5.  When I use Kafka Tool (http://www.kafkatool.com/index.html) to dump the contents of my topic to disk and read each message one at a time off of disk, my code reads 520 bytes per message and consumes them without issue.
  6.  When I write a simple Kafka consumer (not using Flink) and read one message at a time, each message is 520 bytes and my code runs without issue.

#5 and #6 are what lead me to believe that this issue is squarely a problem 
with Flink.
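The fixed-size invariant behind observations #1–#3 and #6 can be sketched as a one-line check; the class and method names below are illustrative stand-ins, not code from the original application:

```java
// Hypothetical sketch of the length check described above: every payload the
// producer writes is exactly 520 bytes, so any other length (e.g. the 10104
// bytes seen in the failure case) signals a corrupted read.
public class PayloadCheck {
    static final int EXPECTED_LENGTH = 520;

    /** Returns true when the raw Kafka payload has the expected size. */
    public static boolean isValid(byte[] payload) {
        return payload != null && payload.length == EXPECTED_LENGTH;
    }
}
```

A check like this, run both in the standalone consumer and in the Flink job, is what isolates where the extra bytes first appear.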

However, it gets more complicated: I took the messages I wrote out with both my 
simple consumer and Kafka Tool, loaded them into a local Kafka server, and 
attached a local Flink cluster, and I cannot reproduce the error there, yet I can 
reproduce it 100% of the time in something closer to my production environment.

I realize the latter sounds suspicious, but I have not found anything in the 
Kafka docs indicating that I might have a configuration issue here, and the 
simple local setup that would allow me to iterate on this and debug has failed 
me.

I’m really quite at a loss here. I believe there’s a Flink Kafka consumer bug; 
it happens exceedingly rarely, as I went a year without seeing it. I can 
reproduce it in an expensive environment but not in a “cheap” environment.

Thank you for your time. I can provide my sample data set in case that helps; 
I dumped it on my Google Drive: 
https://drive.google.com/file/d/1h8jpAFdkSolMrT8n47JJdS6x21nd_b7n/view?usp=sharing
That’s the full data set, and about 1% of it ends up failing. It’s really hard 
to figure out which messages fail, since I can’t read any of the messages that 
I receive and I get data out of order.




Re: Flink Kafka reads too many bytes .... Very rarely

2018-02-27 Thread Fabian Hueske
Hi,

Thanks for reporting the error and sharing the results of your detailed
analysis!

I don't have enough insight into the Kafka consumer, but I noticed that you
said you've been running your application for over a year and only noticed
the faulty behavior recently.
Flink 1.4.0 was released in mid-December last year. Did you observe the bug
before you migrated to 1.4.0?

I'll add Gordon to the thread; he is the expert on Flink's Kafka
consumer.

Best, Fabian




Re: Flink Kafka reads too many bytes .... Very rarely

2018-02-27 Thread Tzu-Li (Gordon) Tai
Hi Philip,

Yes, I also have the question that Fabian mentioned. Did you start observing 
this only after upgrading to 1.4.0?

Could you let me know what exactly your deserialization schema is doing? I 
don’t have any clues at the moment, but maybe there are hints there.
Also, you mentioned that the issue could not be reproduced on a local setup, 
only in “near-production” environments. What main differences are there between 
the two?

Cheers,
Gordon



Re: Flink Kafka reads too many bytes .... Very rarely

2018-02-27 Thread Philip Doctor
Hi Gordon and Fabian,
I just re-ran my test case against Flink 1.3.2 and could not reproduce this 
error, so it does appear to be new in Flink 1.4.0, assuming my test is good.

The difference between my local env and prod is mostly scale: production has a 
multi-broker Kafka cluster with durable backups, etc., and Flink has 
active-standby-standby Job Managers, multiple TaskManagers, and full checkpoints 
to HDFS; my local execution is a single Kafka instance and a single in-memory 
StreamExecutionEnvironment. But my application code is identical. I tried 
comparing all of my Kafka settings (max message size, etc.), and they seem in 
line aside from it being a single instance. I’m not trying to rule out my 
environment as a factor, but I have tried very hard to examine it. This issue 
has proved very frustrating to reproduce; otherwise I would have happily sent a 
test case or even made a pass at debugging it myself.



> what exactly your deserialization schema is doing?

It’s some Google flatbuffers data, so it’s a byte array that gets read into a 
flatbuffer schema that reads at certain offsets to pull out values (ex: ID, 
timestamp, Array). I think it’s out of scope, since I can see that 
the byte count is wrong in problem cases, which is before I get to stick it 
into a flatbuffer deserializer. At the same time, something does seem 
important about the payload. I’m loading ~2 million data points across ~30 
datasets into Flink, and this is the only one that exhibits this problem. I 
spent over a day digging into what might be different about this dataset, and I 
can’t manage to find it. This makes me incredibly suspicious; I wouldn’t have 
emailed you all had I not measured the bytes in Kafka vs. after the Flink read.
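A fixed-offset decode of the kind described above is exactly why a wrong-length buffer fails immediately. As a sketch (the offset, field, and class name here are hypothetical, not the real flatbuffers schema):

```java
import java.nio.ByteBuffer;

// Illustrative sketch of an offset-based read like the ones a flatbuffers-style
// decoder performs. The offset 8 and the "timestamp" field are hypothetical;
// the point is that a wrong-sized buffer is detectable before any decoding.
public class OffsetReader {
    static final int EXPECTED_LENGTH = 520;

    /** Reads a long at a fixed (hypothetical) offset, after a size guard. */
    public static long readTimestamp(byte[] message) {
        if (message.length != EXPECTED_LENGTH) {
            // In the reported failure this would show 10104 instead of 520.
            throw new IllegalArgumentException(
                "expected " + EXPECTED_LENGTH + " bytes, got " + message.length);
        }
        return ByteBuffer.wrap(message).getLong(8);
    }
}
```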

Honestly, this has been a very frustrating issue to dig into. The fact that I 
seem to get all of my data is currently leading me to discard and ignore this 
error: it’s rare, and Flink still seems to work. But something is very hard to 
debug here, and despite some confusing observations, most of my evidence 
suggests that this originates in the Flink Kafka consumer.

If I can help more, please let me know.  Thank you for your replies.

-Phil





Re: Flink Kafka reads too many bytes .... Very rarely

2018-02-27 Thread Philip Doctor
> The fact that I seem to get all of my data is currently leading me to
> discard and ignore this error

Please ignore this statement; I typed this email as I was testing a theory and 
meant to delete this line. This is still a very real issue for me. I was 
planning to try a workaround tomorrow: I saw that the Kafka 0.11 consumer 
supports transactions for exactly-once processing, so I was going to read about 
that and see if I could somehow fail a read that I couldn’t deserialize and try 
it again, and whether that might make a difference (can I just retry this?). I’m 
not sure how that’ll go. If you’ve got an idea for a workaround, I’d be all ears 
too.




Re: Flink Kafka reads too many bytes .... Very rarely

2018-03-01 Thread Fabian Hueske
Hi Phil,

I've created a JIRA ticket for the problem that you described and linked it
to this thread: FLINK-8820 [1].

Thank you, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-8820



Re: Flink Kafka reads too many bytes .... Very rarely

2018-03-01 Thread Stephan Ewen
Can you specify exactly where you see that excess of data?

Flink basically uses Kafka's standard consumer and passes the byte[] unmodified
to the DeserializationSchema. Can you help us check whether the "too many
bytes" happens before or after the DeserializationSchema?

  - If the "too many bytes" already arrive at the DeserializationSchema,
then we should dig into the way that Kafka's consumer is configured.

  - If the "too many bytes" appear after the DeserializationSchema, then
we should look into the DeserializationSchema itself, for example whether it is
stateful, accidentally shared across threads, etc.
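The first case above can be checked with a small probe at the top of the schema's deserialize method. A minimal sketch (the class, counters, and logging are illustrative, not Flink API):

```java
// Hypothetical probe for localizing the excess bytes: record every payload
// length seen at the entry of deserialize(byte[]) so over-sized reads can be
// counted before any decoding happens. Names here are illustrative.
public class LengthProbe {
    static final int EXPECTED_LENGTH = 520;
    private long total = 0;
    private long badLength = 0;

    /** Call as the first line of DeserializationSchema#deserialize(byte[]). */
    public void record(byte[] message) {
        total++;
        if (message.length != EXPECTED_LENGTH) {
            badLength++;
            System.err.println("bad length " + message.length
                + " (" + badLength + "/" + total + " messages so far)");
        }
    }

    /** Fraction of observed payloads with an unexpected size. */
    public double badRatio() {
        return total == 0 ? 0.0 : (double) badLength / total;
    }
}
```

If the probe fires, the bytes are already wrong on arrival at the schema, pointing at the consumer or its configuration rather than the deserializer.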





Re: Flink Kafka reads too many bytes .... Very rarely

2018-03-07 Thread Philip Doctor
Hi Stephan,
Sorry for the slow response.

I added some logging inside of my DeserializationSchema’s `deserialize(byte[] 
message)` method.

I see the extra bytes appearing in that method.

If there’s another place I should add logging, please let me know and I’m happy 
to do so.

Additionally (and this is weird): I write all my messages to the DB, so I was 
looking for which messages didn’t make it (i.e., of input messages 1–10,000, 
which of those isn’t in the DB). It turns out all 10k are in the DB. I’m not 
sure if that indicates the message is read and then retried, or what. I would 
have guessed that somehow extra data got written to my topic, but Kafka Tool 
tells me otherwise. So from my application’s perspective, it just looks like I 
get extra garbage data every now and then.

This is actually a big relief: I toss out the garbage data and keep rolling.
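One possible shape of that "toss out the garbage" workaround, sketched as pure logic (names are illustrative, and whether a dropped record is silently skipped downstream depends on how the consumer is wired, so treat this only as a sketch):

```java
import java.util.Optional;

// Illustrative sketch: drop any payload that is not exactly 520 bytes instead
// of attempting to decode it, so rare corrupted reads don't fail the job.
public class SkipGarbage {
    static final int EXPECTED_LENGTH = 520;

    /** Returns the payload when well-sized, or empty to drop a garbage read. */
    public static Optional<byte[]> filter(byte[] message) {
        if (message == null || message.length != EXPECTED_LENGTH) {
            return Optional.empty();
        }
        return Optional.of(message);
    }
}
```

The observation that all 10k messages still reach the DB is what makes dropping safe here: the over-sized reads appear to be extra garbage rather than lost data.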

I hope this helps, thank you.







Re: Flink Kafka reads too many bytes .... Very rarely

2018-03-08 Thread Stephan Ewen
Double-checking: `deserialize(byte[] message)` already receives a byte[] with
too many bytes?

I wonder if this might be an issue in Kafka then, or in the specific way
Kafka is configured.
