Job hangs

2016-04-25 Thread Timur Fayruzov
Hello,

Now I'm at the stage where my job seems to hang completely. Source code is
attached (it won't compile, but I think it gives a very good idea of what
happens). Unfortunately I can't provide the datasets. Most of them are
about 100-500MM records; I am trying to process them on an EMR cluster with
40 tasks and 6GB of memory for each.

It was working for smaller input sizes. Any idea on what I can do
differently is appreciated.

Thanks,
Timur


FaithResolution.scala
Description: Binary data


Re: Job hangs

2016-04-26 Thread Till Rohrmann
Could you share the logs with us, Timur? That would be very helpful.

Cheers,
Till


Re: Job hangs

2016-04-26 Thread Ufuk Celebi
Hey Timur,

is it possible to connect to the VMs and get stack traces of the Flink
processes as well?

We can first have a look at the logs, but the stack traces will be
helpful if we can't figure out what the issue is.

– Ufuk



Re: Job hangs

2016-04-26 Thread Timur Fayruzov
I will do it tomorrow. The logs don't show anything unusual. Are there any
logs besides what's in flink/log and the YARN container logs?


Re: Job hangs

2016-04-26 Thread Ufuk Celebi
No.

If you run on YARN, the YARN logs are the relevant ones for the
JobManager and TaskManager. The client log submitting the job should
be found in /log.

– Ufuk



Re: Job hangs

2016-04-26 Thread Robert Metzger
Hi Timur,

thank you for sharing the source code of your job. That is helpful!
It's a large pipeline with 7 joins and 2 co-groups. Maybe your job is much
more IO-heavy with the larger input data because all the joins start
spilling?
Our monitoring, in particular for batch jobs, is really not very advanced.
If we had some monitoring showing the spill status, we would maybe see that
the job is still running.

How long did you wait until you declared the job hanging?

Regards,
Robert




Re: Job hangs

2016-04-26 Thread Timur Fayruzov
Hello Robert,

I observed progress for 2 hours (meaning the numbers changed on the
dashboard), and then I waited for 2 more hours. I'm sure it had to spill at
some point, but I figured 2h is enough time.

Thanks,
Timur


Re: Job hangs

2016-04-26 Thread Ufuk Celebi
Could you please also provide the execution plan via

env.getExecutionPlan()





Re: Job hangs

2016-04-26 Thread Timur Fayruzov
Robert, Ufuk, logs, execution plan and a screenshot of the console are in
the archive:
https://www.dropbox.com/s/68gyl6f3rdzn7o1/debug-stuck.tar.gz?dl=0

Note that when I looked at the backpressure view I saw back pressure 'high'
on the following paths:

Input->code_line:123,124->map->join
Input->code_line:134,135->map->join
Input->code_line:121->map->join

Unfortunately, I was able to take neither thread dumps nor heap dumps (none
of kill -3, jstack, or jmap worked; some Amazon AMI problem, I assume).

Hope that helps.

Please let me know if I can assist you in any way. Otherwise, I probably
will not be actively looking at this problem.

Thanks,
Timur
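
When kill -3, jstack and jmap are all unavailable, stack traces can still be
collected from inside the JVM through the standard java.lang.management API.
The following is only a minimal sketch and not something posted in this
thread: the class name InProcessThreadDump is hypothetical, and it assumes
the code can be run inside the affected process (for example from a helper
thread started by your own job code).

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

/** Hypothetical helper: collects stack traces of all live threads from inside the JVM. */
final class InProcessThreadDump {

    /** Returns one text block per live thread: name, state, and stack frames. */
    static String dump() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // false, false: skip monitor and synchronizer details, which not every JVM supports.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Print the dump; in a real job it would more likely go to the task log.
        System.out.println(dump());
    }
}
```

The output is close in spirit to what jstack prints, so it can be pasted into
a reply in the same way. It only sees the JVM it runs in, so each TaskManager
would need its own dump.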




Re: Job hangs

2016-04-27 Thread Vasiliki Kalavri
Hi Timur,

I've previously seen large batch jobs hang because of join deadlocks. We
should have fixed those problems, but we might have missed some corner
case. Did you check whether there was any CPU activity while the job was
hanging? Can you try running htop on the TaskManager machines and see if
they're idle?

Cheers,
-Vasia.
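
If shell tools such as htop are not an option, a rough in-JVM equivalent is
to sample per-thread CPU time twice and compare. This is a hedged sketch
using only java.lang.management; the class name ThreadCpuSampler and the
one-second interval are assumptions made for illustration, and the numbers
cover only the JVM the code runs in, not the whole machine.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: estimates how much CPU time the JVM's threads used in an interval. */
final class ThreadCpuSampler {

    /** Snapshot of CPU time in nanoseconds per thread id; empty if the JVM cannot measure it. */
    static Map<Long, Long> snapshot() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        Map<Long, Long> cpu = new HashMap<>();
        if (!threads.isThreadCpuTimeSupported() || !threads.isThreadCpuTimeEnabled()) {
            return cpu;
        }
        for (long id : threads.getAllThreadIds()) {
            long nanos = threads.getThreadCpuTime(id);
            if (nanos >= 0) { // -1 means the thread has died or is not measurable
                cpu.put(id, nanos);
            }
        }
        return cpu;
    }

    /** Total CPU nanoseconds consumed between two snapshots by threads present in both. */
    static long consumedNanos(Map<Long, Long> before, Map<Long, Long> after) {
        long total = 0;
        for (Map.Entry<Long, Long> entry : after.entrySet()) {
            Long earlier = before.get(entry.getKey());
            if (earlier != null) {
                total += Math.max(0, entry.getValue() - earlier);
            }
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        Map<Long, Long> before = snapshot();
        Thread.sleep(1000); // assumed sampling interval
        Map<Long, Long> after = snapshot();
        System.out.printf("CPU time used in the interval: %d ms%n",
                consumedNanos(before, after) / 1_000_000);
    }
}
```

A value near zero over several intervals would suggest the process is idle
(consistent with a deadlock), while a steadily high value would point towards
a job that is still working, for example sorting or spilling.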



Re: Job hangs

2016-04-27 Thread Fabian Hueske
Hi Timur,

I had a look at the plan you shared.
I could not find any flow that branches and merges again, a pattern which
is prone to causing deadlocks.

However, I noticed that the plan performs a lot of partitioning steps.
You might want to have a look at forwarded field annotations which can help
to reduce the partitioning and sorting steps [1].
This might help with complex jobs such as yours.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations
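
To make the "branches and merges again" pattern concrete, here is a small
illustrative check on a plain adjacency map. It is a sketch under stated
assumptions, not Flink's optimizer logic: the class BranchMergeCheck, the
operator names, and the representation of the plan as a map of edges are all
made up for this example.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper: finds dataflows that fork at one operator and rejoin downstream. */
final class BranchMergeCheck {

    /** All operators reachable from start (including start itself). */
    static Set<String> reachable(String start, Map<String, List<String>> edges) {
        Set<String> visited = new HashSet<>();
        Deque<String> todo = new ArrayDeque<>();
        todo.push(start);
        while (!todo.isEmpty()) {
            String node = todo.pop();
            if (visited.add(node)) {
                for (String next : edges.getOrDefault(node, Collections.emptyList())) {
                    todo.push(next);
                }
            }
        }
        return visited;
    }

    /** True if some operator has two outgoing branches that reach a common operator. */
    static boolean hasBranchAndMerge(Map<String, List<String>> edges) {
        for (List<String> successors : edges.values()) {
            if (successors.size() < 2) {
                continue; // not a branching point
            }
            Set<String> seen = new HashSet<>();
            for (String successor : successors) {
                for (String node : reachable(successor, edges)) {
                    if (!seen.add(node)) {
                        return true; // reached through two different branches
                    }
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, List<String>> plan = new HashMap<>();
        plan.put("source", Arrays.asList("mapA", "mapB"));
        plan.put("mapA", Arrays.asList("join"));
        plan.put("mapB", Arrays.asList("join"));
        System.out.println(hasBranchAndMerge(plan)); // prints true
    }
}
```

A join of two independent sources does not count here, because the two inputs
never shared an upstream operator; only a fork followed by a rejoin is
reported.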




Simple batch job hangs if run twice

2016-09-09 Thread Yassine MARZOUGUI
Hi all,

When I run the following batch job inside the IDE for the first time, it
outputs results and switches to FINISHED, but when I run it again it is
stuck in the state RUNNING. The csv file size is 160 MB. What could be the
reason for this behaviour?

import org.apache.flink.api.java.ExecutionEnvironment;

public class BatchJob {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env =
                ExecutionEnvironment.getExecutionEnvironment();

        // Keep the first three columns (mask "111000"), take 100 records, print them.
        env.readCsvFile("dump.csv")
                .ignoreFirstLine()
                .fieldDelimiter(";")
                .includeFields("111000")
                .types(String.class, String.class, String.class)
                .first(100)
                .print();
    }
}

Best,
Yassine
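
For readers unfamiliar with the includeFields("111000") mask used above: each
character stands for one column, where 1 keeps the column and 0 skips it. The
plain-Java sketch below mimics that selection on a single line. It is only an
illustration of the mask semantics, not Flink's CSV parser; the class
FieldMaskDemo and the sample line are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical helper: applies a 0/1 column mask to one delimited line. */
final class FieldMaskDemo {

    /** Returns the fields of line whose mask position is '1'. */
    static List<String> select(String line, String delimiter, String mask) {
        // Pattern.quote treats the delimiter literally; -1 keeps trailing empty fields.
        String[] fields = line.split(Pattern.quote(delimiter), -1);
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < mask.length() && i < fields.length; i++) {
            if (mask.charAt(i) == '1') {
                kept.add(fields[i]);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(select("a;b;c;d;e;f", ";", "111000")); // prints [a, b, c]
    }
}
```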


Flink job hangs using rocksDb as backend

2018-07-11 Thread shishal
Hi,

I am using Flink 1.4.2 with RocksDB as the state backend. I am using a
process function with event-time timers. For checkpointing I am using HDFS.

I am load testing, so I am reading from Kafka from the beginning (approx.
7 days of data with 50M events).

My job gets stuck after approx. 20 minutes with no error. Thereafter the
watermark does not progress and all checkpoints fail.

Also, when I try to cancel my job (using the web UI), it takes several
minutes to finally get cancelled, and it brings the TaskManager down as well.

There are no logs while my job hangs, but while cancelling I get the
following error.

2018-07-11 09:10:39,385 ERROR org.apache.flink.runtime.taskmanager.TaskManager -
==
==  FATAL  ===
==

A fatal error occurred, forcing the TaskManager to shut down: Task 'process
(3/6)' did not react to cancelling signal in the last 30 seconds, but is
stuck in method:
 org.rocksdb.RocksDB.get(Native Method)
org.rocksdb.RocksDB.get(RocksDB.java:810)
org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
java.lang.Thread.run(Thread.java:748)

2018-07-11 09:10:39,390 DEBUG org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy - Actor was killed. Stopping it now.
akka.actor.ActorKilledException: Kill
2018-07-11 09:10:39,407 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Stopping TaskManager akka://flink/user/taskmanager#-1231617791.
2018-07-11 09:10:39,408 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (3/6) (432fd129f3eea363334521f8c8de5198).
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Task process (3/6) is already in state CANCELING
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (4/6) (7c6b96c9f32b067bdf8fa7c283eca2e0).
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Task process (4/6) is already in state CANCELING
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (2/6) (a4f731797a7ea210fd0b512b0263bcd9).
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Task process (2/6) is already in state CANCELING
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (1/6) (cd8a113779a4c00a051d78ad63bc7963).
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Task process (1/6) is already in state CANCELING
2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Disassociating from JobManager
2018-07-11 09:10:39,412 INFO  org.apache.flink.runtime.blob.PermanentBlobCache - Shutting down BLOB cache
2018-07-11 09:10:39,431 INFO  org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
2018-07-11 09:10:39,444 INFO  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService.
2018-07-11 09:10:39,444 DEBUG org.apache.flink.runtime

Re: Simple batch job hangs if run twice

2016-09-17 Thread Aljoscha Krettek
Hi,
when is the "first time"? It seems you have tried this repeatedly, so what
differentiates a "first time" from the other times? Are you closing your
IDE in between, or do you mean running the job a second time within the same
program?

Cheers,
Aljoscha



Re: Simple batch job hangs if run twice

2016-09-17 Thread Yassine MARZOUGUI
Hi Aljoscha,

Thanks for your response. By "the first time" I mean that I hit run from the
IDE (I am using NetBeans on Windows) for the first time after building the
program. If I then stop it and run it again (without rebuilding), it is stuck
in the state RUNNING. Sometimes I have to rebuild it, or close the IDE, to be
able to get an output. The behaviour is random; maybe it's related to the IDE
or the OS and not necessarily Flink itself.



Re: Simple batch job hangs if run twice

2016-09-18 Thread Aljoscha Krettek
Hmm, this sounds like it could be IDE/Windows specific. Unfortunately I
don't have access to a Windows machine. I'll loop in Chesnay, who is using
Windows.

Chesnay, do you maybe have an idea what could be the problem? Have you ever
encountered this?

On Sat, 17 Sep 2016 at 15:30 Yassine MARZOUGUI 
wrote:

> Hi Aljoscha,
>
> Thanks for your response. By "the first time" I mean the first time I hit
> run from the IDE (I am using NetBeans on Windows) after building the program.
> If I then stop it and run it again (without rebuilding), it is stuck in the
> state RUNNING. Sometimes I have to rebuild it, or close the IDE, to be able
> to get any output. The behaviour is random; maybe it's related to the IDE or
> the OS and not necessarily to Flink itself.
>


Re: Simple batch job hangs if run twice

2016-09-19 Thread Chesnay Schepler

No, I can't recall this ever happening to me.

I would enable logging and try again, and also check in the web interface
whether the second job is actually running.

If you tell me your NetBeans version, I can try to reproduce it.

Also, which version of Flink are you using?

On 19.09.2016 07:45, Aljoscha Krettek wrote:
> Hmm, this sounds like it could be IDE/Windows specific. Unfortunately I
> don't have access to a Windows machine. I'll loop in Chesnay, who is
> using Windows.
>
> Chesnay, do you maybe have an idea what could be the problem? Have you
> ever encountered this?


Re: Simple batch job hangs if run twice

2016-09-19 Thread Yassine MARZOUGUI
Hi Chesnay,

I am running Flink 1.1.2, and using NetBeans 8.1.
I made a screencast reproducing the problem here:
http://recordit.co/P53OnFokN4 .

Best,
Yassine


2016-09-19 10:04 GMT+02:00 Chesnay Schepler :

> No, I can't recall this ever happening to me.
>
> I would enable logging and try again, and also check in the web interface
> whether the second job is actually running.
>
> If you tell me your NetBeans version, I can try to reproduce it.
>
> Also, which version of Flink are you using?
>
>


Re: Simple batch job hangs if run twice

2016-09-22 Thread Robert Metzger
Can you try running with DEBUG logging level?
Then you should see if input splits are assigned.
Also, you could try to use a debugger to see what's going on.

On Mon, Sep 19, 2016 at 2:04 PM, Yassine MARZOUGUI <
y.marzou...@mindlytix.com> wrote:

> Hi Chesnay,
>
> I am running Flink 1.1.2, and using NetBeans 8.1.
> I made a screencast reproducing the problem here:
> http://recordit.co/P53OnFokN4 .
>
> Best,
> Yassine
>


Re: Simple batch job hangs if run twice

2016-09-23 Thread Fabian Hueske
Hi Yassine, can you share a stacktrace of the job when it got stuck?

Thanks, Fabian

2016-09-22 14:03 GMT+02:00 Yassine MARZOUGUI :

> The input splits are correctly assigned. I noticed that whenever the job
> is stuck, it is because the task *Combine (GroupReduce at
> first(DataSet.java:573))* keeps RUNNING and never switches to FINISHED.
> I tried to debug the program at the *first(100)* call, but I couldn't do much.
> I attached the full DEBUG output.
>


Re: Simple batch job hangs if run twice

2016-09-23 Thread Yassine MARZOUGUI
Hi Fabian,

Not sure if this answers your question, but here is the stack I got when
debugging the combine and datasource operators when the job got stuck:

"DataSource (at main(BatchTest.java:28) (org.apache.flink.api.java.io.TupleCsvInputFormat)) (1/8)"
    at java.lang.Object.wait(Object.java)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:93)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:163)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

"Combine (GroupReduce at first(DataSet.java:573)) (1/8)"
    at java.lang.Object.wait(Object.java)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:93)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at org.apache.flink.api.java.functions.FirstReducer.reduce(FirstReducer.java:41)
    at org.apache.flink.api.java.functions.FirstReducer.combine(FirstReducer.java:52)
    at org.apache.flink.runtime.operators.AllGroupReduceDriver.run(AllGroupReduceDriver.java:152)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

Best,
Yassine


2016-09-23 11:28 GMT+02:00 Yassine MARZOUGUI :

> Hi Fabian,
>
> Is it different from the output I already sent? (see attached file). If
> yes, how can I obtain the stacktrace of the job programmatically? Thanks.
>
> Best,
> Yassine
>

Re: Simple batch job hangs if run twice

2016-09-23 Thread Fabian Hueske
Yes, log files and stacktraces are different things.
A stacktrace shows the call hierarchy of all threads in a JVM at the time
when it is taken. So you can see the method that is currently executed (and
from where it was called) when the stacktrace is taken. In case of a
deadlock, you see where the program is waiting.

The stack you sent is only a part of the complete stacktrace. Most IDEs
have a feature to take a stacktrace while they are executing a program.
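
On the earlier question of obtaining such a dump programmatically: a minimal sketch, assuming only plain JDK APIs, could look like the following. `ThreadDumper` is a hypothetical helper name used for illustration and is not part of Flink. It prints every live thread of the current JVM with its frames, so it only helps when called from inside the JVM that runs the job (for example from a watchdog thread started in `main`).

```java
import java.util.Map;

/** Hypothetical helper (not part of Flink): dumps the stack of every live thread in this JVM. */
final class ThreadDumper {

    /** Builds a jstack-like text dump from Thread.getAllStackTraces(). */
    static String dumpAllThreads() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            // Header line: thread name and its current state (RUNNABLE, WAITING, ...).
            sb.append('"').append(thread.getName()).append("\" state=")
              .append(thread.getState()).append('\n');
            // One line per stack frame, innermost call first.
            for (StackTraceElement frame : entry.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(dumpAllThreads());
    }
}
```

For a JVM that is already stuck, taking the dump from outside is usually simpler: `jps` lists the running JVM process ids and `jstack <pid>` prints the full thread dump of one of them.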


Re: Simple batch job hangs if run twice

2016-09-23 Thread Yassine MARZOUGUI
I found out how to dump the stacktrace (using jps and jstack). Please find
attached the stacktrace I got when the job got stuck.

Thanks,
Yassine

2016-09-23 11:48 GMT+02:00 Fabian Hueske :

> Yes, log files and stacktraces are different things.
> A stacktrace shows the call hierarchy of all threads in a JVM at the time
> when it is taken. So you can see the method that is currently executed (and
> from where it was called) when the stacktrace is taken. In case of a
> deadlock, you see where the program is waiting.
>
> The stack you sent is only a part of the complete stacktrace. Most IDEs
> have a feature to take a stacktrace while they are executing a program.
>

Re: Flink job hangs using rocksDb as backend

2018-07-11 Thread Stephan Ewen
Hi shishal!

I think there is an issue with cancellation when many timers fire at the
same time. These timers have to finish before shutdown can happen, and that
seems to take a while in your case.

Did the TM process actually kill itself in the end (and get restarted)?



On Wed, Jul 11, 2018 at 9:29 AM, shishal  wrote:

> Hi,
>
> I am using Flink 1.4.2 with RocksDB as the state backend. I am using a process
> function with event-time timers. For checkpointing I am using HDFS.
>
> I am load testing, so I am reading Kafka from the beginning (approx. 7 days
> of data with 50M events).
>
> My job gets stuck after approx. 20 minutes with no error. After that, the
> watermark does not progress and all checkpoints fail.
>
> Also, when I try to cancel my job (using the web UI), it takes several minutes
> before it finally gets cancelled. It also brings the TaskManager down.
>
> There are no logs while my job hangs, but while cancelling I get the following
> error:
>
> 2018-07-11 09:10:39,385 ERROR
> org.apache.flink.runtime.taskmanager.TaskManager  -
> ==
> ==  FATAL  ===
> ==
>
> A fatal error occurred, forcing the TaskManager to shut down: Task 'process
> (3/6)' did not react to cancelling signal in the last 30 seconds, but is
> stuck in method:
>     org.rocksdb.RocksDB.get(Native Method)
>     org.rocksdb.RocksDB.get(RocksDB.java:810)
>     org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
>     org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>     nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
>     org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>     org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>     org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>     org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>     org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>     org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>     org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>     org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>     org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>     org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>     org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     java.lang.Thread.run(Thread.java:748)
>
> 2018-07-11 09:10:39,390 DEBUG org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy - Actor was killed. Stopping it now.
> akka.actor.ActorKilledException: Kill
> 2018-07-11 09:10:39,407 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Stopping TaskManager akka://flink/user/taskmanager#-1231617791.
> 2018-07-11 09:10:39,408 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.
> 2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (3/6) (432fd129f3eea363334521f8c8de5198).
> 2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Task process (3/6) is already in state CANCELING
> 2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (4/6) (7c6b96c9f32b067bdf8fa7c283eca2e0).
> 2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Task process (4/6) is already in state CANCELING
> 2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (2/6) (a4f731797a7ea210fd0b512b0263bcd9).
> 2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Task process (2/6) is already in state CANCELING
> 2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (1/6) (cd8a113779a4c00a051d78ad63bc7963).
> 2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Task process (1/6) is already in state CANCELING
> 2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Disassociating from JobManager
> 2018-07-11 09:10:39,412 INFO  org.apache.flink.runtime.blob.PermanentBlobCache

Re: Flink job hangs using rocksDb as backend

2018-07-11 Thread Nico Kruber
If this is about too many timers and your application allows it, you may
also try to reduce the timer resolution and thus frequency by coalescing
them [1].
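
A minimal sketch of the coalescing idea from [1], assuming a resolution of one second: the target time is rounded down to a full resolution step, so timers registered for the same key within that step collapse into one (Flink keeps at most one timer per key and timestamp). The class and method names are hypothetical and not part of Flink; rounding down means a timer can fire up to one resolution step early.

```java
/** Rounds timer timestamps down to a coarser resolution so that nearby timers share one timestamp. */
final class TimerCoalescing {

    /** Hypothetical helper: rounds timestampMs down to a multiple of resolutionMs. */
    static long coalesce(long timestampMs, long resolutionMs) {
        if (resolutionMs <= 0) {
            throw new IllegalArgumentException("resolutionMs must be positive");
        }
        // floorMod keeps the rounding direction correct for negative timestamps too.
        return timestampMs - Math.floorMod(timestampMs, resolutionMs);
    }

    public static void main(String[] args) {
        long oneSecond = 1_000L;
        // Two target times 300 ms apart end up on the same full second.
        System.out.println(coalesce(1_531_300_239_385L, oneSecond));
        System.out.println(coalesce(1_531_300_239_685L, oneSecond));
        // Inside a ProcessFunction the result would be passed to
        // ctx.timerService().registerEventTimeTimer(...).
    }
}
```

Note that this only reduces the timer count when several timers are registered for the same key; timers on different keys stay separate.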


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/stream/operators/process_function.html#timer-coalescing

On 11/07/18 18:27, Stephan Ewen wrote:
> Hi shishal!
> 
> I think there is an issue with cancellation when many timers fire at the
> same time. These timers have to finish before shutdown happens, this
> seems to take a while in your case.
> 
> Did the TM process actually kill itself in the end (and got restarted)?
> 
> 
> 
> On Wed, Jul 11, 2018 at 9:29 AM, shishal wrote:
> 
> Hi,
> 
> I am using Flink 1.4.2 with RocksDB as the state backend. I am using a
> process function with event-time timers. For checkpointing I am using HDFS.
> 
> I am load testing, so I am reading from Kafka from the beginning (approx.
> 7 days of data with 50M events).
> 
> My job gets stuck after approx. 20 min with no error. Thereafter the
> watermark does not progress and all checkpoints fail.
> 
> Also, when I try to cancel my job (using the web UI), it takes several
> minutes to finally get cancelled. It also brings the TaskManager down.
> 
> There are no logs while my job hangs, but while cancelling I get the
> following error.
> 
> 
> 2018-07-11 09:10:39,385 ERROR org.apache.flink.runtime.taskmanager.TaskManager -
> ==
> ==      FATAL      ===
> ==
> 
> A fatal error occurred, forcing the TaskManager to shut down: Task 'process
> (3/6)' did not react to cancelling signal in the last 30 seconds, but is
> stuck in method:
>  org.rocksdb.RocksDB.get(Native Method)
>  org.rocksdb.RocksDB.get(RocksDB.java:810)
>  org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
>  org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
>  nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:94)
>  org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
>  org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
>  org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
>  org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
>  org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
>  org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>  org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>  org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
>  org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>  org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>  org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>  java.lang.Thread.run(Thread.java:748)
> 
> 2018-07-11 09:10:39,390 DEBUG org.apache.flink.runtime.akka.StoppingSupervisorWithoutLoggingActorKilledExceptionStrategy - Actor was killed. Stopping it now.
> akka.actor.ActorKilledException: Kill
> 2018-07-11 09:10:39,407 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Stopping TaskManager akka://flink/user/taskmanager#-1231617791.
> 2018-07-11 09:10:39,408 INFO  org.apache.flink.runtime.taskmanager.TaskManager - Cancelling all computations and discarding all cached data.
> 2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (3/6) (432fd129f3eea363334521f8c8de5198).
> 2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Task process (3/6) is already in state CANCELING
> 2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally process (4/6) (7c6b96c9f32b067bdf8fa7c283eca2e0).
> 2018-07-11 09:10:39,409 INFO  org.apache.flink.runtime.taskmanager.Task - Task process (4/6) is already in state CANCELING
>

Re: Flink job hangs using rocksDb as backend

2018-07-12 Thread Stefan Richter
Hi,

adding to what has already been said, I think there can be two orthogonal
problems here: i) why is your job slowing down/getting stuck? and ii) why is
cancellation blocked? As for ii), I think Stephan already gave the right reason:
shutdown can take longer, and that is what gets the TM killed.

A more interesting question could still be i), why is your job slowing down
until shutdown in the first place. I have two questions here. First, are you
running RocksDB on EBS volumes? If so, please have a look at this thread [1],
because there can be some performance pitfalls. Second, how many timers are you
expecting, and how are they firing? For example, if you have a huge number of
timers and the watermark makes a big jump, it can take a while until the job
makes progress because it has to handle so many timer callbacks first. Metrics
for event throughput and from your I/O subsystem could be helpful to see if
something is stuck/underperforming or if there is just a lot of timer
processing going on.

Best,
Stefan 

[1] 
https://mail-archives.apache.org/mod_mbox/flink-user/201803.mbox/%3ccakhqddzamdqewiz5b1qndqv4+-mtvefhbhewrpxftlu7dv9...@mail.gmail.com%3E
 



Re: Flink job hangs using rocksDb as backend

2018-07-12 Thread shishal singh
Thanks Stefan/Stephan/Nico,

Indeed there are two problems. For the second problem, I am almost certain that
the explanation given by Stephan is the right one, as in my case the number of
timers is in the millions. (Each is for a different key, so I guess coalescing
is not an option for me.)

To simplify my problem: each day I receive millions of events (10-20M),
and I have to schedule a timer for 8 AM the next day to check whether matching
events are there; if not, I have to send an alert to the Elastic sink. I
suspected that having so many timers fire at the same time could cause my job
to hang, so I am now scheduling the timers randomly between 8 AM and 10 AM. But
my job still hangs after some time. One more thing I noticed is that when my
job hangs, CPU utilization shoots up to almost 100%.
I tried to isolate the problem by removing the ES sink and just calling
stream.print(), and yet the problem persists.
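
The spreading of timers over the 8 AM to 10 AM window described above could be sketched as follows. This is only an illustration under assumptions: the class and method names are hypothetical, the time zone is passed in explicitly, and the offset is derived from the key's hash instead of a random number so that the same key always maps to the same firing time.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

/** Computes a per-key timer time inside a window that starts at 8 AM on the following day. */
final class SpreadTimers {

    /** Hypothetical helper: returns epoch millis in [next day 08:00, next day 08:00 + window). */
    static long nextDayTimerMillis(long eventTimeMs, String key, ZoneId zone, Duration window) {
        if (window.isZero() || window.isNegative()) {
            throw new IllegalArgumentException("window must be positive");
        }
        // 8 AM on the day after the event, in the given time zone.
        ZonedDateTime windowStart = Instant.ofEpochMilli(eventTimeMs)
                .atZone(zone)
                .toLocalDate()
                .plusDays(1)
                .atTime(LocalTime.of(8, 0))
                .atZone(zone);
        // Deterministic offset per key; floorMod avoids negative offsets for negative hashes.
        long offsetMs = Math.floorMod((long) key.hashCode(), window.toMillis());
        return windowStart.toInstant().toEpochMilli() + offsetMs;
    }

    public static void main(String[] args) {
        long eventTime = Instant.parse("2018-07-11T09:10:39Z").toEpochMilli();
        long timer = nextDayTimerMillis(eventTime, "some-key", ZoneOffset.UTC, Duration.ofHours(2));
        // Prints an instant between 08:00 and 10:00 UTC on 2018-07-12.
        System.out.println(Instant.ofEpochMilli(timer));
    }
}
```

Spreading the timers only smooths the load if the watermark advances gradually through the window; a watermark that jumps past 10 AM in one step still fires all of them at once.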

In my current setup, I am running a standalone cluster of 3 machines (all
three servers have a TaskManager, JobManager and Hadoop on them). So I am not
using EBS for RocksDB.

I also verified that when the job hangs, even the timers are not being called,
as I have a debug statement in the timer callback, and the only logs I see at
that time are the following:

2018-07-12 14:35:30,423 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x2648355f7c6010f after 11ms
2018-07-12 14:35:31,957 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:35:36,946 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:35:41,963 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:35:43,775 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x2648355f7c6010f after 10ms
2018-07-12 14:35:46,946 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:35:51,954 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:35:56,967 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:35:57,127 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x2648355f7c6010f after 8ms
2018-07-12 14:36:01,944 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:36:06,955 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager
2018-07-12 14:36:08,287 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Receiver TriggerCheckpoint 155@1531398968248 for d9af2f1da87b7268cc03e152a6179eae.
2018-07-12 14:36:08,287 DEBUG org.apache.flink.runtime.taskmanager.Task  - Invoking async call Checkpoint Trigger for Source: Event Source -> filter (1/1) (d9af2f1da87b7268cc03e152a6179eae). on task Source: Event Source -> filter (1/1)
2018-07-12 14:36:10,476 DEBUG org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got ping response for sessionid: 0x2648355f7c6010f after 10ms
2018-07-12 14:36:11,957 DEBUG org.apache.flink.runtime.taskmanager.TaskManager  - Sending heartbeat to JobManager

As I expected, checkpoints also start to fail during this time.

My job graph is pretty simple: Source-->filter->Sink


Regards,
Shishal



Re: Flink job hangs using rocksDb as backend

2018-07-12 Thread Stefan Richter
Hi,

Did you check the metrics for the garbage collector? Being stuck with high CPU
consumption and lots of timers sounds like there could be a problem there,
because timers are currently on-heap objects, but we are working on
RocksDB-based timers right now.
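
As a rough illustration of what "checking the garbage collector" can mean, the sketch below reads collection counts and accumulated collection time from the standard JVM management beans and estimates the fraction of an interval spent in GC. It is an assumption-laden example, not Flink code: the class name is hypothetical and it measures the JVM it runs in, so for a TaskManager the same values would have to be read via JMX or from the metrics Flink reports.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

/** Snapshot of total GC activity of the current JVM, summed over all collectors. */
final class GcSnapshot {
    final long collections;
    final long timeMs;

    GcSnapshot(long collections, long timeMs) {
        this.collections = collections;
        this.timeMs = timeMs;
    }

    /** Reads the current totals from the JVM's garbage collector MXBeans. */
    static GcSnapshot capture() {
        long count = 0L;
        long time = 0L;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Both getters return -1 if the collector does not report the value.
            count += Math.max(0L, gc.getCollectionCount());
            time += Math.max(0L, gc.getCollectionTime());
        }
        return new GcSnapshot(count, time);
    }

    /** Fraction of the wall-clock interval spent in GC between two snapshots. */
    static double gcFraction(GcSnapshot before, GcSnapshot after, long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        return (double) (after.timeMs - before.timeMs) / intervalMs;
    }

    public static void main(String[] args) throws InterruptedException {
        GcSnapshot before = capture();
        Thread.sleep(1_000L);
        GcSnapshot after = capture();
        System.out.println("collections in interval: " + (after.collections - before.collections));
        System.out.println("GC time fraction: " + gcFraction(before, after, 1_000L));
    }
}
```

A GC time fraction close to 1 while the job appears hung would point to heap pressure, for example from millions of on-heap timers, rather than to RocksDB.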

Best,
Stefan


Re: Flink job hangs using rocksDb as backend

2018-07-20 Thread shishal singh
Hi Richter,

Actually, for testing, I have now reduced the number of timers to a few
thousand (5-6K), but my job still gets stuck randomly, and it is not
reproducible each time. When I restart the job it starts working again
for a few hours/days, then gets stuck again.
I took a thread dump when my job was hanging with almost 100% CPU. The
threads using the most CPU have the following stacks:

It looks like it is sometimes not able to read data from RocksDB.

"process (3/6)" #782 prio=5 os_prio=0 tid=0x7f68b81ddcf0 nid=0xee73 runnable [0x7f688d83a000]
   java.lang.Thread.State: RUNNABLE
    at org.rocksdb.RocksDB.get(Native Method)
    at org.rocksdb.RocksDB.get(RocksDB.java:810)
    at org.apache.flink.contrib.streaming.state.RocksDBMapState.contains(RocksDBMapState.java:137)
    at org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72)
    at nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.isEventExist(RtpeProcessFunction.java:150)
    at nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:93)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
    - locked <0x000302b61458> (a java.lang.Object)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
    - None

"process (2/6)" #781 prio=5 os_prio=0 tid=0x7f68b81dcef0 nid=0xee72 runnable [0x7f688fe54000]
   java.lang.Thread.State: RUNNABLE
    at org.rocksdb.RocksDB.get(Native Method)
    at org.rocksdb.RocksDB.get(RocksDB.java:810)
    at org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:102)
    at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
    at nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.onTimer(RtpeProcessFunction.java:99)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:75)
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:288)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:108)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:885)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
    - locked <0x000302b404a0> (a java.lang.Object)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:189)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:


Regards,
Shishal



Re: Flink job hangs using rocksDb as backend

2018-07-23 Thread Stefan Richter
Hi,

let us first clarify what you mean by „stuck“. Just because your job stops
consuming events for some time does not necessarily mean that it is „stuck“.
That is very hard to evaluate from the information we have so far: from the
stack trace you cannot conclude that the thread is „stuck“, because it looks
like it is just processing firing timers. And while timers are firing, the
pipeline will stop consuming further events until all timers have been
processed. Even if your thread dump looks the same all the time, it could just
be that you observe the same call (the most expensive one) across multiple
invocations, which is not necessarily an indicator of the thread being stuck.
Attaching a sampler or adding logging to one of the seemingly stuck task JVMs
could clarify this a bit more. For now I am assuming that it makes progress but
spends a lot of work on timers. One reason you might experience this randomly
is that your watermark makes a bigger jump and many (or all) of your timers
suddenly fire. From the perspective of consuming events, this could look like
being stuck.

In case the job really is stuck in the strict sense, it does not look like a
Flink problem, because your threads are in some call against RocksDB. Since we
are not aware of any similar problem from the mailing list, a setup problem
would be the most likely explanation: for example, what types of disk are you
using, and how many threads are available on the TM machine so that RocksDB
compaction, processing, async checkpointing etc. can work in parallel? But for
now, the most important piece of information would be what exactly „stuck“
means in your problem.
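
One way to add the kind of logging suggested above is a small counter that the timer callback increments and that is sampled periodically. This is a minimal sketch with hypothetical names, not part of Flink: a non-zero delta between two samples means timers are still being processed (slow but progressing), while a delta of zero over a long interval suggests the thread really is blocked.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Counts timer callbacks so that progress can be told apart from a genuine hang. */
final class TimerProgress {
    private final AtomicLong fired = new AtomicLong();
    private long lastSeen;

    /** Call once per timer callback, for example at the top of onTimer(). */
    void recordFired() {
        fired.incrementAndGet();
    }

    /** Returns the number of callbacks recorded since the previous call to sample(). */
    synchronized long sample() {
        long now = fired.get();
        long delta = now - lastSeen;
        lastSeen = now;
        return delta;
    }

    public static void main(String[] args) {
        TimerProgress progress = new TimerProgress();
        // Simulate a burst of timer callbacks followed by a quiet period.
        for (int i = 0; i < 1_000; i++) {
            progress.recordFired();
        }
        System.out.println("timers fired since last sample: " + progress.sample());
        System.out.println("timers fired since last sample: " + progress.sample());
    }
}
```

In a real job the sampling would typically run on a separate thread (for example a scheduled executor logging every few seconds), since the task thread is the one suspected of being busy.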

Best,
Stefan


Re: Flink job hangs using rocksDb as backend

2018-07-23 Thread shishal singh
Thanks Stefan,

You are correct. I learned the hard way that when timers fire, the job stops
processing new events until all timer callbacks complete. This is the point
at which I decided to isolate the problem by scheduling only 5-6K timers in
total, so that even if the timers take time, the job should progress after a
reasonable period. But even after I left it running the whole night, the
watermark didn't progress at all and the CPU still showed 100% usage without
any error log (in either the JM or the TM). The stack trace I shared is the
one I took in the morning.

Also, to rule out any problem with the Elastic sink, I removed the sink and
just called stream.print() at the end.

I am using spinning disks and set the following option:

setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); // Also tried SPINNING_DISK_OPTIMIZED_HIGH_MEM

My cluster has 3 nodes (private cloud machines with 4 CPU cores each), with
1 TM with 4 slots running on each node. The JobManager and Hadoop are also
running on the same 3 nodes.

My job graph looks like this:

[image: image.png]
I am using the following config, with a checkpointing interval of 10 min and
Hadoop to store checkpoints.

RocksDBStateBackend backend = new RocksDBStateBackend(baseDir+"/checkpoints", true);
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
env.setStateBackend(backend);
env.enableCheckpointing(intervalMilli);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(intervalMilli);
env.getCheckpointConfig().setCheckpointTimeout(timeoutMilli);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);


The last thing I intend to try is using FsStateBackend to find out whether
it is a RocksDB-related issue, but the problem is that it sometimes takes a
couple of days for the issue to reproduce.

Regards,
Shishal


On Mon, Jul 23, 2018 at 10:08 AM Stefan Richter 
wrote:

> Hi,
>
> let me first clarify what you mean by „stuck“, just because your job stops
> consuming events for some time does not necessarily mean that it is
> „stuck“. That is very hard to evaluate from the information we have so far,
> because from the stack trace you cannot conclude that the thread is
> „stuck“, because it looks like it is just processing firing timers. And
> while timers are firing, the pipeline will stop consuming further events
> until all timers have been processed. Even if your thread dump looks the
> same all the time, it could just be that you observe the same call (the
> most expensive one) across multiple invocations and is not necessarily an
> indicator for the thread being stuck. Attaching a sampler or introducing
> logging to one of the seemingly stuck task JVMs could clarify this a bit
> more. For now I am assuming that it makes progress but spends a lot of work
> on timers. Why you might experience this randomly is, for example, if your
> watermark makes a bigger jump and many (or all) of your timers suddenly
> fire. From the perspective of consuming events, this could look like being
> stuck.
> In case that the job really is stuck in the strict sense, it does not look
> like a Flink problem because your threads are in some call against RocksDB.
> Since we are not aware of any similar problem from the mailing list, a
> setup problem would be the most likely explanation, e.g. what types of disk
> are you using, how many threads are available on the TM machine so that
> also RocksDB compaction, processing, async checkpointing etc. can work in
> parallel. But for now, the most important piece of information would be
> what exactly „stuck“ means in your problem.
>
> Best,
> Stefan
>
> Am 20.07.2018 um 18:58 schrieb shishal singh :
>
> Hi Richer,
>
> Actually, for testing, I have now reduced the number of timers to a few
> thousand (5-6K), but my job still gets stuck randomly, and it is not
> reproducible each time. When I restart the job it works again for a few
> hours/days, then gets stuck again.
> I took a thread dump when my job was hanging with almost 100% CPU. The
> thread using the most CPU has the following stack:
>
> It looks like it is sometimes not able to read data from RocksDB.
>
> *"process (3/6)" #782 prio=5 os_prio=0 tid=0x7f68b81ddcf0 nid=0xee73
> runnable [0x7f688d83a000]*
> *   java.lang.Thread.State: RUNNABLE*
> * at org.rocksdb.RocksDB.get(Native Method)*
> * at org.rocksdb.RocksDB.get(RocksDB.java:810)*
> * at
> org.apache.flink.contrib.streaming.state.RocksDBMapState.contains(RocksDBMapState.java:137)*
> * at
> org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72)*
> * at
> nl.ing.gmtt.observer.analyser.job.rtpe.process.RtpeProcessFunction.isEventExist(RtpeProcessFunction.java:150)*
> * at
> nl.ing.gmtt.observer.analyser.job.rtpe.proc

Re: Flink job hangs using rocksDb as backend

2018-07-23 Thread Stefan Richter
Hi,

yes, timers cannot easily fire in parallel to event processing for correctness 
reasons because they both manipulate the state and there should be a distinct 
order of operations. If it is literally stuck, then it is obviously a problem. 
From the stack trace it looks pretty clear that the culprit would be RocksDB, 
if that is where it blocks. I cannot remember any report of a similar problem 
so far, and we have been running this version of RocksDB for quite some time with 
many users. At the same time I feel like many people are using SSDs for local 
storage these days. You could run the JVM with a tool that allows you to also 
get the native traces and system calls to see where RocksDB is potentially 
stuck. Something we could eventually try is updating the RocksDB version, but 
that is currently still blocked by a performance regression in newer RocksDB 
versions, see https://github.com/facebook/rocksdb/issues/3865.
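
One cheap way to tell "literally stuck" apart from "busy firing timers" is the logging approach mentioned earlier in the thread. The following is only a sketch with a hypothetical ProgressProbe class (not part of Flink): the process function would call recordOne() for every handled event or fired timer, and a separate thread would log delta() periodically.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Counts processed items so a watcher can tell "slow but moving" from "stuck". */
class ProgressProbe {
    private final AtomicLong processed = new AtomicLong();
    private long lastSeen = 0L;

    /** Call once per handled event or fired timer (cheap and thread-safe). */
    void recordOne() {
        processed.incrementAndGet();
    }

    /** Number of items recorded since the previous call to delta(). */
    synchronized long delta() {
        long now = processed.get();
        long d = now - lastSeen;
        lastSeen = now;
        return d;
    }
}
```

A delta that stays at zero over several logging intervals while the CPU is at 100% would suggest a single call that never returns, whereas a small but non-zero delta would suggest slow progress.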

Best,
Stefan 

> Am 23.07.2018 um 12:56 schrieb shishal singh :
> 
> Thanks Stefan,
> 
> You are correct, I learned the hard way that when timers fire, processing 
> of new events stops until all timer callbacks complete. This is the point 
> at which I decided to isolate the problem by scheduling only 5-6K timers in 
> total, so that even if the timers take a while, the job should progress 
> after a reasonable period of time. But even after I left it running the whole 
> night, the watermark didn't progress at all and the CPU still shows 100% usage 
> without any error in the logs (either JM or TM). The stack trace I shared is 
> the one I took in the morning. 
> 
> Also, to rule out any problem with the elastic sink, I removed the sink and 
> just did stream.print() at the end.
> 
> I am using a spinning disk and set the following option:
> 
> setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); // Also 
> tried SPINNING_DISK_OPTIMIZED_HIGH_MEM
> 
> My cluster setup has 3 node (Its a private cloud machine and has 4 cpu core 
> each) and 1 TM with 4 slot each running on each node.  Also Job manager and 
> hadoop is also running on same 3 node. 
> 
> My job graph look like this:
> 
> 
> I am using following config with checkpointing interval of 10min and hadoop 
> to store checkpoint.
> 
>  RocksDBStateBackend backend = new 
> RocksDBStateBackend(baseDir+"/checkpoints", true);
> 
> backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
> env.setStateBackend(backend);
> env.enableCheckpointing(intervalMilli);
> 
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(intervalMilli);
> env.getCheckpointConfig().setCheckpointTimeout(timeoutMilli);
> 
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().enableExternalizedCheckpoints( 
> CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> 
> 
> The last thing I am intended to try is using FSSatebackend to be sure if its 
> rocksDB related issue, but the problem is sometimes issue get reproduced 
> after couple of days. 
> 
> Regards,
> Shishal
> 
> 
> On Mon, Jul 23, 2018 at 10:08 AM Stefan Richter  > wrote:
> Hi,
> 
> let me first clarify what you mean by „stuck“, just because your job stops 
> consuming events for some time does not necessarily mean that it is „stuck“. 
> That is very hard to evaluate from the information we have so far, because 
> from the stack trace you cannot conclude that the thread is „stuck“, because 
> it looks like it is just processing firing timers. And while timers are 
> firing, the pipeline will stop consuming further events until all timers have 
> been processed. Even if your thread dump looks the same all the time, it 
> could just be that you observe the same call (the most expensive one) across 
> multiple invocations and is not necessarily an indicator for the thread being 
> stuck. Attaching a sampler or introducing logging to one of the seemingly 
> stuck task JVMs could clarify this a bit more. For now I am assuming that it 
> makes progress but spends a lot of work on timers. Why you might experience 
> this randomly is, for example, if your watermark makes a bigger jump and many 
> (or all) of your timers suddenly fire. From the perspective of consuming 
> events, this could look like being stuck.
> In case that the job really is stuck in the strict sense, it does not look 
> like a Flink problem because your threads are in some call against RocksDB. 
> Since we are not aware of any similar problem from the mailing list, a setup 
> problem would be the most likely explanation, e.g. what types of disk are you 
> using, how many threads are available on the TM machine so that also RocksDB 
> compaction, processing, async checkpointing etc. can work in parallel. But 
> for now, the most important piece of information 

Flink job hangs/deadlocks (possibly related to out of memory)

2018-06-29 Thread gerardg
Hello,

We have experienced some problems where a task just hangs without
showing any kind of log error while other tasks running in the same task
manager continue without problems. When these tasks are restarted the task
manager gets killed and shows several errors similar to these
ones:

[Canceler/Interrupts for (...)' did not react to cancelling signal for
30 seconds, but is stuck in method:
java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
scala.collection.immutable.List.foreach(List.scala:392)
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
(...)
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
(...)
org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:403)
org.apache.flink.streaming.runtime.io.Stre

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-06-29 Thread gerardg
(fixed formatting)

Hello, 

We have experienced some problems where a task just hangs without showing
any kind of log error while other tasks running in the same task manager
continue without problems. When these tasks are restarted the task manager
gets killed and shows several errors similar to these ones: 

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30
seconds, but is stuck in method:
java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
scala.collection.immutable.List.foreach(List.scala:392)
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
(...)
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
(...)
org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:550)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processEle

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-13 Thread Gerard Garcia
Hi Zhijiang,

The problem is that no other task failed first. We have a task that
sometimes just stops processing data, and when we cancel it, we see
log messages saying:

" Task (...) did not react to cancelling signal for 30 seconds, but is
stuck in method:
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
org.apache.flink.types.StringValue.writeString(StringValue.java:802)
(...)"

That is why we suspect that it hangs forever at that point and that is why
it stops processing data. I don't see any increase in memory use in the
heap (I guess because these buffers are managed by Flink) so I'm not sure
if that is really the problem.
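
To make the log message above easier to interpret: it only says that the thread was still alive some time after being interrupted, and shows where it happened to be at that moment. A generic sketch of that pattern in plain Java follows; CancelWatchdog is a hypothetical illustration and is not Flink's actual canceler code.

```java
/** Interrupts a thread and reports its stack if it is still alive after a timeout. */
class CancelWatchdog {

    /**
     * Returns null if the task exited within timeoutMillis after being
     * interrupted, otherwise a report with the task's current stack trace.
     */
    static String cancelAndReport(Thread task, long timeoutMillis) {
        task.interrupt();
        try {
            task.join(timeoutMillis);
        } catch (InterruptedException e) {
            // Preserve the caller's interrupt status and fall through to the check.
            Thread.currentThread().interrupt();
        }
        if (!task.isAlive()) {
            return null;
        }
        StringBuilder report = new StringBuilder();
        report.append("Task did not react to cancelling signal for ")
              .append(timeoutMillis)
              .append(" ms, but is stuck in method:\n");
        for (StackTraceElement frame : task.getStackTrace()) {
            report.append(frame).append('\n');
        }
        return report.toString();
    }
}
```

The reported frame is just a snapshot taken at the timeout, so a thread that is running (for example in a long loop that ignores interrupts) produces the same kind of message as one that is truly blocked.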

Gerard

On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com> wrote:

> Hi Gerard,
>
> I think you can check the job manager log to find which task failed at
> first, and then trace the task manager log containing the failed task to
> find the initial reason.
> The failed task will trigger canceling all the other tasks, and during
> canceling process, the blocked task that is waiting for output buffer can
> not be interrupted by the
> canceler thread which is shown in your description. So I think the cancel
> process is not the key point and is in expectation. Maybe it did not cause
> OOM at all.
> If the taskduring canceling, the task manager process will be exited
> finally to trigger restarting the job.
>
> Zhijiang
>
> --
> From: Gerard Garcia
> Sent: Monday, July 2, 2018 18:29
> To: wangzhijiang999
> Cc: user
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Thanks Zhijiang,
>
> We haven't found any other relevant log messages anywhere. These traces
> belong to the unresponsive task, that is why we suspect that at some point
> it did not have enough memory to serialize the message and it blocked. I've
> also found that when it hanged several output buffers were full (see
> attached image buffers.outPoolUsage.png) so I guess the traces just reflect
> that.
>
> Probably the task hanged for some other reason and that is what filled the
> output buffers previous to the blocked operator. I'll have to continue
> investigating to find the real cause.
>
> Gerard
>
>
>
>
> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <
> wangzhijiang...@aliyun.com> wrote:
>  Hi Gerard,
>
> From the stack below, we can only tell that the task was canceled, which
> may have been triggered by the job manager because of another task's
> failure. If the task cannot be interrupted within the configured timeout,
> the task manager process will exit. Do you see any OutOfMemory messages in
> the task manager log? Normally the output serialization buffer is managed
> by the task manager framework and will not cause OOM; on the input
> deserialization side, there is a temporary byte array on each channel for
> holding partial records, which is not managed by the framework. I think you
> can confirm whether and where an OOM occurred. Maybe check the task failure logs.
>
> Zhijiang
>
> --
> From: gerardg
> Sent: Saturday, June 30, 2018 00:12
> To: user
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> (fixed formatting)
>
> Hello,
>
> We have experienced some problems where a task just hangs without showing
> any kind of log error while other tasks running in the same task manager
> continue without problems. When these tasks are restarted the task manager
> gets killed and shows several errors similar to these ones:
>
> [Canceler/Interrupts for (...)' did not react to cancelling signal for 30
> seconds, but is stuck in method:
> java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
> java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
> org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.a

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-16 Thread Fabian Hueske
Hi Gerard,

Thanks for reporting this issue. I'm pulling in Nico and Piotr who have
been working on the networking stack lately and might have some ideas
regarding your issue.

Best, Fabian

2018-07-13 13:00 GMT+02:00 Zhijiang(wangzhijiang999) <
wangzhijiang...@aliyun.com>:

> Hi Gerard,
>
> I previously thought a failed task had triggered the cancel process; now I
> understand that you cancel the task when it stops processing data.
> I think you can jstack the process to find where the task thread is blocked
> instead of canceling it, then we may find some hints.
>
> In addition, the following stack "DataOutputSerializer.resize" indicates
> that the task is serializing a record, and the serializer holds overhead
> byte buffers for copying data temporarily. If your record is too
> large, it may cause OOM in this process, as this overhead memory is not
> managed by the Flink framework. You can also monitor the GC status to check
> the full GC delay.
>
> Best,
> Zhijiang
>
> --
> From: Gerard Garcia
> Sent: Friday, July 13, 2018 16:22
> To: wangzhijiang999
> Cc: user
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Hi Zhijiang,
>
> The problem is that no other task failed first. We have a task that
> sometimes just stops processing data, and when we cancel it, we see the
> logs messages  saying:
>
> " Task (...) did not react to cancelling signal for 30 seconds, but is
> stuck in method: 
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
> org.apache.flink.types.StringValue.writeString(StringValue.java:802)
> (...)"
>
> That is why we suspect that it hangs forever at that point and that is why
> it stops processing data. I don;t see any increase in memory use in the
> heap (I guess because these buffers are managed by Flink) so I'm not sure
> if that is really the problem.
>
> Gerard
>
> On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <
> wangzhijiang...@aliyun.com> wrote:
> Hi Gerard,
>
> I think you can check the job manager log to find which task failed at
> first, and then trace the task manager log containing the failed task to
> find the initial reason.
> The failed task will trigger canceling all the other tasks, and during
> canceling process, the blocked task that is waiting for output buffer can
> not be interrupted by the
> canceler thread which is shown in your description. So I think the cancel
> process is not the key point and is in expectation. Maybe it did not cause
> OOM at all.
> If the taskduring canceling, the task manager process will be exited
> finally to trigger restarting the job.
>
> Zhijiang
> --
> From: Gerard Garcia
> Sent: Monday, July 2, 2018 18:29
> To: wangzhijiang999
> Cc: user
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>
> Thanks Zhijiang,
>
> We haven't found any other relevant log messages anywhere. These traces
> belong to the unresponsive task, that is why we suspect that at some point
> it did not have enough memory to serialize the message and it blocked. I've
> also found that when it hanged several output buffers were full (see
> attached image buffers.outPoolUsage.png) so I guess the traces just
> reflect that.
>
> Probably the task hanged for some other reason and that is what filled the
> output buffers previous to the blocked operator. I'll have to continue
> investigating to find the real cause.
>
> Gerard
>
>
>
>
> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <
> wangzhijiang...@aliyun.com> wrote:
>  Hi Gerard,
>
> From the below stack, it can only indicate the task is canceled that
> may be triggered by job manager becuase of other task failure. If the task
> can not be interrupted within timeout config, the task managerprocess will
> be exited. Do you see any OutOfMemory messages from the task manager log?
> Normally the ouput serialization buffer is managed by task manager
> framework and will not cause OOM, and on the input desearialization side,
> there will be a temp bytes array on each channel for holding partial
> records which is not managed by framework. I think you can confirm whether
> and where caused the OOM. Maybe check the task failure logs.
>
> Zhijiang
>
> --
> From: gerardg
> Sent: Saturday, June 30, 2018 00:12
> To: user
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
>

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-16 Thread Piotr Nowojski
Hi Gerard,

I second what Zhijiang wrote. Please check GC pauses, either via GC logging, 
a 3rd-party tool like jconsole (or some memory profiler), or by enabling 
resource logging in Flink. 
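
As an illustration of what such a GC check measures, the standard JMX beans expose cumulative collection counts and times. The sketch below uses only java.lang.management; GcSnapshot is a hypothetical helper, and it would have to run inside the task manager JVM (or be adapted to a remote JMX connection) to be useful. GC logging or jconsole give the same information without code changes.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

/** Cumulative GC counters of the current JVM at one point in time. */
class GcSnapshot {
    final long collections;
    final long millis;

    GcSnapshot(long collections, long millis) {
        this.collections = collections;
        this.millis = millis;
    }

    /** Sums collection count and accumulated collection time over all collectors. */
    static GcSnapshot take() {
        long count = 0L;
        long time = 0L;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            // Both getters return -1 if the collector does not report the value.
            count += Math.max(0L, gc.getCollectionCount());
            time += Math.max(0L, gc.getCollectionTime());
        }
        return new GcSnapshot(count, time);
    }

    /** Fraction of wallMillis spent in GC between `earlier` and this snapshot. */
    double gcFractionSince(GcSnapshot earlier, long wallMillis) {
        return (millis - earlier.millis) / (double) wallMillis;
    }
}
```

Taking a snapshot every minute and logging gcFractionSince(previous, 60000) would show whether the task stops processing at the same time as the JVM starts spending most of its time in garbage collection.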

After confirming that GC is not the issue, the next time this happens, instead 
of cancelling the job, please collect thread dumps from the process that is stuck.
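
Running jstack against the task manager PID is the usual way to collect such dumps. For completeness, a rough in-process equivalent using the standard ThreadMXBean is sketched below; ThreadDumper is a hypothetical helper name, and the output format is only loosely modelled on jstack.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;

/** Produces a jstack-like text dump of all threads in the current JVM. */
class ThreadDumper {

    static String dump() {
        StringBuilder out = new StringBuilder();
        // true, true: also collect locked monitors and ownable synchronizers.
        ThreadInfo[] infos = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
        for (ThreadInfo info : infos) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            // Print every frame; ThreadInfo.toString() would truncate deep stacks.
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }
}
```

Several dumps taken a few seconds apart are more informative than a single one, since they show whether the task thread moves between frames or stays in exactly the same call.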

Piotrek  

> On 16 Jul 2018, at 13:53, Fabian Hueske  wrote:
> 
> Hi Gerard,
> 
> Thanks for reporting this issue. I'm pulling in Nico and Piotr who have been 
> working on the networking stack lately and might have some ideas regarding 
> your issue.
> 
> Best, Fabian
> 
> 2018-07-13 13:00 GMT+02:00 Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com>:
> Hi Gerard,
> 
> I thought the failed task triggers cancel process before, now I am clear that 
> you cancel the task when it stops processing data.
> I think you can jstack the process to find where task thread is blocked 
> instead of canceling it, then we may find some hints.
> 
> In addition, the following stack "DataOutputSerializer.resize" indicates the 
> task is serializing the record and there will be overhead byte buffers in the 
> serializer for copying data temporarily. And if your record is too large, it 
> may cause OOM in this process and this overhead memory is not managed by 
> flink framework. Also you can monitor the gc status to check the full gc 
> delay.
> 
> Best,
> Zhijiang
> --
> From: Gerard Garcia <ger...@talaia.io>
> Sent: Friday, July 13, 2018 16:22
> To: wangzhijiang999 <wangzhijiang...@aliyun.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
> 
> Hi Zhijiang,
> 
> The problem is that no other task failed first. We have a task that sometimes 
> just stops processing data, and when we cancel it, we see the logs messages  
> saying:
> 
> " Task (...) did not react to cancelling signal for 30 seconds, but is stuck 
> in method: 
> org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
>  
> org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
>  org.apache.flink.types.StringValue.writeString(StringValue.java:802)
> (...)"
> 
> That is why we suspect that it hangs forever at that point and that is why it 
> stops processing data. I don;t see any increase in memory use in the heap (I 
> guess because these buffers are managed by Flink) so I'm not sure if that is 
> really the problem.
> 
> Gerard
> 
> On Tue, Jul 3, 2018 at 6:15 AM Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com> wrote:
> Hi Gerard,
> 
> I think you can check the job manager log to find which task failed at first, 
> and then trace the task manager log containing the failed task to find the 
> initial reason.
> The failed task will trigger canceling all the other tasks, and during 
> canceling process, the blocked task that is waiting for output buffer can not 
> be interrupted by the
> canceler thread which is shown in your description. So I think the cancel 
> process is not the key point and is in expectation. Maybe it did not cause 
> OOM at all.
> If the taskduring canceling, the task manager process will be exited finally 
> to trigger restarting the job.
> 
> Zhijiang
> --
> From: Gerard Garcia <ger...@talaia.io>
> Sent: Monday, July 2, 2018 18:29
> To: wangzhijiang999 <wangzhijiang...@aliyun.com>
> Cc: user <user@flink.apache.org>
> Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)
> 
> Thanks Zhijiang,
> 
> We haven't found any other relevant log messages anywhere. These traces 
> belong to the unresponsive task, that is why we suspect that at some point it 
> did not have enough memory to serialize the message and it blocked. I've also 
> found that when it hanged several output buffers were full (see attached 
> image buffers.outPoolUsage.png) so I guess the traces just reflect that.
> 
> Probably the task hanged for some other reason and that is what filled the 
> output buffers previous to the blocked operator. I'll have to continue 
> investigating to find the real cause.
> 
> Gerard
> 
> 
> 
> 
> On Mon, Jul 2, 2018 at 9:50 AM Zhijiang(wangzhijiang999) <wangzhijiang...@aliyun.com> wrote:
>  Hi Gerard,
> 
> From the below stack, it can only indicate the task is canceled that may 
> be triggered by job manager becuase of other task failure. If the task can 
>

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-17 Thread Piotr Nowojski
  - locked <0x7f4b5488f2b8> (a java.lang.Object)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)

> On 16 Jul 2018, at 17:03, Gerard Garcia <ger...@talaia.io> wrote:
> 
> Hi Piotr,
> 
> I attach the GC pauses logged a while back when the task stopped processing 
> during several hours (it stopped at about 20:05) and a jstack dump from the 
> last time the task hanged. 
> 
> Thanks,
> 
> Gerard
> 
> On Mon, Jul 16, 2018 at 4:12 PM Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi Gerard,
> 
> I second to what Zhijiang wrote. Please check GC pauses, either via GC 
> logging, 3rd party tool like jconsole (or some memory profiler) or via 
> enabling resource logging in Flink. 
> 
> After confirming that this is not the issue next time this happens, instead 
> of cancelling the job, please collect thread dumps on a process that is stuck.
> 
> Piotrek  
> 
>> On 16 Jul 2018, at 13:53, Fabian Hueske <fhue...@gmail.com> wrote:
>> 
>> Hi Gerard,
>> 
>> Thanks for reporting this issue. I'm pulling in Nico and Piotr who have been 
>> working on the networking stack lately and might have some ideas regarding 
>> your issue.
>> 
>> Best, Fabian
>> 

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-23 Thread Gerard Garcia
Thanks Zhijiang,

Yes, I guess our best option right now is to simplify the structure of
the output record and see if that solves the problem.

Gerard


Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-02 Thread Zhijiang(wangzhijiang999)
Hi Gerard,

The stack below only indicates that the task was canceled, which may have been 
triggered by the job manager because of another task's failure. If the task cannot 
be interrupted within the configured timeout, the task manager process will exit. 
Do you see any OutOfMemory messages in the task manager log? Normally the output 
serialization buffer is managed by the task manager framework and will not cause 
an OOM. On the input deserialization side, each channel has a temporary byte array 
for holding partial records, which is not managed by the framework. 
I think you should confirm whether an OOM occurred and where. Maybe check the task 
failure logs.
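
The check for OutOfMemory messages suggested above can be sketched as a small scan over a log file. This is only an illustration: the class name OomLogScan and the idea of passing the log path as an argument are assumptions, and the real task manager log location depends on the deployment.

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class OomLogScan {
    /** Returns "lineNumber: text" for every line that mentions OutOfMemory. */
    static List<String> findOutOfMemory(List<String> logLines) {
        List<String> hits = new ArrayList<>();
        for (int i = 0; i < logLines.size(); i++) {
            String line = logLines.get(i);
            // Matches e.g. "java.lang.OutOfMemoryError: Java heap space".
            if (line.contains("OutOfMemory")) {
                hits.add((i + 1) + ": " + line);
            }
        }
        return hits;
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("usage: java OomLogScan <path-to-taskmanager-log>");
            return;
        }
        // The path is supplied by the caller; no default location is assumed.
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        findOutOfMemory(lines).forEach(System.out::println);
    }
}
```

An empty result does not rule out memory pressure; it only shows that no such error was logged in that file.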

Zhijiang


--
From: gerardg
Sent: Saturday, June 30, 2018 00:12
To: user
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

(fixed formatting) 

 Hello, 

 We have experienced some problems where a task just hangs without showing any 
kind of log error while other tasks running in the same task manager continue 
without problems. When these tasks are restarted the task manager gets killed 
and shows several errors similar to these ones: 

[Canceler/Interrupts for (...)' did not react to cancelling signal for 30 
seconds, but is stuck in method:
 java.nio.ByteBuffer.wrap(ByteBuffer.java:373)
 java.nio.ByteBuffer.wrap(ByteBuffer.java:396)
 org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:330)
 org.apache.flink.core.memory.DataOutputSerializer.writeInt(DataOutputSerializer.java:212)
 org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
 org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
 org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
 org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
 scala.collection.immutable.List.foreach(List.scala:392)
 org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
 org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
 org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:177)
 org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:49)
 org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
 org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
 org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
 org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
 org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:81)
 org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
 org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
 org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:667)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:653)
 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
 org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
 org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
 (...)
 org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:469)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:446)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:405)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.java:672)
 org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingBroadcastingOutputCollector.collect(OperatorChain.jav

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-02 Thread Zhijiang(wangzhijiang999)
Hi Gerard,

I think you can check the job manager log to find which task failed first, 
and then trace the task manager log containing the failed task to find the 
initial reason.
The failed task triggers the cancellation of all the other tasks, and during 
the canceling process, a blocked task that is waiting for an output buffer cannot 
be interrupted by the canceler thread, which is what your description shows. So I 
think the cancel process is not the key point and behaves as expected. Maybe it 
did not cause an OOM at all. 
If the task cannot be interrupted during canceling, the task manager process will 
finally exit to trigger a restart of the job.

Zhijiang
--
From: Gerard Garcia
Sent: Monday, July 2, 2018 18:29
To: wangzhijiang999
Cc: user
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Thanks Zhijiang,

We haven't found any other relevant log messages anywhere. These traces belong 
to the unresponsive task, which is why we suspect that at some point it did not 
have enough memory to serialize the message and blocked. I've also found 
that when it hung, several output buffers were full (see attached image 
buffers.outPoolUsage.png), so I guess the traces just reflect that.

Probably the task hung for some other reason, and that is what filled the 
output buffers of the operators before the blocked one. I'll have to continue 
investigating to find the real cause.

Gerard





Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-13 Thread Zhijiang(wangzhijiang999)
Hi Gerard,

Previously I thought the failed task triggered the cancel process; now I understand 
that you cancel the task yourself when it stops processing data.
Instead of canceling it, I think you can run jstack on the process to find where the 
task thread is blocked; then we may find some hints.
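
For cases where running the external jstack tool against the process is not convenient, a similar dump can be produced from inside a JVM with the standard java.lang.management API. The sketch below is only an illustration: the class name InProcessThreadDump is made up, and it reports the threads of the JVM it runs in, so it would have to be executed inside the task manager process to be useful here.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class InProcessThreadDump {
    /** Builds a jstack-like text dump of all live threads in the current JVM. */
    static String dumpAllThreads() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // false, false: do not collect locked monitors or ownable synchronizers.
        for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
            out.append('"').append(info.getThreadName()).append("\" ")
               .append(info.getThreadState()).append('\n');
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("    at ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(dumpAllThreads());
    }
}
```

Taking two or three dumps a few seconds apart helps distinguish a thread that is truly blocked from one that is slowly making progress.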

In addition, the "DataOutputSerializer.resize" frame in the following stack indicates 
that the task is serializing the record, and the serializer uses additional byte 
buffers for copying data temporarily. If your record is too large, it 
may cause an OOM in this process, and this overhead memory is not managed by the Flink 
framework. You can also monitor the GC status to check for full GC pauses.
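
One way to watch the GC status is to sample the JVM's garbage collector MXBeans. The following is a minimal sketch using only the standard java.lang.management API; the class name GcStats and the one-second sampling interval are arbitrary choices for illustration, and like any in-process check it only sees the JVM it runs in.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

public class GcStats {
    /** Sums the accumulated collection time (ms) of all collectors; undefined values (-1) count as 0. */
    static long totalGcTimeMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            total += Math.max(0, gc.getCollectionTime());
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        long before = totalGcTimeMillis();
        Thread.sleep(1000); // sampling interval, chosen arbitrarily
        long after = totalGcTimeMillis();
        System.out.println("GC time during the interval: " + (after - before) + " ms");
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.println(gc.getName() + ": count=" + gc.getCollectionCount()
                    + ", time=" + gc.getCollectionTime() + " ms");
        }
    }
}
```

If the GC time per interval approaches the interval length itself, the process is spending most of its time collecting rather than doing work.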

Best,
Zhijiang
--
From: Gerard Garcia
Sent: Friday, July 13, 2018 16:22
To: wangzhijiang999
Cc: user
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Hi Zhijiang,

The problem is that no other task failed first. We have a task that sometimes 
just stops processing data, and when we cancel it, we see log messages 
saying:

" Task (...) did not react to cancelling signal for 30 seconds, but is stuck in 
method: 
org.apache.flink.core.memory.DataOutputSerializer.resize(DataOutputSerializer.java:305)
 
org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
 org.apache.flink.types.StringValue.writeString(StringValue.java:802)
(...)"

That is why we suspect that it hangs forever at that point and that is why it 
stops processing data. I don't see any increase in heap memory use (I 
guess because these buffers are managed by Flink), so I'm not sure if that is 
really the problem.

Gerard

Re: Flink job hangs/deadlocks (possibly related to out of memory)

2018-07-17 Thread Zhijiang(wangzhijiang999)
Hi Gerard,

From the jstack you provided, the task is serializing the output record, and 
during this process it will not process any more input data. 
This stack does not indicate an out-of-memory issue. If the output 
buffer were exhausted, the task would be blocked in the requestBufferBlocking process.

I think the key point is that your output record is too large and has a complicated 
structure. Every field and collection in this complicated class is 
traversed during serialization, which costs a lot of time and CPU. Furthermore, 
the checkpoint cannot complete because it is waiting for a lock that is also 
held by the task output process.
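
The lock interaction described above can be modeled with two threads. This is a simplified sketch and not Flink's code: the class name CheckpointLockDemo is hypothetical, and a java.util.concurrent ReentrantLock stands in for whatever lock the task and the checkpoint share, chosen only so that the waiting side can time out instead of blocking forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class CheckpointLockDemo {
    /** Returns {acquiredWhileEmitting, acquiredAfterEmitting}. */
    static boolean[] run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch emitting = new CountDownLatch(1);
        CountDownLatch finishEmit = new CountDownLatch(1);

        // "Task" thread: holds the lock for as long as it is emitting a record.
        Thread task = new Thread(() -> {
            lock.lock();
            try {
                emitting.countDown();
                finishEmit.await(); // stands in for a very long serialization
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }, "task-thread");
        task.start();
        emitting.await();

        // "Checkpoint" side: cannot get the lock while the record is being emitted.
        boolean during = lock.tryLock(100, TimeUnit.MILLISECONDS);
        if (during) lock.unlock();

        finishEmit.countDown();
        task.join();

        // Once the task releases the lock, the checkpoint side can proceed.
        boolean after = lock.tryLock(100, TimeUnit.MILLISECONDS);
        if (after) lock.unlock();
        return new boolean[] {during, after};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = run();
        System.out.println("checkpoint while emitting: " + r[0]); // prints false
        System.out.println("checkpoint after emitting: " + r[1]); // prints true
    }
}
```

The longer a single record takes to serialize, the longer the other side waits, which matches the observation that checkpoints keep failing while one thread is busy serializing.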

As you mentioned, it makes sense to check the data structure of the output 
record and reduce its size or make it more lightweight to handle. 

Best,

Zhijiang


--
From: Gerard Garcia
Sent: Tuesday, July 17, 2018 21:53
To: piotr
Cc: fhueske; wangzhijiang999; user; nico
Subject: Re: Flink job hangs/deadlocks (possibly related to out of memory)

Yes, I'm using Flink 1.5.0 and what I'm serializing is a really big record 
(probably too big, we have already started working to reduce its size) which 
consists of several case classes which have (among others) fields of type 
String. 

I attach a CPU profile of the thread stuck serializing. I also attach the 
memory and GC telemetry that the profiler shows (which may be more 
informative than the one recorded from the JVM metrics). Only one node was 
actually "doing something"; all others had CPU usage near zero.

The task is at the same time trying to perform a checkpoint but keeps failing. 
Would it make sense that the problem is that there is not enough memory 
available to perform the checkpoint so all operators are stuck waiting for it 
to finish, and at the same time, the operator stuck serializing is keeping all 
the memory so neither it nor the checkpoint can advance? 

I realized that I don't have a minimum pause between checkpoints so it is 
continuously trying. Maybe I can reduce the checkpoint timeout from the 10m 
default and introduce a minimum pause (e.g. 5m timeout and 5m minimum pause) 
and this way I could break the deadlock.
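
The timeout and minimum pause mentioned above are set on the job's CheckpointConfig. The fragment below is a sketch that assumes the Flink streaming Java API (flink-streaming-java) is on the classpath; the class name CheckpointSettings and the 10-minute checkpoint interval are assumptions for illustration, and only the 5m values come from the message. Whether these settings resolve the hang is a separate question.

```java
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSettings {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint interval in milliseconds: an assumed value, not taken from the thread.
        env.enableCheckpointing(10 * 60 * 1000L);

        CheckpointConfig cfg = env.getCheckpointConfig();
        // Abort a checkpoint that has not completed within 5 minutes.
        cfg.setCheckpointTimeout(5 * 60 * 1000L);
        // Wait at least 5 minutes after one checkpoint ends before starting the next.
        cfg.setMinPauseBetweenCheckpoints(5 * 60 * 1000L);
    }
}
```

With a minimum pause configured, a new checkpoint is not triggered immediately after the previous one ends or times out, so the job gets some time to make progress between attempts.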

Gerard


On Tue, Jul 17, 2018 at 9:00 AM Piotr Nowojski  wrote:
Hi,

Thanks for the additional data. Just to make sure, are you using Flink 1.5.0?

There are a couple of threads that seem to be looping in serialisation, while 
others are blocked and either waiting for new data or waiting for someone to 
consume some data. Could you debug or CPU profile the code, focusing in particular 
on threads with a stack trace like the one below [1]? Aren't you trying to 
serialise some gigantic String?

Piotrek

[1]:

"(...) (19/26)" #2737 prio=5 os_prio=0 tid=0x7f52584d2800 nid=0x6819 
runnable [0x7f451a843000]
   java.lang.Thread.State: RUNNABLE
 at org.apache.flink.core.memory.DataOutputSerializer.write(DataOutputSerializer.java:133)
 at org.apache.flink.types.StringValue.writeString(StringValue.java:812)
 at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:64)
 at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
 at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:93)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:33)
 at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:113)
 at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:32)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:98)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$serialize$1.apply(TraversableSerializer.scala:93)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(