Re: [External Sender] Debugging "Container is running beyond physical memory limits" on YARN for a long running streaming job

2020-10-04 Thread Shubham Kumar
@Kye, thanks for your suggestions. We are using one YARN application per
job, and your point is still valid in Flink 1.10 as per the docs; it does
make sense to avoid dynamic classloading for such jobs. We also seemed to
have enough off-heap memory for the resources you mentioned; the issue
turned out to be RocksDB memory usage (see below).

@Xintong, yes, I did try out the solution. The problem is definitely due
to RocksDB, but it was solved by something else:

Short answer:
Setting this property in flink-conf.yaml solves the issue:

> state.backend.rocksdb.memory.managed: false


Long answer:

I observed that the OOM kills are a function of the number of restarts
rather than of how long the application has been running. On every
restart, the TaskManager's RES memory rises by 3.5 GB (which is the Flink
managed memory allotted to the TM). So it can only withstand 2-3 restarts,
after which OOM kills become frequent because the other TMs start getting
killed as well. I enabled the RocksDB block-cache usage metric, and it
rises until it reaches ~3.5 GB.

At this point I tried setting

> containerized.taskmanager.env.MALLOC_ARENA_MAX : 2


This did seem to reduce the memory increase for a few of the task managers
(e.g. with 12 task managers, after a restart the RES memory increases by
3.5 GB for only a few of them and not for the others), but it didn't solve
the issue for me: OOM kills began to occur after 4-5 restarts. I also tried
setting it to 1, with similar results. I didn't try jemalloc because, as
per the JIRA issue [1], the MALLOC_ARENA_MAX setting is intended to produce
similar results.

After setting state.backend.rocksdb.memory.managed: false, the TM RES
memory doesn't increase after any number of restarts. In fact, with the
RocksDB cache usage metrics enabled, it shows only ~100 MB of usage (of
course, this depends on the operators and state involved in the job). This
might indicate that Flink is trying to allot more memory than RocksDB
requires; also, with managed memory enabled, the RES memory rises again on
every restart, which is definitely not the intended behavior.
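
As a sanity check on the numbers, here is a minimal Java sketch (purely
illustrative, not Flink code) that adds up the byte values from the
TaskExecutorProcessSpec log line in the original message quoted below. The
class and method names are hypothetical. It shows that the components add
up to exactly the configured 10 GB process size, so if a restart leaves one
3.5 GB managed-memory allocation unreleased while a new one is made, the
worst-case footprint no longer fits in the container. That "one leaked
allocation per restart" model is an assumption based on the RES growth
described above. Actual RES is usually below this worst case (for example,
the heap is not necessarily fully resident), which would be consistent with
a TM surviving a couple of restarts before YARN kills it.

```java
public class TmMemoryBudget {
    // Byte values copied from the TaskExecutorProcessSpec log line in this thread.
    static final long FRAMEWORK_HEAP = 134217728L;
    static final long FRAMEWORK_OFF_HEAP = 134217728L;
    static final long TASK_HEAP = 4429184954L;
    static final long TASK_OFF_HEAP = 0L;
    static final long NETWORK = 939524110L;
    static final long MANAGED = 3758096440L;
    static final long JVM_METASPACE = 268435456L;
    static final long JVM_OVERHEAD = 1073741824L;

    // Sum of all components; expected to match taskmanager.memory.process.size.
    static long totalBytes() {
        return FRAMEWORK_HEAP + FRAMEWORK_OFF_HEAP + TASK_HEAP + TASK_OFF_HEAP
                + NETWORK + MANAGED + JVM_METASPACE + JVM_OVERHEAD;
    }

    // Assumption (not confirmed Flink behavior): each restart leaves one
    // managed-memory-sized allocation unreleased on top of the normal budget.
    static long worstCaseBytesAfterRestarts(int restarts) {
        return totalBytes() + (long) restarts * MANAGED;
    }

    static double toGiB(long bytes) {
        return bytes / (1024.0 * 1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        System.out.printf("configured budget: %.2f GiB%n", toGiB(totalBytes()));
        for (int restarts = 1; restarts <= 3; restarts++) {
            System.out.printf("worst case after %d restart(s): %.2f GiB%n",
                    restarts, toGiB(worstCaseBytesAfterRestarts(restarts)));
        }
    }
}
```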

[1]: https://issues.apache.org/jira/browse/FLINK-18712

Thanks
Shubham


On Fri, Sep 25, 2020 at 8:46 PM Kye Bae wrote:

> Not sure about Flink 1.10.x. Can share a few things up to Flink 1.9.x:
>
> 1. If your Flink cluster runs only one job, avoid using dynamic
> classloader for your job: start it from one of the Flink class paths. As of
> Flink 1.9.x, using the dynamic classloader results in the same classes
> getting loaded every time the job restarts (self-recovery or otherwise),
> and it could eat up all the JVM "off-heap" memory. Yarn seems to
> immediately kill the container when that happens.
>
> 2. Be sure to leave enough for the JVM "off-heap" area: GC + code cache +
> thread stacks + other Java internal resources end up there.
>
> -K
>
> On Sat, Sep 19, 2020 at 12:09 PM Shubham Kumar wrote:
>
>> Hey everyone,
>>
>> We deployed a streaming job using Flink 1.10.1 a month ago, and now YARN
>> is killing its containers due to memory issues very frequently. I am
>> trying to figure out the root cause of this issue in order to fix it.
>>
>> We have a streaming job whose basic structure looks like this:
>> - Read 6 kafka streams and combine stats from them (union) to form a
>> single stream
>> - stream.keyBy(MyKey)
>>  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>>  .reduce(MyReduceFunction)
>>  .addSink(new FlinkKafkaProducer011<>...);
>>
>> We are using RocksDB as state backend. In flink-conf.yaml, we used
>> taskmanager.memory.process.size = 10GB with a parallelism of 12 and only
>> one slot per task manager.
>>
>> So, a taskmanager process gets started with the following memory
>> components as indicated in logs:
>>
>>> TaskExecutor container... will be started on ... with
>>> TaskExecutorProcessSpec {cpuCores=1.0, frameworkHeapSize=128.000mb
>>> (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes),
>>> taskHeapSize=4.125gb (4429184954 bytes), taskOffHeapSize=0 bytes,
>>> networkMemSize=896.000mb (939524110 bytes), managedMemorySize=3.500gb
>>> (3758096440 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes),
>>> jvmOverheadSize=1024.000mb (1073741824 bytes)}.
>>
>> which are as per the defaults.
>>
>> Now, after 25 days we started encountering the following yarn container
>> kill error:
>>
>>> Association with remote system [akka.tcp://flink@...] has failed,
>>> address is now gated for [50] ms. Reason: [Association failed with
>>> [akka.tcp://flink@...]] Caused by: [java.net.ConnectException:
>>> Connection refused: .../...:37679]
>>> 2020-09-09 00:53:24 INFO Closing TaskExecutor connection
>>> container_e193_1592804717489_149347_01_11 because:
>>> [2020-09-09 00:53:21.417]Container

Flink??table-api??????.

2020-10-04 Thread ????????
Hi all,

A SQL join run through the Table API fails to parse at a "." with this error:
Exception in thread "main" org.apache.flink.table.api.SqlParserException:
SQL parse failed. Encountered "." at line 1, column 36.
Was expecting one of:
  

Re: Unsubscribe

2020-10-04 Thread 黄潇
Hi,

To unsubscribe, send an email to user-zh-unsubscr...@flink.apache.org

For reference, see https://flink.apache.org/zh/community.html#section-1


On Sun, Oct 4, 2020 at 11:19 AM 天分 <775794...@qq.com> wrote:

> Unsubscribe