[ https://issues.apache.org/jira/browse/FLINK-14431?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17021315#comment-17021315 ]
Andrey Zagrebin commented on FLINK-14431:
-----------------------------------------

[~lining] [~xintongsong] What is the plan here? Can we update the UI to better reflect the FLIP-49 memory model, e.g. as part of the 1.10.1 minor release?

> Update TaskManager's memory information to match its memory composition
> -----------------------------------------------------------------------
>
>                 Key: FLINK-14431
>                 URL: https://issues.apache.org/jira/browse/FLINK-14431
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / REST, Runtime / Task, Runtime / Web Frontend
>            Reporter: lining
>            Priority: Major
>         Attachments: image-2019-10-17-17-58-50-342.png, image-2019-10-17-18-01-09-353.png, image-2019-10-17-18-29-53-329.png, image-2019-10-24-16-19-15-499.png, image-2019-10-24-16-20-23-210.png, image-2019-10-24-16-22-27-360.png, image-2019-12-19-18-09-05-542.png, image-2019-12-19-18-27-38-589.png, image-2019-12-19-18-28-01-447.png
>
>
> h3. Motivation
> The TaskManager memory information that the REST API currently (Flink 1.10) exposes has several shortcomings.
> h4. (1) The information from HardwareDescription is difficult to map to the TaskManager memory composition of FLIP-49, as the picture below shows:
> !image-2019-12-19-18-09-05-542.png|width=444,height=389!
> * It is unclear what HardwareDescription.sizeOfJvmHeap means.
> * Users cannot see the TaskManager's resource configuration.
> h4. (2) There is no information about managed memory.
> * There is no metric for managed memory.
> h4. (3) There is no information about shuffle memory.
> * From TaskManagerMetricsInfo's memorySegmentsTotal, which is the total number of shuffle memory segments, users cannot derive the shuffle memory size.
> h4. (4) The metrics on the TaskManager metrics page do not correspond to the TaskManager's resource configuration.
> * It is difficult for users to tune the TaskManager's resource configuration based on metrics, because they cannot tell which configuration option a metric relates to.
> h3. Proposed Changes
> h4. Add TaskManagerResourceInfo, which matches the memory composition
> * Take the information from TaskExecutorResourceSpec (FLIP-49) and add it to TaskExecutorRegistration.
> {code:java}
> public class TaskManagerResourceInfo {
>     private final double cpuCores;
>     // all memory sizes are in bytes
>     private final long frameworkHeap;
>     private final long frameworkOffHeap;
>     private final long taskHeap;
>     private final long taskOffHeap;
>     private final long shuffleMemory;
>     private final long managedMemory;
>     private final long jvmMetaSpace;
>     private final long jvmOverhead;
>     private final long totalProcessMemory;
>     // constructor and getters omitted
> }
> {code}
> * url: /taskmanagers/:taskmanagerid
> * response: add a resource field:
> {code:json}
> "resource": {
>   "cpuCores": 4,
>   "frameworkHeap": 134217728,
>   "frameworkOffHeap": 134217728,
>   "taskHeap": 181193928,
>   "taskOffHeap": 0,
>   "shuffleMemory": 33554432,
>   "managedMemory": 322122552,
>   "jvmMetaSpace": 134217728,
>   "jvmOverhead": 134217728,
>   "totalProcessMemory": 1073741824
> }
> {code}
> h4. Add shuffle memory metrics
> * Add getTotalMemorySize and getAvailableMemorySize to NetworkBufferPool:
> {code:java}
> // 1L * ... makes the multiplication long, so segments * segment size cannot overflow int
> public long getTotalMemorySize() {
>     return 1L * getTotalNumberOfMemorySegments() * memorySegmentSize;
> }
>
> public long getAvailableMemorySize() {
>     return 1L * getNumberOfAvailableMemorySegments() * memorySegmentSize;
> }
> {code}
> * Update NettyShuffleMetricFactory#registerShuffleMetrics:
> {code:java}
> private static final String METRIC_TOTAL_MEMORY_SEGMENT_TOTALCAPACITY = "TotalMemoryCapacity";
> private static final String METRIC_TOTAL_MEMORY_SEGMENT_AVAILABLEMEMORY = "AvailableMemory";
>
> private static void registerShuffleMetrics(
>         String groupName,
>         MetricGroup metricGroup,
>         NetworkBufferPool networkBufferPool) {
>     MetricGroup networkGroup = metricGroup.addGroup(groupName);
>     // gauges for the number of total and available memory segments
>     networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_TOTAL_MEMORY_SEGMENT,
>         networkBufferPool::getTotalNumberOfMemorySegments);
>     networkGroup.<Integer, Gauge<Integer>>gauge(METRIC_AVAILABLE_MEMORY_SEGMENT,
>         networkBufferPool::getNumberOfAvailableMemorySegments);
>     // new gauges: total and available shuffle memory in bytes
>     networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_SEGMENT_TOTALCAPACITY,
>         networkBufferPool::getTotalMemorySize);
>     networkGroup.<Long, Gauge<Long>>gauge(METRIC_TOTAL_MEMORY_SEGMENT_AVAILABLEMEMORY,
>         networkBufferPool::getAvailableMemorySize);
> }
> {code}
> h4. Add managed memory metrics
> * Add a default memory type to MemoryManager:
> {code:java}
> public static final MemoryType DEFAULT_MEMORY_TYPE = MemoryType.OFF_HEAP;
> {code}
> * Add getManagedMemoryTotal to TaskExecutor:
> {code:java}
> // total managed memory, summed over all allocated slots
> public long getManagedMemoryTotal() {
>     return this.taskSlotTable.getAllocatedSlots().stream().mapToLong(
>         slot -> slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE)
>     ).sum();
> }
> {code}
> * Add getManagedMemoryUsed to TaskExecutor:
> {code:java}
> // used managed memory (total - available), summed over all allocated slots
> public long getManagedMemoryUsed() {
>     return this.taskSlotTable.getAllocatedSlots().stream().mapToLong(
>         slot -> slot.getMemoryManager().getMemorySizeByType(MemoryManager.DEFAULT_MEMORY_TYPE)
>             - slot.getMemoryManager().availableMemory(MemoryManager.DEFAULT_MEMORY_TYPE)
>     ).sum();
> }
> {code}
> * Add instantiateMemoryManagerMetrics to MetricUtils:
> {code:java}
> public static void instantiateMemoryManagerMetrics(
>         MetricGroup statusMetricGroup, TaskExecutor taskExecutor) {
>     checkNotNull(statusMetricGroup);
>     // registers Managed.Memory.TotalCapacity and Managed.Memory.MemoryUsed under the status group
>     MetricGroup memoryManagerGroup = statusMetricGroup.addGroup("Managed").addGroup("Memory");
>     memoryManagerGroup.<Long, Gauge<Long>>gauge("TotalCapacity", taskExecutor::getManagedMemoryTotal);
>     memoryManagerGroup.<Long, Gauge<Long>>gauge("MemoryUsed", taskExecutor::getManagedMemoryUsed);
> }
> {code}
> * Register it in TaskManagerRunner#startTaskManager.
> h4. Change the TaskManager metrics page
> * Base the page on the FLIP-49 resource configuration and the memory metrics, as the picture below shows:
> !image-2019-12-19-18-28-01-447.png|width=671,height=282!
> * Status.JVM.Memory.Heap.Used as the usage of Flink heap memory
> * Status.JVM.Memory.Direct.MemoryUsed minus the total shuffle memory as the usage of Flink off-heap memory
> * Used shuffle memory (TotalMemoryCapacity minus AvailableMemory) as the usage of shuffle memory
> * Used managed memory (MemoryUsed) as the usage of managed memory
> * Status.JVM.Memory.NonHeap.Used as the usage of overhead

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
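A small Java sketch can check the components of the example resource response against totalProcessMemory. It also shows why NetworkBufferPool#getTotalMemorySize multiplies by 1L. The sketch assumes that, as in the FLIP-49 model, total process memory is the sum of framework heap and off-heap, task heap and off-heap, shuffle, managed, JVM metaspace and JVM overhead memory. ResourceInfo, example and segmentsToBytes are hypothetical names used only for this illustration. The 32 KiB segment size is an assumption (Flink's default segment size), not a value given in the issue.

```java
// Hypothetical sketch: ResourceInfo mirrors the fields proposed for
// TaskManagerResourceInfo; it is not an existing Flink class.
public class TaskManagerResourceCheck {

    /** Proposed TaskManager resource fields; memory sizes are in bytes. */
    static final class ResourceInfo {
        final double cpuCores;
        final long frameworkHeap;
        final long frameworkOffHeap;
        final long taskHeap;
        final long taskOffHeap;
        final long shuffleMemory;
        final long managedMemory;
        final long jvmMetaSpace;
        final long jvmOverhead;
        final long totalProcessMemory;

        ResourceInfo(double cpuCores, long frameworkHeap, long frameworkOffHeap,
                     long taskHeap, long taskOffHeap, long shuffleMemory,
                     long managedMemory, long jvmMetaSpace, long jvmOverhead,
                     long totalProcessMemory) {
            this.cpuCores = cpuCores;
            this.frameworkHeap = frameworkHeap;
            this.frameworkOffHeap = frameworkOffHeap;
            this.taskHeap = taskHeap;
            this.taskOffHeap = taskOffHeap;
            this.shuffleMemory = shuffleMemory;
            this.managedMemory = managedMemory;
            this.jvmMetaSpace = jvmMetaSpace;
            this.jvmOverhead = jvmOverhead;
            this.totalProcessMemory = totalProcessMemory;
        }

        /** Sum of the individual memory components (assumed FLIP-49 breakdown). */
        long sumOfComponents() {
            return frameworkHeap + frameworkOffHeap + taskHeap + taskOffHeap
                    + shuffleMemory + managedMemory + jvmMetaSpace + jvmOverhead;
        }
    }

    /** The values of the example resource response from the issue. */
    static ResourceInfo example() {
        return new ResourceInfo(4, 134217728L, 134217728L, 181193928L, 0L,
                33554432L, 322122552L, 134217728L, 134217728L, 1073741824L);
    }

    /** Bytes held by numberOfSegments segments; the 1L factor makes the product a long. */
    static long segmentsToBytes(int numberOfSegments, int segmentSize) {
        return 1L * numberOfSegments * segmentSize;
    }

    public static void main(String[] args) {
        ResourceInfo info = example();
        // The example components add up to totalProcessMemory (1 GiB).
        System.out.println(info.sumOfComponents() == info.totalProcessMemory);

        // Assumed 32 KiB segments: 1024 of them give the example's 32 MiB of shuffle memory.
        System.out.println(segmentsToBytes(1024, 32 * 1024));

        // 100000 * 32768 exceeds Integer.MAX_VALUE, so int arithmetic would overflow here.
        System.out.println(segmentsToBytes(100_000, 32 * 1024));
    }
}
```

Running main prints true, 33554432 and 3276800000. The example components add up to exactly 1 GiB, and 100000 segments of 32 KiB produce a byte count that would not fit in an int.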