[GitHub] [flink] xintongsong commented on a change in pull request #15112: [FLINK-21480][runtime] Respect external resources from resource requirements

2021-03-09 Thread GitBox


xintongsong commented on a change in pull request #15112:
URL: https://github.com/apache/flink/pull/15112#discussion_r590131518



##
File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##
@@ -93,7 +92,7 @@ private ResourceSpec(
 final MemorySize taskHeapMemory,
 final MemorySize taskOffHeapMemory,
 final MemorySize managedMemory,
-final Resource... extendedResources) {
+final Map<String, Resource> extendedResources) {

Review comment:
   It would be nice to enforce `ExternalResource` rather than `Resource` 
here.
   
   IIUC, what prevents us from doing so is that the return types of 
`Resource#merge/subtract/multiply/divide/create` are hard-coded to `Resource`. 
We might consider solving this with generics. That should also help replace 
`Resource` with `CPUResource` wherever needed.
   
   ```
   abstract class Resource<T extends Resource<T>> {
   T merge(T other);
   }
   
   class ExternalResource extends Resource<ExternalResource> {}
   ```
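
A fuller sketch of the self-typed generics idea, for illustration only. The wrapper class `SelfTypedResourceSketch`, the simplified fields, and the `main` method are assumptions and are not taken from the PR; only `merge` and `create` are named in the comment above.

```java
import java.math.BigDecimal;

final class SelfTypedResourceSketch {

    /** Base class; T is the concrete subclass, so arithmetic returns that subclass. */
    abstract static class Resource<T extends Resource<T>> {
        private final String name;
        private final BigDecimal value;

        protected Resource(String name, BigDecimal value) {
            this.name = name;
            this.value = value;
        }

        String getName() {
            return name;
        }

        BigDecimal getValue() {
            return value;
        }

        /** Each subclass builds a new instance of its own type. */
        protected abstract T create(BigDecimal value);

        T merge(T other) {
            if (!name.equals(other.getName())) {
                throw new IllegalArgumentException("Merge with different resource name");
            }
            return create(value.add(other.getValue()));
        }
    }

    static final class ExternalResource extends Resource<ExternalResource> {
        ExternalResource(String name, BigDecimal value) {
            super(name, value);
        }

        @Override
        protected ExternalResource create(BigDecimal value) {
            return new ExternalResource(getName(), value);
        }
    }

    public static void main(String[] args) {
        ExternalResource a = new ExternalResource("gpu", BigDecimal.valueOf(1));
        ExternalResource b = new ExternalResource("gpu", BigDecimal.valueOf(2));
        // No cast is needed: merge() is typed as ExternalResource.
        ExternalResource sum = a.merge(b);
        System.out.println(sum.getName() + "=" + sum.getValue());
    }
}
```

With this shape, a field or parameter can be declared as `ExternalResource` (or `CPUResource`) and still use the arithmetic methods without casting the result back.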

##
File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##
@@ -369,14 +373,21 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
 return this;
 }
 
-public Builder setGPUResource(double gpus) {
-this.gpuResource = new GPUResource(gpus);
+public Builder addExtendedResource(String name, Resource 
extendedResource) {

Review comment:
   I would be fine with providing a shortcut setter for a commonly used 
external resource like GPU. That would also save us from touching all the test 
cases.
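
A minimal sketch of what such a shortcut could look like, assuming a builder that keeps external resources in a map keyed by name. The `"gpu"` key, the simplified `ExternalResource` stand-in, and the wrapper class are hypothetical and not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

final class GpuShortcutSketch {

    /** Minimal stand-in for an external resource: a name plus an amount. */
    static final class ExternalResource {
        final String name;
        final double value;

        ExternalResource(String name, double value) {
            this.name = name;
            this.value = value;
        }
    }

    static final class Builder {
        // Hypothetical resource name; the real key used for GPUs is not shown in the diff.
        static final String GPU_NAME = "gpu";

        final Map<String, ExternalResource> extendedResources = new HashMap<>();

        /** General entry point: the map key is taken from the resource itself. */
        Builder addExtendedResource(ExternalResource resource) {
            extendedResources.put(resource.name, resource);
            return this;
        }

        /** Shortcut kept for the common case, so existing callers need no changes. */
        Builder setGPUResource(double gpus) {
            return addExtendedResource(new ExternalResource(GPU_NAME, gpus));
        }
    }

    public static void main(String[] args) {
        Builder builder = new Builder().setGPUResource(2.0);
        System.out.println(builder.extendedResources.get(Builder.GPU_NAME).value);
    }
}
```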

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
##
@@ -118,13 +141,22 @@ public boolean equals(Object obj) {
 && Objects.equals(this.taskHeapSize, that.taskHeapSize)
 && Objects.equals(this.taskOffHeapSize, 
that.taskOffHeapSize)
 && Objects.equals(this.networkMemSize, that.networkMemSize)
-&& Objects.equals(this.managedMemSize, 
that.managedMemSize);
+&& Objects.equals(this.managedMemSize, that.managedMemSize)
+&& Objects.equals(this.extendedResources, 
that.extendedResources);
 }
 return false;
 }
 
 @Override
 public String toString() {
+final StringBuilder extendedResourceStr = new 
StringBuilder(extendedResources.size() * 10);
+for (Map.Entry resource : 
extendedResources.entrySet()) {
+extendedResourceStr
+.append(", ")
+.append(resource.getKey())
+.append('=')
+.append(resource.getValue().getValue());
+}

Review comment:
   Consider deduplicating this string generation with a util method. We can 
put it in `ExternalResourceUtils`.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerUtils.java
##
@@ -29,14 +29,24 @@
  */
 public static ResourceProfile generateDefaultSlotResourceProfile(
 WorkerResourceSpec workerResourceSpec, int numSlotsPerWorker) {
-return ResourceProfile.newBuilder()
-
.setCpuCores(workerResourceSpec.getCpuCores().divide(numSlotsPerWorker))
-
.setTaskHeapMemory(workerResourceSpec.getTaskHeapSize().divide(numSlotsPerWorker))
-.setTaskOffHeapMemory(
-
workerResourceSpec.getTaskOffHeapSize().divide(numSlotsPerWorker))
-
.setManagedMemory(workerResourceSpec.getManagedMemSize().divide(numSlotsPerWorker))
-
.setNetworkMemory(workerResourceSpec.getNetworkMemSize().divide(numSlotsPerWorker))
-.build();
+final ResourceProfile.Builder resourceProfileBuilder =

Review comment:
   I would suggest adding external resources to `WorkerResourceSpec` and 
`TaskExecutorResourceSpec` in one commit, so that this util is always 
consistent with `TaskExecutorResourceUtils` in the git history.

##
File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##
@@ -369,14 +373,21 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
 return this;
 }
 
-public Builder setGPUResource(double gpus) {
-this.gpuResource = new GPUResource(gpus);
+public Builder addExtendedResource(String name, Resource 
extendedResource) {
+this.extendedResources.put(name, extendedResource);
+return this;
+}
+
+public Builder addExtendedResources(Map<String, Resource> 
extendedResources) {

Review comment:
   We should not need the map keys for the argument. Same for 
`ResourceProfile.Builder`.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
##
@@ -174,

[GitHub] [flink] xintongsong commented on a change in pull request #15112: [FLINK-21480][runtime] Respect external resources from resource requirements

2021-03-15 Thread GitBox


xintongsong commented on a change in pull request #15112:
URL: https://github.com/apache/flink/pull/15112#discussion_r594812450



##
File path: 
flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
##
@@ -30,7 +30,8 @@
 
 /** Base class for resources one can specify. */
 @Internal
-public abstract class Resource implements Serializable, Comparable<Resource> {
+public abstract class Resource<T extends Resource<T>>

Review comment:
   nit: Cast `other` to `T` rather than `Resource` in `equals()`.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java
##
@@ -97,7 +105,8 @@ public TaskExecutorProcessSpec(
 MemorySize networkMemSize,
 MemorySize managedMemorySize,
 MemorySize jvmMetaspaceSize,
-MemorySize jvmOverheadSize) {
+MemorySize jvmOverheadSize,
+Map<String, ExternalResource> extendedResources) {

Review comment:
   I'd suggest replacing `Map<String, ExternalResource>` with 
`Collection<ExternalResource>`.
   The problem with the current implementation is that we rely on the caller to 
guarantee that the key of each entry equals the name of its external resource value.
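
A sketch of the difference, under simplified assumptions: when the constructor takes a collection, the keys are derived inside the class, so a key cannot disagree with the name of its resource. The `Spec` class and the stand-in `ExternalResource` below are hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class CollectionArgumentSketch {

    /** Minimal stand-in for an external resource. */
    static final class ExternalResource {
        private final String name;
        private final long amount;

        ExternalResource(String name, long amount) {
            this.name = name;
            this.amount = amount;
        }

        String getName() {
            return name;
        }

        long getAmount() {
            return amount;
        }
    }

    static final class Spec {
        private final Map<String, ExternalResource> extendedResources;

        Spec(Collection<ExternalResource> resources) {
            // Keys are computed here, never supplied by the caller.
            // Collectors.toMap throws IllegalStateException if two resources share a name.
            this.extendedResources =
                    resources.stream()
                            .collect(
                                    Collectors.toMap(
                                            ExternalResource::getName, Function.identity()));
        }

        Map<String, ExternalResource> getExtendedResources() {
            return extendedResources;
        }
    }

    public static void main(String[] args) {
        Spec spec =
                new Spec(
                        Arrays.asList(
                                new ExternalResource("gpu", 2), new ExternalResource("fpga", 1)));
        spec.getExtendedResources()
                .forEach((key, resource) -> System.out.println(key + " -> " + resource.getName()));
    }
}
```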

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
##
@@ -197,14 +226,30 @@ public Builder setNumSlots(int numSlots) {
 return this;
 }
 
+public Builder addExtendedResource(ExternalResource extendedResource) {
+this.extendedResources.put(extendedResource.getName(), 
extendedResource);
+return this;
+}
+
+public Builder addExtendedResources(Collection<ExternalResource> 
extendedResources) {
+if (extendedResources != null) {
+extendedResources.forEach(
+extendedResource ->
+this.extendedResources.put(
+extendedResource.getName(), 
extendedResource));
+}
+return this;
+}

Review comment:
   Same here for existing external resource names.

##
File path: 
flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java
##
@@ -368,14 +373,24 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
 return this;
 }
 
-public Builder setGPUResource(double gpus) {
-this.gpuResource = new GPUResource(gpus);
+public Builder addExtendedResource(ExternalResource extendedResource) {
+this.extendedResources.put(extendedResource.getName(), 
extendedResource);
+return this;
+}
+
+public Builder addExtendedResources(Collection<ExternalResource> 
extendedResources) {
+if (extendedResources != null) {
+extendedResources.forEach(
+extendedResource ->
+this.extendedResources.put(
+extendedResource.getName(), 
extendedResource));
+}

Review comment:
   Same for the `ResourceProfile.Builder`.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java
##
@@ -157,6 +159,29 @@ private ExternalResourceUtils() {
 return externalResourceAmountMap;
 }
 
+/** Get the map of resource name and Resources of all of enabled external 
resources. */
+public static Map<String, ExternalResource> 
getExternalResourcesMap(Configuration config) {
+return getExternalResourceAmountMap(config).entrySet().stream()
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry -> new ExternalResource(entry.getKey(), 
entry.getValue())));
+}
+
+/** Generate the string expression of the given external resources. */
+public static String generateExternalResourcesString(
+Map<String, ExternalResource> extendedResources) {
+final StringBuilder extendedResourceStr = new 
StringBuilder(extendedResources.size() * 10);
+for (Map.Entry<String, ExternalResource> resource : 
extendedResources.entrySet()) {
+extendedResourceStr
+.append(", ")
+.append(resource.getKey())
+.append('=')
+.append(resource.getValue().getValue());
+}
+return extendedResourceStr.toString();
+}

Review comment:
   It is counterintuitive that the external resources string starts with a 
`, `.
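
One possible way to avoid the leading separator, sketched with `Collectors.joining`. The `Map<String, Long>` argument and the `describe` caller are simplifications assumed for illustration and are not the signatures used in the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class ExternalResourcesStringSketch {

    /** Joins "name=amount" pairs with ", " and emits no leading or trailing separator. */
    static String generateExternalResourcesString(Map<String, Long> amounts) {
        return amounts.entrySet().stream()
                .map(entry -> entry.getKey() + "=" + entry.getValue())
                .collect(Collectors.joining(", "));
    }

    /** The caller decides how to attach the fragment to its own fields. */
    static String describe(String memory, Map<String, Long> amounts) {
        String resources = generateExternalResourcesString(amounts);
        return "{memory=" + memory + (resources.isEmpty() ? "" : ", " + resources) + "}";
    }

    public static void main(String[] args) {
        Map<String, Long> amounts = new LinkedHashMap<>();
        amounts.put("gpu", 2L);
        amounts.put("fpga", 1L);
        System.out.println(describe("1g", amounts));
    }
}
```

The separator then lives in the `toString()` of each spec class, where it is visible next to the other fields, rather than hidden inside the util.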

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java
##
@@ -97,7 +105,8 @@ public TaskExecutorProcessSpec(
 MemorySize networkMemSize,
 MemorySize managedMemorySize,
 MemorySize jvmMetaspaceSize,
-MemorySize jvmOverheadSize) {
+MemorySize jvmOverheadSize,
+Map<String, ExternalResource> extendedResources) {

Review comment:
   Same for 

[GitHub] [flink] xintongsong commented on a change in pull request #15112: [FLINK-21480][runtime] Respect external resources from resource requirements

2021-03-16 Thread GitBox


xintongsong commented on a change in pull request #15112:
URL: https://github.com/apache/flink/pull/15112#discussion_r595681050



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
##
@@ -640,15 +642,18 @@ public Builder setNetworkMemoryMB(int networkMemoryMB) {
 return this;
 }
 
-public Builder addExtendedResource(String name, Resource 
extendedResource) {
-this.extendedResources.put(name, extendedResource);
+// Add the given extended resource, the old value with the same 
resource name will be
+// override if present.
+public Builder setExtendedResource(Resource extendedResource) {
+this.extendedResources.put(extendedResource.getName(), 
extendedResource);
 return this;
 }
 
-public Builder addExtendedResources(Map 
extendedResources) {
-if (extendedResources != null) {
-this.extendedResources.putAll(extendedResources);
-}
+// Add the given extended resources, this will override all the 
previous records.

Review comment:
   ```
   Add the given extended resources. This will discard all the previously added 
extended resources.
   ```
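
A sketch of the semantics the two comments describe, assuming the plural setter clears the map before inserting. The body of that method is cut off in the diff above, so the `clear()` call, the stand-in `ExternalResource`, and the wrapper class are assumptions.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

final class SetSemanticsSketch {

    /** Minimal stand-in for an external resource. */
    static final class ExternalResource {
        private final String name;
        private final long amount;

        ExternalResource(String name, long amount) {
            this.name = name;
            this.amount = amount;
        }

        String getName() {
            return name;
        }

        long getAmount() {
            return amount;
        }
    }

    static final class Builder {
        final Map<String, ExternalResource> extendedResources = new HashMap<>();

        /** Sets one resource; an earlier entry with the same name is overwritten. */
        Builder setExtendedResource(ExternalResource resource) {
            extendedResources.put(resource.getName(), resource);
            return this;
        }

        /** Replaces everything: previously added resources are discarded first. */
        Builder setExtendedResources(Collection<ExternalResource> resources) {
            extendedResources.clear();
            resources.forEach(resource -> extendedResources.put(resource.getName(), resource));
            return this;
        }
    }

    public static void main(String[] args) {
        Builder builder =
                new Builder()
                        .setExtendedResource(new ExternalResource("gpu", 1))
                        .setExtendedResources(Arrays.asList(new ExternalResource("fpga", 4)));
        System.out.println(builder.extendedResources.keySet());
    }
}
```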

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java
##
@@ -109,18 +120,27 @@ public TaskExecutorProcessSpec(
 networkMemSize,
 managedMemorySize),
 new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize),
-1);
+1,
+extendedResources);
 }
 
 protected TaskExecutorProcessSpec(
 CPUResource cpuCores,
 TaskExecutorFlinkMemory flinkMemory,
 JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead,
-int numSlots) {
+int numSlots,
+Collection<ExternalResource> extendedResources) {
 
 super(flinkMemory, jvmMetaspaceAndOverhead);
 this.cpuCores = cpuCores;
 this.numSlots = numSlots;
+this.extendedResources =
+Preconditions.checkNotNull(extendedResources).stream()
+.filter(resource -> !resource.isZero())
+.collect(Collectors.toMap(ExternalResource::getName, 
Function.identity()));
+Preconditions.checkState(

Review comment:
   Same for `WorkerResourceSpec` and `TaskExecutorResourceSpec`.

##
File path: 
flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java
##
@@ -51,50 +52,51 @@ protected Resource(String name, BigDecimal value) {
 this.value = value;
 }
 
-public Resource merge(Resource other) {
+public T merge(T other) {
 checkNotNull(other, "Cannot merge with null resources");
 checkArgument(getClass() == other.getClass(), "Merge with different 
resource type");
-checkArgument(name.equals(other.name), "Merge with different resource 
name");
+checkArgument(name.equals(other.getName()), "Merge with different 
resource name");
 
-return create(value.add(other.value));
+return create(value.add(other.getValue()));
 }
 
-public Resource subtract(Resource other) {
+public T subtract(T other) {
 checkNotNull(other, "Cannot subtract null resources");
 checkArgument(getClass() == other.getClass(), "Minus with different 
resource type");
-checkArgument(name.equals(other.name), "Minus with different resource 
name");
+checkArgument(name.equals(other.getName()), "Minus with different 
resource name");
 checkArgument(
-value.compareTo(other.value) >= 0,
+value.compareTo(other.getValue()) >= 0,
 "Try to subtract a larger resource from this one.");
 
-return create(value.subtract(other.value));
+return create(value.subtract(other.getValue()));
 }
 
-public Resource multiply(BigDecimal multiplier) {
+public T multiply(BigDecimal multiplier) {
 return create(value.multiply(multiplier));
 }
 
-public Resource multiply(int multiplier) {
+public T multiply(int multiplier) {
 return multiply(BigDecimal.valueOf(multiplier));
 }
 
-public Resource divide(BigDecimal by) {
+public T divide(BigDecimal by) {
 return create(value.divide(by, 16, RoundingMode.DOWN));
 }
 
-public Resource divide(int by) {
+public T divide(int by) {
 return divide(BigDecimal.valueOf(by));
 }
 
 @Override
+@SuppressWarnings("unchecked")

Review comment:
   It would be nice to keep the scope of `@SuppressWarnings` as small as 
possible. In this case, the single statement `T other = (T) o;` should be 
enough.
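
A sketch of the narrower scope: Java allows `@SuppressWarnings` on a local variable declaration, so only the cast is exempted and the rest of `equals()` keeps its warnings. The field layout and the equality rule below are simplified assumptions, not the actual `Resource` implementation.

```java
import java.math.BigDecimal;
import java.util.Objects;

final class NarrowSuppressionSketch {

    abstract static class Resource<T extends Resource<T>> {
        private final String name;
        private final BigDecimal value;

        protected Resource(String name, BigDecimal value) {
            this.name = name;
            this.value = value;
        }

        String getName() {
            return name;
        }

        BigDecimal getValue() {
            return value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            // Only this declaration is exempted from the unchecked warning.
            @SuppressWarnings("unchecked")
            T other = (T) o;
            return name.equals(other.getName()) && value.equals(other.getValue());
        }

        @Override
        public int hashCode() {
            return Objects.hash(name, value);
        }
    }

    static final class ExternalResource extends Resource<ExternalResource> {
        ExternalResource(String name, BigDecimal value) {
            super(name, value);
        }
    }

    public static void main(String[] args) {
        ExternalResource a = new ExternalResource("gpu", BigDecimal.ONE);
        ExternalResource b = new ExternalResource("gpu", BigDecimal.ONE);
        System.out.println(a.equals(b));
    }
}
```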

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java
###