[ 
https://issues.apache.org/jira/browse/FLINK-14431?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lining updated FLINK-14431:
---------------------------
    Description: 
h3. Motivation

The TaskManager memory information shown in the REST API has several 
shortcomings in the current release (Flink 1.9).
h4. (1) There isn't enough information about managed memory.
 * First, the total and available numbers of managed memory segments are 
missing from the metrics (FLINK-14406).
 * Second, the type and segment size of managed memory are not exposed. A user 
who wants to calculate the task heap needs to know them (FLINK-14422).

h4. (2) The information from HardwareDescription and TaskManagerMetricsInfo is 
difficult to match to the memory composition of the TaskManager.

!image-2019-10-24-16-20-23-210.png!
 * From TaskManagerMetricsInfo's memorySegmentsTotal (note: the network segment 
total), users cannot work out the network memory size.

 * The meaning of HardwareDescription.sizeOfJvmHeap is unclear.

 * Users cannot get any information about the task heap.

 * From this information, users cannot tell which TaskManager configuration 
option needs to change.

h3. Proposed Changes
h4. Add managed memory metrics
 * Add registerMemoryManagerMetrics in MemoryManager:

{code:java}
public void registerMemoryManagerMetrics(MetricGroup metricGroup) {
    checkNotNull(metricGroup);
    checkNotNull(this.memoryPool);
    MetricGroup memoryManagerGroup = metricGroup.addGroup("MemoryManager");
    // Total number of managed memory segments in the pool.
    memoryManagerGroup.<Integer, Gauge<Integer>>gauge("TotalMemorySegments",
        this.memoryPool::getNumberOfTotalMemorySegments);
    // Number of managed memory segments that are currently free.
    memoryManagerGroup.<Integer, Gauge<Integer>>gauge("AvailableMemorySegments",
        this.memoryPool::getNumberOfAvailableMemorySegments);
}
{code}
 * Register it in TaskManagerServices.createMemoryManager. 
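
The point of registering two separate gauges can be shown with a small, self-contained sketch. It does not use Flink's MemoryManager or MetricGroup: SegmentPool and the Supplier-based gauge map are hypothetical stand-ins, chosen only to show that each gauge is evaluated on every read and that the two gauges must point at different getters.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class ManagedMemoryGaugeSketch {

    /** Hypothetical stand-in for the memory pool; not a Flink class. */
    static final class SegmentPool {
        private final int total;
        private int available;

        SegmentPool(int total) {
            this.total = total;
            this.available = total;
        }

        int getNumberOfTotalMemorySegments() {
            return total;
        }

        int getNumberOfAvailableMemorySegments() {
            return available;
        }

        /** Takes segments out of the pool, as an allocation would. */
        void allocate(int segments) {
            if (segments < 0 || segments > available) {
                throw new IllegalArgumentException("cannot allocate " + segments);
            }
            available -= segments;
        }
    }

    /** Registers two gauges; each one is evaluated lazily on every read. */
    static Map<String, Supplier<Integer>> registerGauges(SegmentPool pool) {
        Map<String, Supplier<Integer>> gauges = new LinkedHashMap<>();
        gauges.put("TotalMemorySegments", pool::getNumberOfTotalMemorySegments);
        gauges.put("AvailableMemorySegments", pool::getNumberOfAvailableMemorySegments);
        return gauges;
    }

    public static void main(String[] args) {
        SegmentPool pool = new SegmentPool(100);
        Map<String, Supplier<Integer>> gauges = registerGauges(pool);
        pool.allocate(30);
        System.out.println(gauges.get("TotalMemorySegments").get());
        System.out.println(gauges.get("AvailableMemorySegments").get());
    }
}
```

After the allocation, the total gauge still reads 100 while the available gauge reads 70. If both gauges were bound to the total getter, the two values would never differ.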

h4. Get the TaskManager resource configuration from the REST API
 * This is needed because the resource configuration of each TaskManager may 
be different.

 * Add TaskManagerResourceConfiguration in TaskManagerServicesConfiguration:

{code:java}
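public class TaskManagerResourceConfiguration {
    // Configured amount of managed memory.
    private final long configuredMemory;
    // Whether managed memory is allocated on-heap or off-heap.
    private final MemoryType memoryType;
    // Whether managed memory is pre-allocated at startup.
    private final boolean preAllocateMemory;
    // Fraction of memory used for managed memory when no size is configured.
    private final float memoryFraction;
    // Size of one memory segment (page), in bytes.
    private final int pageSize;

    public TaskManagerResourceConfiguration(long configuredMemory, MemoryType memoryType,
            boolean preAllocateMemory, float memoryFraction, int pageSize) {
        this.configuredMemory = configuredMemory;
        this.memoryType = memoryType;
        this.preAllocateMemory = preAllocateMemory;
        this.memoryFraction = memoryFraction;
        this.pageSize = pageSize;
    }
}{code}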
 * In TaskManagerRunner.startTaskManager, add TaskManagerResourceConfiguration 
to taskManagerConfiguration.

 * Add TaskManagerResourceConfiguration to WorkerRegistration, so that the REST 
API can get it through ResourceManager.requestTaskManagerInfo.

h4. Add TaskManagerResourceInfo, which matches the memory composition of the 
TaskManager
h5. Data in JSON
{code:json}
{
  "cpuAllocated": -1,
  "cpuUsage": -1,
  "taskHeapAllocated": 966787072,
  "taskHeapUsed": 76071880,
  "heapManageMemoryMax": 0,
  "heapManageMemoryUsed": 0,
  "offHeapManageMemoryMax": 0,
  "offHeapManageMemoryUsed": 0,
  "networkMemoryMax": 107413504,
  "networkMemoryUsed": 0
}{code}
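
As a sanity check on the sample above, the networkMemoryMax value is a whole number of memory segments if a segment size of 32768 bytes (32 KiB) is assumed. The description does not state the segment size used, so that value is an assumption, and the class and method names below are made up for illustration. The sketch only shows the conversion between segment counts and bytes.

```java
public class NetworkMemoryArithmetic {

    /** Assumed segment size in bytes (32 KiB); not stated in the description. */
    static final long ASSUMED_SEGMENT_SIZE = 32L * 1024L;

    /** Bytes occupied by the given number of segments. */
    static long segmentsToBytes(long segments, long segmentSize) {
        return Math.multiplyExact(segments, segmentSize);
    }

    /** Bytes in use = (total - available) segments * segment size. */
    static long usedBytes(long total, long available, long segmentSize) {
        if (available < 0 || available > total) {
            throw new IllegalArgumentException("available must be in [0, total]");
        }
        return segmentsToBytes(total - available, segmentSize);
    }

    public static void main(String[] args) {
        long networkMemoryMax = 107413504L; // value from the JSON sample
        long segments = networkMemoryMax / ASSUMED_SEGMENT_SIZE;
        long remainder = networkMemoryMax % ASSUMED_SEGMENT_SIZE;
        System.out.println("segments = " + segments + ", remainder = " + remainder);
        // All segments free corresponds to networkMemoryUsed = 0 in the sample.
        System.out.println("used = " + usedBytes(segments, segments, ASSUMED_SEGMENT_SIZE));
    }
}
```

Under that assumption the sample corresponds to 3278 network segments with no remainder, and networkMemoryUsed is 0 because every segment is still available.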
h5. Merge information to match the TaskManager's memory composition
{code:java}
public static TaskManagerResourceInfo create(
        HardwareDescription hardwareDescription,
        TaskManagerMetricsInfo taskManagerMetrics,
        TaskManagerResourceConfig taskManagerResourceConfig) {
    long javaHeapAllocated = taskManagerMetrics.getHeapCommitted();
    long javaHeapUsed = taskManagerMetrics.getHeapUsed();
    long pageSize = taskManagerResourceConfig.getSizeOfMemorySegment();
    long heapManageMemoryAllocated = 0L;
    long heapManageMemoryUsed = 0L;
    long offHeapManageMemoryAllocated = 0L;
    long offHeapManageMemoryUsed = 0L;
    // Memory in bytes = number of segments * segment size.
    long networkMemoryAllocated = taskManagerMetrics.getMemorySegmentsTotal() * pageSize;
    long networkMemoryUsed = (taskManagerMetrics.getMemorySegmentsTotal()
        - taskManagerMetrics.getMemorySegmentsAvailable()) * pageSize;
    long manageMemoryAllocated = taskManagerMetrics.getManageMemorySegmentsTotal() * pageSize;
    long manageMemoryUsed = (taskManagerMetrics.getManageMemorySegmentsTotal()
        - taskManagerMetrics.getManageMemorySegmentsAvailable()) * pageSize;

    if (taskManagerResourceConfig.getManagedMemoryType()
            .equalsIgnoreCase(MemoryType.HEAP.name())) {
        // On-heap managed memory is part of the JVM heap, so subtract it
        // to obtain the task heap.
        heapManageMemoryAllocated = manageMemoryAllocated;
        heapManageMemoryUsed = manageMemoryUsed;
        javaHeapAllocated = javaHeapAllocated - heapManageMemoryAllocated;
        javaHeapUsed = taskManagerMetrics.getHeapUsed() - heapManageMemoryUsed;
    } else {
        offHeapManageMemoryAllocated = manageMemoryAllocated;
        offHeapManageMemoryUsed = manageMemoryUsed;
    }
    // cpuAllocated and cpuUsage are not available yet and are passed as -1.
    return new TaskManagerResourceInfo(-1.0d, -1.0d, javaHeapAllocated, javaHeapUsed,
        heapManageMemoryAllocated, heapManageMemoryUsed,
        offHeapManageMemoryAllocated, offHeapManageMemoryUsed,
        networkMemoryAllocated, networkMemoryUsed);
}{code}
 * cpuAllocated depends on FLIP-49 (TaskExecutorResourceSpec).

 * cpuUsage = Status.JVM.CPU.Load * cpuAllocated (the metric is taken 
[from|https://docs.oracle.com/javase/7/docs/jre/api/management/extension/com/sun/management/OperatingSystemMXBean.html#getProcessCpuLoad()])
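
The heap split and the cpuUsage formula can be tried out with plain numbers. The sketch below is not the proposed TaskManagerResourceInfo: Composition, split and cpuUsage are hypothetical names, the 32 KiB page size is an assumption, and the input figures are invented for illustration.

```java
public class MemoryCompositionSketch {

    /** Hypothetical value holder; not the proposed TaskManagerResourceInfo. */
    static final class Composition {
        final long taskHeapAllocated;
        final long taskHeapUsed;
        final long heapManagedAllocated;
        final long offHeapManagedAllocated;

        Composition(long taskHeapAllocated, long taskHeapUsed,
                    long heapManagedAllocated, long offHeapManagedAllocated) {
            this.taskHeapAllocated = taskHeapAllocated;
            this.taskHeapUsed = taskHeapUsed;
            this.heapManagedAllocated = heapManagedAllocated;
            this.offHeapManagedAllocated = offHeapManagedAllocated;
        }
    }

    /**
     * Splits JVM heap figures into task heap and managed memory.
     * On-heap managed memory lives inside the JVM heap, so it is subtracted;
     * off-heap managed memory leaves the heap figures untouched.
     */
    static Composition split(long heapCommitted, long heapUsed,
                             long managedSegmentsTotal, long managedSegmentsAvailable,
                             long pageSize, boolean managedOnHeap) {
        long managedAllocated = managedSegmentsTotal * pageSize;
        long managedUsed = (managedSegmentsTotal - managedSegmentsAvailable) * pageSize;
        if (managedOnHeap) {
            return new Composition(heapCommitted - managedAllocated,
                    heapUsed - managedUsed, managedAllocated, 0L);
        }
        return new Composition(heapCommitted, heapUsed, 0L, managedAllocated);
    }

    /** cpuUsage = process CPU load (0.0 to 1.0) * allocated cores. */
    static double cpuUsage(double processCpuLoad, double cpuAllocated) {
        return processCpuLoad * cpuAllocated;
    }

    public static void main(String[] args) {
        long pageSize = 32L * 1024L; // assumed segment size in bytes
        // Invented figures: 1024 MiB heap committed, 512 MiB used,
        // 8192 managed segments (256 MiB) of which 2048 are free.
        Composition onHeap = split(1024L << 20, 512L << 20, 8192L, 2048L, pageSize, true);
        System.out.println("task heap allocated (MiB): " + (onHeap.taskHeapAllocated >> 20));
        System.out.println("task heap used (MiB): " + (onHeap.taskHeapUsed >> 20));
        System.out.println("cpu usage (cores): " + cpuUsage(0.25, 4.0));
    }
}
```

With on-heap managed memory, the 1024 MiB heap splits into 768 MiB of task heap and 256 MiB of managed memory, and the 512 MiB in use becomes 320 MiB of task heap usage. A process CPU load of 0.25 on 4 allocated cores gives a cpuUsage of 1.0.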


> Update TaskManager's memory information to match its memory composition
> -----------------------------------------------------------------------
>
>                 Key: FLINK-14431
>                 URL: https://issues.apache.org/jira/browse/FLINK-14431
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / REST, Runtime / Task, Runtime / Web Frontend
>            Reporter: lining
>            Priority: Major
>         Attachments: image-2019-10-17-17-58-50-342.png, 
> image-2019-10-17-18-01-09-353.png, image-2019-10-17-18-29-53-329.png, 
> image-2019-10-24-16-19-15-499.png, image-2019-10-24-16-20-23-210.png
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
