[ 
https://issues.apache.org/jira/browse/FLINK-33315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-33315:
----------------------------
    Component/s: Runtime / Task

> Optimize memory usage of large StreamOperator
> ---------------------------------------------
>
>                 Key: FLINK-33315
>                 URL: https://issues.apache.org/jira/browse/FLINK-33315
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Configuration, Runtime / Task
>    Affects Versions: 1.17.0, 1.18.0
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>         Attachments: 
> 130f436613b52b321bd9bd0211dd109f0b0102000020e860f292a13c0702016976850466192b.png,
>  image-2023-10-19-16-28-16-077.png
>
>
> Some of our batch jobs were upgraded from flink-1.15 to flink-1.17, and the TM 
> always fails with java.lang.OutOfMemoryError: Java heap space.
>  
> Here is an example: a hive table with a lot of data, where 
> HiveSource#partitionBytes is 281MB.
> After analysis, the root cause is that the TM keeps 3 replicas of this big 
> object:
>  * Replica_1: SourceOperatorFactory (it's necessary for running task)
>  * Replica_2: A duplicate SourceOperatorFactory object that is generated temporarily.
>  ** It was introduced in FLINK-30536 (1.17) and is not necessary. ([code 
> link|https://github.com/apache/flink/blob/c2e14ff411e806f9ccf176c85eb8249b8ff12e56/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L646])
>  ** When creating a successor operator to a SourceOperator, the call stack is:
>  *** OperatorChain#createOperatorChain ->
>  *** wrapOperatorIntoOutput ->
>  *** getOperatorRecordsOutCounter ->
>  *** operatorConfig.getStreamOperatorFactory(userCodeClassloader)
>  ** This generates a temporary SourceOperatorFactory just to check 
> whether it is a SinkWriterOperatorFactory.
>  * Replica_3: The value of StreamConfig#SERIALIZEDUDF
>  ** It is used to generate SourceOperatorFactory.
>  ** Currently the value is always kept in heap memory.
>  ** However, once the factory has been generated, we can release the value 
> or store it on disk if needed.
>  *** We can define a threshold: when the value is smaller than the 
> threshold, the release strategy does not take effect.
>  ** This would save a lot of heap memory.
> These three replicas use about 800MB of memory. Please note that this is for 
> just one subtask. Since each TM has 4 slots, it runs 4 HiveSources at the 
> same time, so 12 replicas are kept in TM memory, about 3.3 GB in total.
> These large objects cannot be garbage collected by the JVM, causing the TM 
> to OOM frequently.
> This JIRA focuses on optimizing Replica_2 and Replica_3.
>  
> !image-2023-10-19-16-28-16-077.png!
>  
> !https://f.haiserve.com/download/130f436613b52b321bd9bd0211dd109f0b0102000020e860f292a13c0702016976850466192b?userid=146850&token=4e7b7352b30d6e5d2dd2bb7a7479fc93!
>  
>  
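
For Replica_2, one way to avoid generating a temporary factory is to record the factory's class name next to the serialized bytes and run the type check against that name only. The sketch below is an illustration under assumptions, not the Flink implementation: OperatorConfigSketch, its keys, and its methods are hypothetical names, and a plain map stands in for the real configuration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an operator config; not a Flink class. */
final class OperatorConfigSketch {
    // Hypothetical keys, chosen for this sketch only.
    private static final String SERIALIZED_FACTORY = "serializedFactory";
    private static final String FACTORY_CLASS_NAME = "factoryClassName";

    private final Map<String, Object> entries = new HashMap<>();

    /** Stores the serialized factory together with its class name. */
    void setFactory(Object factory, byte[] serializedFactory) {
        entries.put(SERIALIZED_FACTORY, serializedFactory);
        entries.put(FACTORY_CLASS_NAME, factory.getClass().getName());
    }

    /**
     * Checks the factory type from the recorded class name alone.
     * The (possibly very large) serialized bytes are never deserialized here.
     */
    boolean isFactoryOfType(Class<?> expected, ClassLoader classLoader) {
        String name = (String) entries.get(FACTORY_CLASS_NAME);
        if (name == null) {
            throw new IllegalStateException("no factory has been set");
        }
        try {
            // initialize=false: load the class without running static initializers.
            Class<?> actual = Class.forName(name, false, classLoader);
            return expected.isAssignableFrom(actual);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("factory class not found: " + name, e);
        }
    }
}
```

With something like this, the check in getOperatorRecordsOutCounter would only need to load a class, so its cost would no longer depend on the size of the serialized factory.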

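For Replica_3, the threshold-based release described above could look roughly like the following. This is a minimal sketch under assumptions: ReleasableSerializedValue and its methods are hypothetical names, not Flink classes, and it uses plain Java serialization with the default class loader instead of the user code class loader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical holder for a serialized value; not a Flink class. */
final class ReleasableSerializedValue {
    private final long thresholdBytes;
    private byte[] heapBytes; // null once the bytes have been released
    private Path spillFile;   // non-null only if the bytes were spilled to disk

    private ReleasableSerializedValue(byte[] bytes, long thresholdBytes) {
        this.heapBytes = bytes;
        this.thresholdBytes = thresholdBytes;
    }

    static ReleasableSerializedValue of(Serializable value, long thresholdBytes) {
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
            out.flush();
            return new ReleasableSerializedValue(bos.toByteArray(), thresholdBytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Deserializes the value. Afterwards, if the serialized form is at least
     * thresholdBytes long, the heap copy is dropped (and optionally spilled).
     * Smaller values stay on the heap, so the strategy does not apply to them.
     */
    Object deserializeAndRelease(boolean spillToDisk) {
        try {
            byte[] bytes = readBytes();
            Object value;
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                value = in.readObject();
            }
            if (heapBytes != null && heapBytes.length >= thresholdBytes) {
                if (spillToDisk) {
                    spillFile = Files.createTempFile("serialized-udf-", ".bin");
                    spillFile.toFile().deleteOnExit();
                    Files.write(spillFile, heapBytes);
                }
                heapBytes = null; // let the GC reclaim the large array
            }
            return value;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    private byte[] readBytes() throws IOException {
        if (heapBytes != null) {
            return heapBytes;
        }
        if (spillFile != null) {
            return Files.readAllBytes(spillFile);
        }
        throw new IllegalStateException("serialized value was released");
    }

    boolean isOnHeap() {
        return heapBytes != null;
    }
}
```

Whether to spill to disk or drop the bytes entirely depends on whether the value may be needed again later; the sketch leaves that choice to the caller.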


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
