[GitHub] [flink] xintongsong commented on a change in pull request #13397: [FLINK-19178][runtime] Extend managed memory weight/fraction interfaces for various use cases.

2020-09-16 Thread GitBox


xintongsong commented on a change in pull request #13397:
URL: https://github.com/apache/flink/pull/13397#discussion_r489915944



##
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##
@@ -849,26 +843,23 @@ private static void setManagedMemoryFractionForSlotSharingGroup(
}
 
private static void setManagedMemoryFractionForOperator(
-   final ResourceSpec operatorResourceSpec,
final ResourceSpec groupResourceSpec,
final int operatorManagedMemoryWeight,
final int groupManagedMemoryWeight,
final StreamConfig operatorConfig) {
 
-   final double managedMemoryFraction;
-
if (groupResourceSpec.equals(ResourceSpec.UNKNOWN)) {
-   managedMemoryFraction = groupManagedMemoryWeight > 0
-   ? getFractionRoundedDown(operatorManagedMemoryWeight, groupManagedMemoryWeight)
-   : 0.0;
+   operatorConfig.setManagedMemoryFraction(
+   groupManagedMemoryWeight > 0 ?
+   getFractionRoundedDown(operatorManagedMemoryWeight, groupManagedMemoryWeight) :
+   0.0);
} else {
-   final long groupManagedMemoryBytes = groupResourceSpec.getManagedMemory().getBytes();
-   managedMemoryFraction = groupManagedMemoryBytes > 0
-   ? getFractionRoundedDown(operatorResourceSpec.getManagedMemory().getBytes(), groupManagedMemoryBytes)
-   : 0.0;
+   // Supporting for fine grained resource specs is still under developing.
+   // This branch should not be executed in production. Not throwing exception for testing purpose.
+   LOG.error("Failed setting managed memory fractions. " +
+   " Operators may not be able to use managed memory properly." +
+   " Calculating managed memory fractions with fine grained resource spec is currently not supported.");

Review comment:
   FYI, I've created FLINK-19267 to track the fraction calculation issue for fine-grained resource specs.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] xintongsong commented on a change in pull request #13397: [FLINK-19178][runtime] Extend managed memory weight/fraction interfaces for various use cases.

2020-09-16 Thread GitBox


xintongsong commented on a change in pull request #13397:
URL: https://github.com/apache/flink/pull/13397#discussion_r489912221



##
File path: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
##
@@ -401,6 +403,22 @@
.withDescription("Fraction of Total Flink Memory to be used as Managed Memory, if Managed Memory size is not"
+ " explicitly specified.");
 
+   /**
+* Weights of managed memory consumers.
+*/
+   // Do not advertise this option until the feature is completed.
+   @Documentation.ExcludeFromDocumentation
+   public static final ConfigOption<Map<String, String>> MANAGED_MEMORY_CONSUMER_WEIGHTS =
+   key("taskmanager.memory.managed.consumer-weights")
+   .mapType()
+   .defaultValue(new HashMap<String, String>() {{
+   put("DATAPROC", "70");
+   put("PYTHON", "30");

Review comment:
   Since a map-type `ConfigOption` only accepts `Map<String, String>`, I'll use a static inner class with static final string constants for the consumer names, to avoid converting enum values to strings.
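A minimal stand-alone sketch of the approach described in the comment, assuming a holder class named `ManagedMemoryConsumerNames` (a hypothetical name, not taken from the diff). It does not touch Flink's `ConfigOptions` API; it only shows how plain string constants can serve directly as keys of a `Map<String, String>` default value, using the same 70/30 weights as the diff. It builds the map with a plain `HashMap` rather than the double-brace initialization used in the diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ConsumerWeightsSketch {

    /** Hypothetical holder of consumer names as plain string constants. */
    public static final class ManagedMemoryConsumerNames {
        public static final String DATAPROC = "DATAPROC";
        public static final String PYTHON = "PYTHON";

        private ManagedMemoryConsumerNames() {
            // constants only, no instances
        }
    }

    /** Default consumer weights, keyed directly by the string constants. */
    public static Map<String, String> defaultConsumerWeights() {
        Map<String, String> weights = new HashMap<>();
        weights.put(ManagedMemoryConsumerNames.DATAPROC, "70");
        weights.put(ManagedMemoryConsumerNames.PYTHON, "30");
        return Collections.unmodifiableMap(weights);
    }

    public static void main(String[] args) {
        Map<String, String> weights = defaultConsumerWeights();
        // No enum-to-string conversion is needed to look up a weight.
        System.out.println(weights.get(ManagedMemoryConsumerNames.PYTHON)); // prints 30
    }
}
```

Because the keys are already `String`s, callers can pass `ManagedMemoryConsumerNames.DATAPROC` wherever a config key is expected, with no `name()` or `toString()` call on an enum.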




[GitHub] [flink] xintongsong commented on a change in pull request #13397: [FLINK-19178][runtime] Extend managed memory weight/fraction interfaces for various use cases.

2020-09-16 Thread GitBox


xintongsong commented on a change in pull request #13397:
URL: https://github.com/apache/flink/pull/13397#discussion_r489903233



##
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##
@@ -49,6 +48,8 @@
 @Internal
 public class StreamNode implements Serializable {
 
+   public static final int DEFAULT_MANAGED_MEMORY_WEIGHT = 1;

Review comment:
   This does not really matter, since the constant will be removed in the next commit.




[GitHub] [flink] xintongsong commented on a change in pull request #13397: [FLINK-19178][runtime] Extend managed memory weight/fraction interfaces for various use cases.

2020-09-16 Thread GitBox


xintongsong commented on a change in pull request #13397:
URL: https://github.com/apache/flink/pull/13397#discussion_r489895561



##
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##
@@ -177,11 +178,13 @@ private JobGraph createJobGraph() {
 
setSlotSharingAndCoLocation();
 
+   // For now, only consider managed memory for batch algorithms.
+   // TODO: extend managed memory fraction calculations w.r.t. various managed memory use cases.
setManagedMemoryFraction(
Collections.unmodifiableMap(jobVertices),
Collections.unmodifiableMap(vertexConfigs),
Collections.unmodifiableMap(chainedConfigs),
-   id -> streamGraph.getStreamNode(id).getManagedMemoryWeight());
+   id -> streamGraph.getStreamNode(id).getManagedMemoryUseCaseWeights().getOrDefault(ManagedMemoryUseCase.BATCH_OP, 0));

Review comment:
   Not sure about this. I think we don't really want a default weight here.
   The purpose of this default is to avoid an NPE when performing `mapToInt` and `sum` in `setManagedMemoryFractionForSlotSharingGroup`.
   Maybe we should not set a default value here, and instead handle the `null` values in `setManagedMemoryFractionForSlotSharingGroup`. It should be straightforward to convert `null` to `0` for the `sum`.
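A rough sketch of the suggested alternative: look up the weights without a default and convert `null` to `0` only at the point where they are summed. The class and method names and the 4-digit rounding scale are assumptions for illustration; this is not the actual code of `setManagedMemoryFractionForSlotSharingGroup` or `getFractionRoundedDown`.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FractionSketch {

    /** Sums the weights of a slot sharing group; a missing weight (null) counts as 0. */
    static int groupWeight(List<Integer> operatorIds, Map<Integer, Integer> weights) {
        return operatorIds.stream()
                .map(weights::get)                 // null if the operator declared no weight
                .mapToInt(w -> w == null ? 0 : w)  // convert null to 0 only for the sum
                .sum();
    }

    /** Divides and rounds down; the scale of 4 decimal places is an assumption. */
    static double fractionRoundedDown(long dividend, long divisor) {
        return BigDecimal.valueOf(dividend)
                .divide(BigDecimal.valueOf(divisor), 4, RoundingMode.DOWN)
                .doubleValue();
    }

    /** Fraction for one operator; 0.0 when no operator in the group declared a weight. */
    static double operatorFraction(Integer operatorWeight, int groupWeight) {
        int weight = operatorWeight == null ? 0 : operatorWeight;
        return groupWeight > 0 ? fractionRoundedDown(weight, groupWeight) : 0.0;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> weights = new HashMap<>();
        weights.put(1, 2);
        weights.put(2, 1);
        List<Integer> group = Arrays.asList(1, 2, 3); // operator 3 has no weight entry
        int total = groupWeight(group, weights);
        for (int id : group) {
            // prints "1 -> 0.6666", "2 -> 0.3333", "3 -> 0.0"
            System.out.println(id + " -> " + operatorFraction(weights.get(id), total));
        }
    }
}
```

With weights 2 and 1 plus a third operator that declares none, the group weight is 3, and the undeclared operator simply gets a fraction of 0.0 without any default being stored in the weights map.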




[GitHub] [flink] xintongsong commented on a change in pull request #13397: [FLINK-19178][runtime] Extend managed memory weight/fraction interfaces for various use cases.

2020-09-16 Thread GitBox


xintongsong commented on a change in pull request #13397:
URL: https://github.com/apache/flink/pull/13397#discussion_r489890377



##
File path: flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
##
@@ -139,12 +142,12 @@ public static int getNewNodeId() {
private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
 
/**
-* This weight indicates how much this transformation relies on managed memory, so that
-* transformation highly relies on managed memory would be able to acquire more managed
-* memory in runtime (linear association). Note that it only works in cases of UNKNOWN
-* resources.
+* Each entry in this map represents a use case that this transformation needs managed memory for. The key of the
+* entry indicates the use case, while the value is the use-case-specific weight for this transformation. Managed
+* memory reserved for an OPERATOR scope use case will be shared by all the declaring transformations within a slot
+* according to this weight. For SLOT scope use cases, the weights are ignored.
 */
-   private int managedMemoryWeight = DEFAULT_MANAGED_MEMORY_WEIGHT;
+   private final Map<ManagedMemoryUseCase, Integer> managedMemoryUseCaseWeights = new HashMap<>();

Review comment:
   `ROCKSDB` and `PYTHON` are legal keys. The map accepts both `OPERATOR` and `SLOT` scope use cases, but the weights (values of the map) only matter for `OPERATOR` scope use cases.
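To illustrate the scope rule, here is a hypothetical sketch. The enum, its scope assignments (`BATCH_OP` as `OPERATOR` scope, `ROCKSDB` and `PYTHON` as `SLOT` scope) and the `effectiveWeights` helper are assumptions, not the PR's actual `ManagedMemoryUseCase`. Every use case is accepted as a key, but only `OPERATOR` scope entries keep a weight that takes part in sharing.

```java
import java.util.EnumMap;
import java.util.Map;

public class UseCaseWeightsSketch {

    enum Scope { OPERATOR, SLOT }

    // Hypothetical use cases; the scope of each one is an assumption for illustration.
    enum UseCase {
        BATCH_OP(Scope.OPERATOR),
        ROCKSDB(Scope.SLOT),
        PYTHON(Scope.SLOT);

        final Scope scope;

        UseCase(Scope scope) {
            this.scope = scope;
        }
    }

    /** Keeps only the entries whose weights are used for sharing (OPERATOR scope). */
    static Map<UseCase, Integer> effectiveWeights(Map<UseCase, Integer> declared) {
        Map<UseCase, Integer> result = new EnumMap<>(UseCase.class);
        declared.forEach((useCase, weight) -> {
            if (useCase.scope == Scope.OPERATOR) {
                result.put(useCase, weight);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<UseCase, Integer> declared = new EnumMap<>(UseCase.class);
        declared.put(UseCase.BATCH_OP, 70);
        declared.put(UseCase.ROCKSDB, 1); // legal key, but the weight is ignored
        declared.put(UseCase.PYTHON, 1);  // legal key, but the weight is ignored
        System.out.println(declared.keySet());          // prints [BATCH_OP, ROCKSDB, PYTHON]
        System.out.println(effectiveWeights(declared)); // prints {BATCH_OP=70}
    }
}
```

The `SLOT` scope keys still record that the transformation needs that kind of managed memory; they just do not influence how `OPERATOR` scope memory is divided.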




[GitHub] [flink] xintongsong commented on a change in pull request #13397: [FLINK-19178][runtime] Extend managed memory weight/fraction interfaces for various use cases.

2020-09-16 Thread GitBox


xintongsong commented on a change in pull request #13397:
URL: https://github.com/apache/flink/pull/13397#discussion_r489890553



##
File path: flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java
##
@@ -139,12 +142,12 @@ public static int getNewNodeId() {
private ResourceSpec preferredResources = ResourceSpec.DEFAULT;
 
/**
-* This weight indicates how much this transformation relies on managed memory, so that
-* transformation highly relies on managed memory would be able to acquire more managed
-* memory in runtime (linear association). Note that it only works in cases of UNKNOWN
-* resources.
+* Each entry in this map represents a use case that this transformation needs managed memory for. The key of the
+* entry indicates the use case, while the value is the use-case-specific weight for this transformation. Managed
+* memory reserved for an OPERATOR scope use case will be shared by all the declaring transformations within a slot
+* according to this weight. For SLOT scope use cases, the weights are ignored.
 */
-   private int managedMemoryWeight = DEFAULT_MANAGED_MEMORY_WEIGHT;
+   private final Map<ManagedMemoryUseCase, Integer> managedMemoryUseCaseWeights = new HashMap<>();

Review comment:
   I'll try to clarify a bit in the JavaDocs.




[GitHub] [flink] xintongsong commented on a change in pull request #13397: [FLINK-19178][runtime] Extend managed memory weight/fraction interfaces for various use cases.

2020-09-16 Thread GitBox


xintongsong commented on a change in pull request #13397:
URL: https://github.com/apache/flink/pull/13397#discussion_r489886220



##
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
##
@@ -849,26 +843,23 @@ private static void setManagedMemoryFractionForSlotSharingGroup(
}
 
private static void setManagedMemoryFractionForOperator(
-   final ResourceSpec operatorResourceSpec,
final ResourceSpec groupResourceSpec,
final int operatorManagedMemoryWeight,
final int groupManagedMemoryWeight,
final StreamConfig operatorConfig) {
 
-   final double managedMemoryFraction;
-
if (groupResourceSpec.equals(ResourceSpec.UNKNOWN)) {
-   managedMemoryFraction = groupManagedMemoryWeight > 0
-   ? getFractionRoundedDown(operatorManagedMemoryWeight, groupManagedMemoryWeight)
-   : 0.0;
+   operatorConfig.setManagedMemoryFraction(
+   groupManagedMemoryWeight > 0 ?
+   getFractionRoundedDown(operatorManagedMemoryWeight, groupManagedMemoryWeight) :
+   0.0);
} else {
-   final long groupManagedMemoryBytes = groupResourceSpec.getManagedMemory().getBytes();
-   managedMemoryFraction = groupManagedMemoryBytes > 0
-   ? getFractionRoundedDown(operatorResourceSpec.getManagedMemory().getBytes(), groupManagedMemoryBytes)
-   : 0.0;
+   // Supporting for fine grained resource specs is still under developing.
+   // This branch should not be executed in production. Not throwing exception for testing purpose.
+   LOG.error("Failed setting managed memory fractions. " +
+   " Operators may not be able to use managed memory properly." +
+   " Calculating managed memory fractions with fine grained resource spec is currently not supported.");

Review comment:
   Adding a `TODO` message and creating a follow-up issue sounds good to me.
   
   The problem with throwing an exception is that existing test cases using fine-grained resource specs would fail when generating job graphs.

