Re: Debugging "Container is running beyond physical memory limits" on YARN for a long running streaming job

2020-10-04 Thread Shubham Kumar
@Kye, thanks for your suggestions. We are using one YARN app per job mode,
and your point is still valid in Flink 1.10 as per the docs, so it does make
sense to avoid dynamic classloading for such jobs. Also, we seemed to have
enough off-heap memory for the resources mentioned; what turned out to be the
issue was RocksDB memory usage (see below).

@Xintong, yes, I did try out the solution. The problem is definitely due to
RocksDB; however, it got solved by something else:

Short answer:
Setting this property in flink-conf.yaml solves the issue:

> state.backend.rocksdb.memory.managed: false


Long answer:

I observed that the OOM kills are a function of the number of restarts
rather than of the time for which the application has been running. For every
restart, the TaskManager's RES memory rises by 3.5 GB (which is the Flink
managed memory allotted to the TM). So it could only withstand 2-3 restarts,
after which OOM kills become frequent, as the other TMs then start getting
killed as well. I enabled the RocksDB block cache usage metric, and it rises
until it reaches ~3.5 GB.
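
For reference, I enabled that metric through the RocksDB native metrics
options in flink-conf.yaml. I believe the relevant keys in 1.10 are the ones
below, but please double-check the docs for your version, since the native
metrics also add some overhead:

> state.backend.rocksdb.metrics.block-cache-usage: true
> state.backend.rocksdb.metrics.block-cache-capacity: true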

At this point I tried setting

> containerized.taskmanager.env.MALLOC_ARENA_MAX: 2


This did seem to reduce the memory increase for a few of the task managers
(e.g. with 12 task managers, after a restart the RES memory increases by
3.5 GB for only a few of them but not for the others), but it didn't solve
the issue for me, and OOM kills begin to occur after 4-5 restarts. I also
tried setting it to 1, but got similar results. I didn't try jemalloc
because, as per the JIRA issue [1], the MALLOC_ARENA_MAX solution is intended
to produce similar results.

After setting state.backend.rocksdb.memory.managed: false, the TM RES
memory doesn't increase after any number of restarts; in fact, with the
RocksDB cache usage metrics enabled, it shows only around ~100 MB of usage
(of course this depends on the operators and state involved in the job).
This might indicate that Flink is trying to allocate more memory than
required for RocksDB, and also that upon restart the RES memory rises again,
which is definitely not the intended behavior.
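
For anyone hitting the same problem, the relevant parts of our
flink-conf.yaml now look roughly like this (values taken from this thread;
the MALLOC_ARENA_MAX line is optional and didn't fully help on its own, and
your memory sizes will differ):

> taskmanager.memory.process.size: 10g
> state.backend: rocksdb
> state.backend.incremental: true
> state.backend.rocksdb.memory.managed: false
> containerized.taskmanager.env.MALLOC_ARENA_MAX: 2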

[1]: https://issues.apache.org/jira/browse/FLINK-18712

Thanks
Shubham


On Fri, Sep 25, 2020 at 8:46 PM Kye Bae  wrote:

> Not sure about Flink 1.10.x. Can share a few things up to Flink 1.9.x:
>
> 1. If your Flink cluster runs only one job, avoid using dynamic
> classloader for your job: start it from one of the Flink class paths. As of
> Flink 1.9.x, using the dynamic classloader results in the same classes
> getting loaded every time the job restarts (self-recovery or otherwise),
> and it could eat up all the JVM "off-heap" memory. Yarn seems to
> immediately kill the container when that happens.
>
> 2. Be sure to leave enough for the JVM "off-heap" area: GC + code cache +
> thread stacks + other Java internal resources end up there.
>
> -K
>
> On Sat, Sep 19, 2020 at 12:09 PM Shubham Kumar 
> wrote:
>
>> Hey everyone,
>>
>> We had deployed a streaming job using Flink 1.10.1 one month back and now
>> we are encountering a Yarn container killed due to memory issues very
>> frequently. I am trying to figure out the root cause of this issue in order
>> to fix it.
>>
>> We have a streaming job whose basic structure looks like this:
>> - Read 6 kafka streams and combine stats from them (union) to form a
>> single stream
>> - stream.keyBy(MyKey)
>>  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>>  .reduce(MyReduceFunction)
>>  .addSink(new FlinkKafkaProducer011<>...);
>>
>> We are using RocksDB as state backend. In flink-conf.yaml, we used
>> taskmanager.memory.process.size = 10GB with a parallelism of 12 and only
>> one slot per task manager.
>>
>> So, a taskmanager process gets started with the following memory
>> components as indicated in logs:
>>
>> TaskExecutor container... will be started on ... with
>>> TaskExecutorProcessSpec {cpuCores=1.0, frameworkHeapSize=128.000mb (
>>> 134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes),
>>> taskHeapSize=4.125gb (4429184954 bytes), taskOffHeapSize=0 bytes,
>>> networkMemSize=896.000mb (939524110 bytes), managedMemorySize=3.500gb (
>>> 3758096440 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes),
>>> jvmOverheadSize=1024.000mb (1073741824 bytes)}.
>>>
>>
>>>
>>
>>  which are as per defaults.
>>
>> Now, after 25 days we started encountering the following yarn container
>> kill error:
>>
>>> Association with remote system [akka.tcp://flink@...] has failed,
>>> address is now gated for [50] ms. Reason: [Association failed with
>>> [akka.tcp://flink@...]] Caused by: [java.net

Re: Debugging "Container is running beyond physical memory limits" on YARN for a long running streaming job

2020-09-22 Thread Shubham Kumar
Hi Xintong,

Thanks for your insights, they are really helpful.

I understand now that it is most certainly a native memory issue rather than
a heap memory issue, and I take your point about not trusting Flink's
Non-Heap metrics.

Our job structure is simple enough that I couldn't find any use of mmap
memory or any other straightforward native memory leak. That leads me to
believe it could be a RocksDB issue, although you do make a valid point that
there is an extra 2 GB in the YARN container which should account for
RocksDB's extra usage. I also saw a JIRA ticket about a RocksDB memory leak
issue on Kubernetes and was wondering whether the same could happen in YARN
containers and be related to my issue [1]. Let me know what you think about
this.

Also, I tried running the same job using the FileSystem state backend (as a
separate job) and it went fine, with no container kills and native memory not
rising over time, which hints further towards RocksDB being the culprit. My
state size in the checkpoint is around 1 GB (I can probably even consider
switching to the FileSystem backend for this job, but I still want to figure
out the RocksDB case). I am using incremental checkpoints in my main job,
which has the RocksDB state backend, if that's relevant.

I read about native memory tracking and will probably go ahead and use
Native Memory Tracking (NMT) or jemalloc to confirm the RocksDB issue, and
will update here.
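
In case it's useful to others, a minimal sketch of how I plan to enable NMT
for the task managers (the JVM flag and the jcmd invocation are standard JDK
tooling; env.java.opts.taskmanager should be the Flink key for passing TM JVM
options, but please double-check it for your version):

> env.java.opts.taskmanager: -XX:NativeMemoryTracking=summary

and then, on the task manager host, against the TM JVM pid:

> jcmd <taskmanager-pid> VM.native_memory summary

One caveat I'm aware of: NMT only covers memory that the JVM itself
allocates, so RocksDB's own native allocations may not show up there; a
jemalloc profile should catch those.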

[1]: https://issues.apache.org/jira/browse/FLINK-18712

Thanks
Shubham

On Mon, Sep 21, 2020 at 8:23 AM Xintong Song  wrote:

> Hi Shubham,
>
> Java heap memory cannot cause a container memory exceeding. Heap memory is
> strictly limited by the JVM `-Xmx` parameter. If the program does need more
> memory than the limit, it will run into a heap space OOM, rather than
> implicitly using more memory than the limit.
>
> Several reasons that might lead to container memory exceeding.
> - RocksDB, whose memory controlling is based on estimation rather than
> hard limit. This is one of the most common reasons for such memory
> exceedings. However, usually the extra memory usage introduced by RocksDB,
> if there's any, should not be too large. Given that your container size is
> 12GB and Flink only plans to use 10GB, I'm not sure whether RocksDB is the
> cause in your case. I've CC'ed @Yun Tang, who is the expert of Flink's
> RocksDB state backend.
> - Does your job use mmap memory? MMap memory, if used, is controlled by
> the operating system, not Flink. Depending on your Yarn cgroup
> configurations, some clusters would also count that as part of the
> container memory consumption.
> - Native memory leaks in user code dependencies and libraries could also
> lead to container memory exceeding.
>
> Another suggestion is, do not trust Flink's "Non-Heap" metrics. It is
> practically helpless and misleading. The "Non-Heap" accounts for SOME of
> the non-heap memory usage, but NOT ALL of them. The community is working on
> a new set of metrics and Web UI for the task manager memory tuning.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Sun, Sep 20, 2020 at 12:10 AM Shubham Kumar 
> wrote:
>
>> Hey everyone,
>>
>> We had deployed a streaming job using Flink 1.10.1 one month back and now
>> we are encountering a Yarn container killed due to memory issues very
>> frequently. I am trying to figure out the root cause of this issue in order
>> to fix it.
>>
>> We have a streaming job whose basic structure looks like this:
>> - Read 6 kafka streams and combine stats from them (union) to form a
>> single stream
>> - stream.keyBy(MyKey)
>>  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>>  .reduce(MyReduceFunction)
>>  .addSink(new FlinkKafkaProducer011<>...);
>>
>> We are using RocksDB as state backend. In flink-conf.yaml, we used
>> taskmanager.memory.process.size = 10GB with a parallelism of 12 and only
>> one slot per task manager.
>>
>> So, a taskmanager process gets started with the following memory
>> components as indicated in logs:
>>
>> TaskExecutor container... will be started on ... with
>>> TaskExecutorProcessSpec {cpuCores=1.0, frameworkHeapSize=128.000mb (
>>> 134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes),
>>> taskHeapSize=4.125gb (4429184954 bytes), taskOffHeapSize=0 bytes,
>>> networkMemSize=896.000mb (939524110 bytes), managedMemorySize=3.500gb (
>>> 3758096440 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes),
>>> jvmOverheadSize=1024.000mb (1073741824 bytes)}.
>>>
>>
>>>
>>
>>  which are as per defaults.
>>
>> Now, after 25 days we started encountering the following yarn container

Re: Publishing Sink Task watermarks outside flink

2020-05-03 Thread Shubham Kumar
Following up on this,

I tried tweaking the JDBC sink as Timo suggested and it was successful.
Basically, I added a member *long maxWatermarkSeen* in JDBCOutputFormat,
so whenever a new record is added to the batch it updates
*maxWatermarkSeen* for this subtask with
*org.apache.flink.streaming.api.functions.sink.SinkFunction.Context.watermark*
(if it's greater).
So whenever *JDBCOutputFormat.flush()* is called, I can be sure that after
executing the batch, all records having a timestamp below *maxWatermarkSeen*
have been pushed to JDBC.

Now, the actual answer I am looking for is the minimum of *maxWatermarkSeen*
across all subtasks. I can keep updating this in the DB as <*Subtask Index,
Watermark*> pairs and take the minimum in the DB.
I guess the aggregation can't be done inside Flink across subtasks?
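
To make the idea concrete, here is a minimal sketch of the tracking part.
This is not the actual JDBCOutputFormat change; the class name is made up and
the JDBC parts are left as comments, but the watermark handling follows the
description above:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class WatermarkTrackingJdbcSink<T> extends RichSinkFunction<T> {

    private transient int subtaskIndex;
    private transient long maxWatermarkSeen;

    @Override
    public void open(Configuration parameters) {
        subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        maxWatermarkSeen = Long.MIN_VALUE;
        // open the JDBC connection and prepare the insert statement here
    }

    @Override
    public void invoke(T value, Context context) {
        // add the record to the current JDBC batch here ...
        // remember the highest watermark this subtask has seen so far
        maxWatermarkSeen = Math.max(maxWatermarkSeen, context.currentWatermark());
    }

    private void flush() {
        // executeBatch() here; once it succeeds, every record with a timestamp
        // below maxWatermarkSeen has been pushed to JDBC, so publish
        // <subtaskIndex, maxWatermarkSeen> to a small side table.
        // An external query then takes MIN(watermark) across all subtasks.
    }
}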

Now, I have two questions:

1) Should I update this in the DB using Flink's async I/O feature, or just
perform a blocking query in the *JDBCOutputFormat.flush()* function after
executing the batch?
2) If I use a Kafka sink (or any other sink for that matter), do I have to
tweak its SinkFunction again for this functionality?
   The general idea being that knowing up to what timestamp the sink is
complete is common functionality for users, and there may be simpler
solutions.

Thanks
Shubham

On Wed, Apr 29, 2020 at 3:27 AM Shubham Kumar 
wrote:

> Hi Timo,
>
> Yeah, I got the idea of getting access to timers through process function
> and had the same result which you explained
> that is a side output doesn't guarantee that the data is written out to
> sink. (so maybe Fabian in that post pointed out something
> else which I am missing). If I am correct then, data is written to side
> output as soon as it is processed in the Process function (maybe in
> process function itself on Ontimer call if a timer has been set, right?
>
> I am doing all computation in Datastream and then adding a mapper
> to convert to DataStream to sink through JdbcAppendTableSink
> which is part of Table API I think. I will definitely try exploring the
> Jdbc Sink function and context to get the watermark.
>
> Thinking out of the box, is it possible to add some extra operator after
> sink which will always have watermark which is greater than sink function
> watermarks,
> as its a downstream operator.
> Also, does the problem simplify if we have Kafka sink?
>
> On Tue, Apr 28, 2020 at 10:35 PM Timo Walther  wrote:
>
>> Hi Shubham,
>>
>> you can call stream.process(...). The context of ProcessFunction gives
>> you access to TimerService which let's you access the current watermark.
>>
>> I'm assuming your are using the Table API? As far as I remember,
>> watermark are travelling through the stream even if there is no
>> time-based operation happening. But you should double check that.
>>
>> However, a side output does not guarantee that the data has already been
>> written out to the sink. So I would recommend to customize the JDBC sink
>> instead and look into the row column for getting the current timestamp.
>>
>> Or even better, there should also be
>> org.apache.flink.streaming.api.functions.sink.SinkFunction.Context with
>> access to watermark.
>>
>> I hope this helps.
>>
>> Regards,
>> Timo
>>
>> On 28.04.20 13:07, Shubham Kumar wrote:
>> > Hi everyone,
>> >
>> > I have a flink application having kafka sources which calculates some
>> > stats based on it and pushes it to JDBC. Now, I want to know till what
>> > timestamp is the data completely pushed in JDBC (i.e. no more data will
>> > be pushed to timestamp smaller or equal than this). There doesn't seem
>> > to be any direct programmatic way to do so.
>> >
>> > I came across the following thread which seemed most relevant to my
>> > problem:
>> >
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/End-of-Window-Marker-td29345.html#a29461
>> >
>> > However, I can't seem to understand how to chain a process function
>> > before the sink task so as to put watermarks to a side output. (I
>> > suspect it might have something to do with datastream.addSink in
>> regular
>> > datastream sinks vs sink.consumeDataStream(stream) in
>> JDBCAppendTableSink).
>> >
>> > Further what happens if there are no windows, how to approach the
>> > problem then?
>> >
>> > Please share any pointers or relevant solution to tackle this.
>> >
>> > --
>> > Thanks & Regards
>> >
>> > Shubham Kumar
>> >
>>
>>
>
> --
> Thanks & Regards
>
> Shubham Kumar
>
>

-- 
Thanks & Regards

Shubham Kumar


Re: Publishing Sink Task watermarks outside flink

2020-04-28 Thread Shubham Kumar
Hi Timo,

Yeah, I got the idea of getting access to timers through a process function
and reached the same conclusion you explained, namely that a side output
doesn't guarantee that the data has been written out to the sink (so maybe
Fabian in that post pointed out something else which I am missing). If I am
correct, then data is written to the side output as soon as it is processed
in the ProcessFunction (possibly in the onTimer call itself if a timer has
been set), right?

I am doing all the computation in the DataStream API and then adding a mapper
to convert the stream so it can be written through JDBCAppendTableSink,
which is part of the Table API, I think. I will definitely try exploring the
JDBC sink function and its context to get the watermark.
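
For context, the sink wiring currently looks roughly like this (written from
memory, so treat the builder method names, query, and placeholders such as
resultStream and ToRowMapper as approximate):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://...")
    .setQuery("INSERT INTO stats (window_ts, key, value) VALUES (?, ?, ?)")
    .setParameterTypes(Types.SQL_TIMESTAMP, Types.STRING, Types.LONG)
    .build();

// resultStream is the computed DataStream of POJOs; ToRowMapper converts them to Row
DataStream<Row> rows = resultStream.map(new ToRowMapper());
sink.consumeDataStream(rows);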

Thinking out of the box, is it possible to add some extra operator after the
sink which would always have a watermark greater than the sink function's
watermark, since it is a downstream operator?
Also, does the problem simplify if we have a Kafka sink?

On Tue, Apr 28, 2020 at 10:35 PM Timo Walther  wrote:

> Hi Shubham,
>
> you can call stream.process(...). The context of ProcessFunction gives
> you access to TimerService which let's you access the current watermark.
>
> I'm assuming your are using the Table API? As far as I remember,
> watermark are travelling through the stream even if there is no
> time-based operation happening. But you should double check that.
>
> However, a side output does not guarantee that the data has already been
> written out to the sink. So I would recommend to customize the JDBC sink
> instead and look into the row column for getting the current timestamp.
>
> Or even better, there should also be
> org.apache.flink.streaming.api.functions.sink.SinkFunction.Context with
> access to watermark.
>
> I hope this helps.
>
> Regards,
> Timo
>
> On 28.04.20 13:07, Shubham Kumar wrote:
> > Hi everyone,
> >
> > I have a flink application having kafka sources which calculates some
> > stats based on it and pushes it to JDBC. Now, I want to know till what
> > timestamp is the data completely pushed in JDBC (i.e. no more data will
> > be pushed to timestamp smaller or equal than this). There doesn't seem
> > to be any direct programmatic way to do so.
> >
> > I came across the following thread which seemed most relevant to my
> > problem:
> >
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/End-of-Window-Marker-td29345.html#a29461
> >
> > However, I can't seem to understand how to chain a process function
> > before the sink task so as to put watermarks to a side output. (I
> > suspect it might have something to do with datastream.addSink in regular
> > datastream sinks vs sink.consumeDataStream(stream) in
> JDBCAppendTableSink).
> >
> > Further what happens if there are no windows, how to approach the
> > problem then?
> >
> > Please share any pointers or relevant solution to tackle this.
> >
> > --
> > Thanks & Regards
> >
> > Shubham Kumar
> >
>
>

-- 
Thanks & Regards

Shubham Kumar


Publishing Sink Task watermarks outside flink

2020-04-28 Thread Shubham Kumar
Hi everyone,

I have a Flink application with Kafka sources which calculates some stats
based on them and pushes the results to JDBC. Now, I want to know up to what
timestamp the data has been completely pushed to JDBC (i.e. no more data will
be pushed with a timestamp smaller than or equal to it). There doesn't seem
to be any direct programmatic way to do so.

I came across the following thread which seemed most relevant to my
problem:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/End-of-Window-Marker-td29345.html#a29461

However, I can't seem to understand how to chain a process function before
the sink task so as to emit watermarks to a side output (I suspect it might
have something to do with datastream.addSink in regular datastream sinks vs
sink.consumeDataStream(stream) in JDBCAppendTableSink).
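
To make the question concrete, what I had in mind is roughly the following
fragment inside the pipeline-building code (MyRecord and statsStream are
placeholders for the actual type and stream), i.e. forwarding records
unchanged while emitting the current watermark to a side output:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

final OutputTag<Long> watermarkTag = new OutputTag<Long>("watermarks") {};

SingleOutputStreamOperator<MyRecord> forwarded = statsStream.process(
    new ProcessFunction<MyRecord, MyRecord>() {
        @Override
        public void processElement(MyRecord value, Context ctx, Collector<MyRecord> out) {
            // pass the record through unchanged
            out.collect(value);
            // emit the watermark as seen at this point in the pipeline
            ctx.output(watermarkTag, ctx.timerService().currentWatermark());
        }
    });

DataStream<Long> watermarks = forwarded.getSideOutput(watermarkTag);
// forwarded would then go to the JDBC sink as before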

Further, what happens if there are no windows? How should I approach the
problem then?

Please share any pointers or relevant solution to tackle this.

-- 
Thanks & Regards

Shubham Kumar