Logging Kafka during exceptions

2018-11-21 Thread Scott Sue
Hi all,

When I'm running my jobs, I consume data from Kafka for processing.
Unfortunately, my job receives unexpected data from time to time, and I'm
trying to find the root cause of the issue.

Ideally, I want a way to know when the job has failed due to an exception,
and to then log to file the last message it was consuming at the time, to
help track down the offending message. How is this possible within Flink?

Thinking about this more, it may not be a consumed message that killed the
job; it may be a transformation within the job itself that died in a
downstream operator. In this case, is there a way to log to file the
message that an operator was processing at the time of the exception?


Thanks in advance!





Re: Logging Kafka during exceptions

2018-11-21 Thread Paul Lam
Hi Scott,

I think you can do it by catching the exception in the user function and
logging the current message that the operator is processing before
re-throwing (or not) the exception to the Flink runtime.
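
For example, a minimal sketch of the pattern (the function name, types, and
business logic are illustrative placeholders, not something from your job):

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: log the element in flight before re-throwing, so the
// offending message ends up in the task manager logs.
public class LoggingEnrichFunction implements MapFunction<String, String> {

    private static final Logger LOG =
            LoggerFactory.getLogger(LoggingEnrichFunction.class);

    @Override
    public String map(String value) throws Exception {
        try {
            return doBusinessLogic(value);
        } catch (Exception e) {
            LOG.error("Failed while processing element: {}", value, e);
            throw e; // or swallow it, if the job should keep running
        }
    }

    private String doBusinessLogic(String value) {
        return value.toUpperCase(); // stand-in for the real transformation
    }
}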

Best,
Paul Lam




Re: Logging Kafka during exceptions

2018-11-21 Thread Scott Sue
Hi Paul,

Thanks for the quick reply.  OK, does that mean that, as general practice, I
should be catching all exceptions for the purpose of logging in any of my
operators?  This seems like something that could be handled by Flink itself,
as these are unexpected exceptions. Otherwise, every single operator I create
must have a try/catch block?


Regards,
Scott











Re: Logging Kafka during exceptions

2018-11-21 Thread Paul Lam
Hi Scott,

IMHO, the exception is caused by the user code, so it should be handled by
the user function; the Flink runtime shouldn't make any assumptions about
what's happening inside the user function.

The exception may or may not be caused by unexpected data, so logging the
records currently being processed is not always helpful. What do you think?

Best,
Paul Lam




Re: Logging Kafka during exceptions

2018-11-21 Thread Scott Sue
Hi Paul,

Yes, correct: Flink shouldn't make any assumptions about what is inside the
user function.

It's true that the exception may not be a direct result of unexpected data;
rather, the incoming data coupled with the state of the job is causing
unexpected behaviour.  From my perspective, I wouldn't want to defensively
code my function to the n-th degree.  I would rather understand why my
function was failing (either due to the message I'm processing at the time,
or the state of the function), fix the issue that way, and then defensively
code the function as appropriate.

Thoughts?


Regards,
Scott











Re: Logging Kafka during exceptions

2018-11-21 Thread miki haiat
If so, then you can implement your own deserializer [1] with custom logic
and error handling.



1.
https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.html
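
For example, a rough sketch (assuming JSON with Jackson on the classpath;
whether returning null skips the bad record depends on the connector
version, so check the docs for yours):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: log the raw payload of any record that fails to parse,
// instead of failing deep in the job with an opaque exception.
public class LoggingJsonDeserializationSchema
        implements DeserializationSchema<JsonNode> {

    private static final Logger LOG =
            LoggerFactory.getLogger(LoggingJsonDeserializationSchema.class);

    private transient ObjectMapper mapper;

    @Override
    public JsonNode deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper(); // created lazily on the task manager
        }
        try {
            return mapper.readTree(message);
        } catch (IOException e) {
            LOG.error("Failed to parse record: {}",
                    new String(message, StandardCharsets.UTF_8), e);
            return null; // drop the record -- or re-throw to fail the job
        }
    }

    @Override
    public boolean isEndOfStream(JsonNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<JsonNode> getProducedType() {
        return TypeInformation.of(JsonNode.class);
    }
}

Note this only covers deserialization failures, not errors in downstream
functions.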


On Thu, Nov 22, 2018 at 8:57 AM Scott Sue  wrote:

> Json is sent into Kafka
>
>
> On 22 Nov 2018, at 14:55, miki haiat  wrote:
>
> Which data format is sent to Kafka?
> JSON, Avro, other?
>
>
>
> On Thu, Nov 22, 2018 at 7:36 AM Scott Sue 
> wrote:
>
>> Unexpected data meaning business level data that I didn’t expect to
>> receive. So business level data that doesn’t quite conform
>>
>> On Thu, 22 Nov 2018 at 13:30, miki haiat  wrote:
>>
>>>  Unexpected data you mean parsing error ?
>>> Which format is sent to Kafka ?


Re: Logging Kafka during exceptions

2018-11-21 Thread Scott Sue
Yeah, I think that would work for incorrect data consumed, but not if
deserialization passes correctly and one of my custom functions post
deserialization generates an error?


Regards,
Scott










Re: Logging Kafka during exceptions

2018-11-22 Thread Till Rohrmann
Hi Scott,

I think you could write some wrappers for the different user function types
which could contain the logging logic. That way you would still need to
wrap your actual business logic, but you don't have to duplicate the
logging logic over and over again.

If you also want to log the state, then you would need to wrap the
RuntimeContext to intercept all state-registering calls so that you can
keep track of them.
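
As a rough sketch for one function type (a MapFunction here; analogous
wrappers would be needed for the other interfaces, and the names are just
illustrative):

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: a reusable wrapper that adds log-on-failure behaviour to any
// MapFunction, so the try/catch is not repeated in every operator.
public class LoggingMapFunction<IN, OUT> implements MapFunction<IN, OUT> {

    private static final Logger LOG =
            LoggerFactory.getLogger(LoggingMapFunction.class);

    private final MapFunction<IN, OUT> delegate;

    public LoggingMapFunction(MapFunction<IN, OUT> delegate) {
        this.delegate = delegate;
    }

    @Override
    public OUT map(IN value) throws Exception {
        try {
            return delegate.map(value);
        } catch (Exception e) {
            LOG.error("{} failed on element: {}",
                    delegate.getClass().getName(), value, e);
            throw e;
        }
    }
}

// Usage: stream.map(new LoggingMapFunction<>(new MyBusinessFunction()))
// For rich functions you would also forward open()/close() and wrap the
// RuntimeContext as described above.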

Would that work for you?

Cheers,
Till


Re: Logging Kafka during exceptions

2018-11-22 Thread Scott Sue
Hi Till,

Yeah, I think that would work, especially knowing this isn't something that
is available out of the box at the moment.  Do you think it's worth raising
this as a feature request at all?  One thing I've found in my experience with
Flink is that it's quite hard to debug what is going on when there is an
unexpected exception.


Regards,
Scott








Re: Logging Kafka during exceptions

2018-11-22 Thread Till Rohrmann
I can see the benefit for other users as well. One could include it as part
of some development/debugging tools, for example. It would not strictly
need to go into Flink, but being in Flink would give it better visibility,
I guess. In that sense, opening a JIRA issue and posting on the dev list
might be a good idea to check how much interest there is.

Cheers,
Till


Re: Logging Kafka during exceptions

2018-11-22 Thread Scott Sue
Thanks Till,

I've raised a JIRA for this: https://issues.apache.org/jira/browse/FLINK-10988.
Let me know if there's anything else I can add to the JIRA to help.


Regards,
Scott






