xintongsong commented on a change in pull request #13397: URL: https://github.com/apache/flink/pull/13397#discussion_r489895561
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ########## @@ -177,11 +178,13 @@ private JobGraph createJobGraph() { setSlotSharingAndCoLocation(); + // For now, only consider managed memory for batch algorithms. + // TODO: extend managed memory fraction calculations w.r.t. various managed memory use cases. setManagedMemoryFraction( Collections.unmodifiableMap(jobVertices), Collections.unmodifiableMap(vertexConfigs), Collections.unmodifiableMap(chainedConfigs), - id -> streamGraph.getStreamNode(id).getManagedMemoryWeight()); + id -> streamGraph.getStreamNode(id).getManagedMemoryUseCaseWeights().getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0)); Review comment: Not sure about this. I think we don't really want a default weight here. The purpose for this default is to avoid NPE when performing `mapToInt` and `sum` in `setManagedMemoryFractionForSlotSharingGroup`. Maybe we should not set default value here and handles the `null` values in `setManagedMemoryFractionForSlotSharingGroup`. I think it should be straightforward to convert `null` to `0` for a `sum` processing. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org