[ https://issues.apache.org/jira/browse/FLINK-16267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17109425#comment-17109425 ]

Andrey Zagrebin edited comment on FLINK-16267 at 5/17/20, 10:47 AM:
--------------------------------------------------------------------

[~trystan]
I think what [~xintongsong] meant is that Flink cannot fully control what user 
code does. This is not necessarily about the RocksDB state backend. User code is 
regular Java code that can allocate any kind of memory from the OS. Java code 
usually relies on the JVM heap, as in any Java application, and that kind of 
memory usage is of course strictly limited by the JVM. On the other hand, nothing 
prevents user code from allocating memory outside of the JVM heap, e.g. native 
memory. This may not even be immediately obvious if the user code uses external 
dependencies which do the allocation under the hood. This is mentioned in the 
docs 
[here|https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/memory/mem_tuning.html#configure-memory-for-containers].
The docs state that Flink only uses `taskmanager.memory.process.size` to size 
the container, not to strictly limit the overall system memory usage.
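To make this concrete, here is a minimal sketch (the class name is made up, it 
is not from the reporter's job) of user code allocating native memory that the 
JVM heap limit, and Flink's accounting, never see:

{code:java}
import sun.misc.Unsafe;
import java.lang.reflect.Field;

public class NativeAllocationSketch {
    public static void main(String[] args) throws Exception {
        // obtain the Unsafe singleton reflectively (JDK 8 style, for illustration only)
        Field f = Unsafe.class.getDeclaredField("theUnsafe");
        f.setAccessible(true);
        Unsafe unsafe = (Unsafe) f.get(null);

        // 512 MiB of native memory: the container's RSS grows,
        // but -Xmx never sees it
        long address = unsafe.allocateMemory(512L * 1024 * 1024);

        // without a matching freeMemory call this leaks until the
        // kernel OOM killer (or Kubernetes) kills the process
        unsafe.freeMemory(address);
    }
}
{code}

In 1.10, such usage can only be budgeted for, e.g. via 
`taskmanager.memory.task.off-heap.size` or the JVM overhead options; Flink 
cannot enforce the limit.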

RocksDB is exactly such a dependency which uses native off-heap memory, but the 
1.10 release introduced native memory control for RocksDB using internal 
features of RocksDB. The first thing we are trying to understand here is whether 
this control works correctly. The filesystem backend uses only JVM heap memory, 
which is limited by the JVM, and therefore it cannot cause this problem.
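For reference, the 1.10 RocksDB memory control discussed above is driven by the 
following options (a sketch with the documented 1.10 defaults, not taken from 
the attached configs):

{code}
# flink-conf.yaml (Flink 1.10; values shown are the documented defaults)
# cap RocksDB's write buffers and block cache at the managed memory budget
state.backend.rocksdb.memory.managed: true
# managed (off-heap) memory is sized as a fraction of total Flink memory
taskmanager.memory.managed.fraction: 0.4
{code}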


> Flink uses more memory than taskmanager.memory.process.size in Kubernetes
> -------------------------------------------------------------------------
>
>                 Key: FLINK-16267
>                 URL: https://issues.apache.org/jira/browse/FLINK-16267
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.10.0
>            Reporter: ChangZhuo Chen (陳昌倬)
>            Priority: Major
>         Attachments: flink-conf_1.10.0.yaml, flink-conf_1.9.1.yaml, 
> oomkilled_taskmanager.log
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> This issue is from 
> [https://stackoverflow.com/questions/60336764/flink-uses-more-memory-than-taskmanager-memory-process-size-in-kubernetes]
> h1. Description
>  * In Flink 1.10.0, we try to use `taskmanager.memory.process.size` to limit 
> the resources used by taskmanagers to ensure they are not killed by 
> Kubernetes. However, we still get lots of taskmanager `OOMKilled` events. The 
> setup is described in the following sections.
>  * The taskmanager log is in attachment [^oomkilled_taskmanager.log].
> h2. Kubernetes
>  * The Kubernetes setup is the same as described in 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html].
>  * The following is resource configuration for taskmanager deployment in 
> Kubernetes:
> {{resources:}}
>  {{  requests:}}
>  {{    cpu: 1000m}}
>  {{    memory: 4096Mi}}
>  {{  limits:}}
>  {{    cpu: 1000m}}
>  {{    memory: 4096Mi}}
> h2. Flink Docker
>  * The Flink docker is built by the following Docker file.
> {{FROM flink:1.10-scala_2.11}}
>  {{RUN mkdir -p /opt/flink/plugins/s3 && \}}
>  {{    ln -s /opt/flink/opt/flink-s3-fs-presto-1.10.0.jar /opt/flink/plugins/s3/}}
>  {{RUN ln -s /opt/flink/opt/flink-metrics-prometheus-1.10.0.jar /opt/flink/lib/}}
> h2. Flink Configuration
>  * The following are all memory-related configurations in `flink-conf.yaml` 
> in 1.10.0:
> {{jobmanager.heap.size: 820m}}
>  {{taskmanager.memory.jvm-metaspace.size: 128m}}
>  {{taskmanager.memory.process.size: 4096m}}
>  * We use RocksDB and we don't set `state.backend.rocksdb.memory.managed` in 
> `flink-conf.yaml`.
>  ** Use S3 as checkpoint storage.
>  * The code uses the DataStream API
>  ** input/output are both Kafka.
> h2. Project Dependencies
>  * The following are our dependencies.
> {{val flinkVersion = "1.10.0"}}
>  {{libraryDependencies += "com.squareup.okhttp3" % "okhttp" % "4.2.2"}}
>  {{libraryDependencies += "com.typesafe" % "config" % "1.4.0"}}
>  {{libraryDependencies += "joda-time" % "joda-time" % "2.10.5"}}
>  {{libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % flinkVersion}}
>  {{libraryDependencies += "org.apache.flink" % "flink-metrics-dropwizard" % flinkVersion}}
>  {{libraryDependencies += "org.apache.flink" %% "flink-scala" % flinkVersion % "provided"}}
>  {{libraryDependencies += "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion % "provided"}}
>  {{libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"}}
>  {{libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.6.7"}}
>  {{libraryDependencies += "org.log4s" %% "log4s" % "1.8.2"}}
>  {{libraryDependencies += "org.rogach" %% "scallop" % "3.3.1"}}
> h2. Previous Flink 1.9.1 Configuration
>  * The configuration we used in Flink 1.9.1 is the following. It did not 
> lead to `OOMKilled`.
> h3. Kubernetes
> {{resources:}}
>  {{  requests:}}
>  {{    cpu: 1200m}}
>  {{    memory: 2G}}
>  {{  limits:}}
>  {{    cpu: 1500m}}
>  {{    memory: 2G}}
> h3. Flink 1.9.1
> {{jobmanager.heap.size: 820m}}
>  {{taskmanager.heap.size: 1024m}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
