[GitHub] [flink] xintongsong commented on a change in pull request #15112: [FLINK-21480][runtime] Respect external resources from resource requirements
xintongsong commented on a change in pull request #15112:
URL: https://github.com/apache/flink/pull/15112#discussion_r590131518

## File path: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ##
@@ -93,7 +92,7 @@ private ResourceSpec(
 final MemorySize taskHeapMemory,
 final MemorySize taskOffHeapMemory,
 final MemorySize managedMemory,
-final Resource... extendedResources) {
+final Map extendedResources) {

Review comment:
It would be nice to enforce `ExternalResource` rather than `Resource` here. IIUC, what prevents us from doing so is that the return types of `Resource#merge/subtract/multiply/divide/create` are hard-coded to `Resource`. We might consider solving this with generics. That should also help with replacing `Resource` with `CPUResource` wherever needed.
```
abstract class Resource<T extends Resource<T>> {
    T merge(T other);
}
class ExternalResource extends Resource<ExternalResource> {}
```

## File path: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ##
@@ -369,14 +373,21 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
 return this;
 }
-public Builder setGPUResource(double gpus) {
-this.gpuResource = new GPUResource(gpus);
+public Builder addExtendedResource(String name, Resource extendedResource) {

Review comment:
I would be fine with providing a shortcut setter for a commonly used external resource like GPU. That would also save us from touching all the test cases.
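
The generics idea in the first comment above can be illustrated with a minimal, self-contained sketch. It is not Flink's actual `Resource` class: the fields, the `create` hook, and the `SelfBoundedResourceSketch` driver are simplified assumptions made only for illustration. The point is that a self-bounded type parameter lets `merge` return the concrete subtype instead of the base type.

```java
import java.math.BigDecimal;

/** Simplified, hypothetical stand-in for a resource base class (not Flink's real one). */
abstract class Resource<T extends Resource<T>> {
    private final String name;
    private final BigDecimal value;

    protected Resource(String name, BigDecimal value) {
        this.name = name;
        this.value = value;
    }

    String getName() {
        return name;
    }

    BigDecimal getValue() {
        return value;
    }

    /** Returns the concrete subtype T, so callers do not need to cast. */
    T merge(T other) {
        if (!name.equals(other.getName())) {
            throw new IllegalArgumentException("Merge with different resource name");
        }
        return create(value.add(other.getValue()));
    }

    /** Each subtype builds a new instance of itself. */
    protected abstract T create(BigDecimal value);
}

/** Hypothetical concrete resource that binds T to itself. */
final class ExternalResource extends Resource<ExternalResource> {
    ExternalResource(String name, BigDecimal value) {
        super(name, value);
    }

    @Override
    protected ExternalResource create(BigDecimal value) {
        return new ExternalResource(getName(), value);
    }
}

class SelfBoundedResourceSketch {
    public static void main(String[] args) {
        ExternalResource gpu = new ExternalResource("gpu", BigDecimal.ONE);
        // The static type of the result is ExternalResource, not Resource.
        ExternalResource merged = gpu.merge(new ExternalResource("gpu", BigDecimal.valueOf(2)));
        System.out.println(merged.getName() + "=" + merged.getValue());
    }
}
```

With this shape, a signature that accepts `ExternalResource` can be enforced by the compiler, which is what the comment asks for.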

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java ##
@@ -118,13 +141,22 @@ public boolean equals(Object obj) {
 && Objects.equals(this.taskHeapSize, that.taskHeapSize)
 && Objects.equals(this.taskOffHeapSize, that.taskOffHeapSize)
 && Objects.equals(this.networkMemSize, that.networkMemSize)
-&& Objects.equals(this.managedMemSize, that.managedMemSize);
+&& Objects.equals(this.managedMemSize, that.managedMemSize)
+&& Objects.equals(this.extendedResources, that.extendedResources);
 }
 return false;
 }
 @Override
 public String toString() {
+final StringBuilder extendedResourceStr = new StringBuilder(extendedResources.size() * 10);
+for (Map.Entry resource : extendedResources.entrySet()) {
+extendedResourceStr
+.append(", ")
+.append(resource.getKey())
+.append('=')
+.append(resource.getValue().getValue());
+}

Review comment:
Consider deduplicating this string generation with a util method. We can put it in `ExternalResourceUtils`.

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerUtils.java ##
@@ -29,14 +29,24 @@
 */
 public static ResourceProfile generateDefaultSlotResourceProfile(
 WorkerResourceSpec workerResourceSpec, int numSlotsPerWorker) {
-return ResourceProfile.newBuilder()
-.setCpuCores(workerResourceSpec.getCpuCores().divide(numSlotsPerWorker))
-.setTaskHeapMemory(workerResourceSpec.getTaskHeapSize().divide(numSlotsPerWorker))
-.setTaskOffHeapMemory(
-workerResourceSpec.getTaskOffHeapSize().divide(numSlotsPerWorker))
-.setManagedMemory(workerResourceSpec.getManagedMemSize().divide(numSlotsPerWorker))
-.setNetworkMemory(workerResourceSpec.getNetworkMemSize().divide(numSlotsPerWorker))
-.build();
+final ResourceProfile.Builder resourceProfileBuilder =

Review comment:
I would suggest adding external resources to `WorkerResourceSpec` and `TaskExecutorResourceSpec` in one commit, so that this util is always consistent with `TaskExecutorResourceUtils` in the git history.

## File path: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ##
@@ -369,14 +373,21 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
 return this;
 }
-public Builder setGPUResource(double gpus) {
-this.gpuResource = new GPUResource(gpus);
+public Builder addExtendedResource(String name, Resource extendedResource) {
+this.extendedResources.put(name, extendedResource);
+return this;
+}
+
+public Builder addExtendedResources(Map extendedResources) {

Review comment:
We should not need the map keys for the argument. Same for `ResourceProfile.Builder`.

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java ##
@@ -174,
[GitHub] [flink] xintongsong commented on a change in pull request #15112: [FLINK-21480][runtime] Respect external resources from resource requirements
xintongsong commented on a change in pull request #15112:
URL: https://github.com/apache/flink/pull/15112#discussion_r594812450

## File path: flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java ##
@@ -30,7 +30,8 @@
 /** Base class for resources one can specify. */
 @Internal
-public abstract class Resource implements Serializable, Comparable {
+public abstract class Resource<T extends Resource<T>>

Review comment:
nit: Cast `other` to `T` rather than `Resource` in `equals()`.

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java ##
@@ -97,7 +105,8 @@ public TaskExecutorProcessSpec(
 MemorySize networkMemSize,
 MemorySize managedMemorySize,
 MemorySize jvmMetaspaceSize,
-MemorySize jvmOverheadSize) {
+MemorySize jvmOverheadSize,
+Map extendedResources) {

Review comment:
I'd suggest replacing `Map` with `Collection`. The problem with the current implementation is that we rely on the caller to guarantee that the key of each entry equals the name of the external resource it maps to.

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java ##
@@ -197,14 +226,30 @@ public Builder setNumSlots(int numSlots) {
 return this;
 }
+public Builder addExtendedResource(ExternalResource extendedResource) {
+this.extendedResources.put(extendedResource.getName(), extendedResource);
+return this;
+}
+
+public Builder addExtendedResources(Collection extendedResources) {
+if (extendedResources != null) {
+extendedResources.forEach(
+extendedResource ->
+this.extendedResources.put(
+extendedResource.getName(), extendedResource));
+}
+return this;
+}

Review comment:
Same here for existing external resource names.
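
The `Map` versus `Collection` point above can be shown with a small sketch. `NamedResource` and `SpecBuilderSketch` are hypothetical stand-ins invented for this example, not Flink classes. Because the builder derives each key from the resource's own name, a caller cannot pass an entry whose key disagrees with its value.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an external resource: a name plus an amount. */
final class NamedResource {
    private final String name;
    private final double amount;

    NamedResource(String name, double amount) {
        this.name = name;
        this.amount = amount;
    }

    String getName() {
        return name;
    }

    double getAmount() {
        return amount;
    }
}

/** Hypothetical builder; only the external-resource handling is sketched. */
final class SpecBuilderSketch {
    private final Map<String, NamedResource> extendedResources = new HashMap<>();

    /** Accepts a Collection, so the key is always taken from the value itself. */
    SpecBuilderSketch addExtendedResources(Collection<NamedResource> resources) {
        for (NamedResource resource : resources) {
            // A later resource with the same name replaces the earlier one.
            extendedResources.put(resource.getName(), resource);
        }
        return this;
    }

    Map<String, NamedResource> getExtendedResources() {
        return Collections.unmodifiableMap(extendedResources);
    }

    public static void main(String[] args) {
        Map<String, NamedResource> resources =
                new SpecBuilderSketch()
                        .addExtendedResources(
                                Arrays.asList(
                                        new NamedResource("gpu", 1.0),
                                        new NamedResource("fpga", 2.0)))
                        .getExtendedResources();
        // Every key matches the name of the resource stored under it.
        resources.forEach(
                (key, value) -> System.out.println(key + " -> " + value.getName()));
    }
}
```

Whether a duplicate name should overwrite, merge, or be rejected is a separate design decision that this sketch does not settle; it simply overwrites.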

## File path: flink-core/src/main/java/org/apache/flink/api/common/operators/ResourceSpec.java ##
@@ -368,14 +373,24 @@ public Builder setManagedMemoryMB(int managedMemoryMB) {
 return this;
 }
-public Builder setGPUResource(double gpus) {
-this.gpuResource = new GPUResource(gpus);
+public Builder addExtendedResource(ExternalResource extendedResource) {
+this.extendedResources.put(extendedResource.getName(), extendedResource);
+return this;
+}
+
+public Builder addExtendedResources(Collection extendedResources) {
+if (extendedResources != null) {
+extendedResources.forEach(
+extendedResource ->
+this.extendedResources.put(
+extendedResource.getName(), extendedResource));
+}

Review comment:
Same for the `ResourceProfile.Builder`.

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/externalresource/ExternalResourceUtils.java ##
@@ -157,6 +159,29 @@ private ExternalResourceUtils() {
 return externalResourceAmountMap;
 }
+/** Get the map of resource name and Resources of all of enabled external resources. */
+public static Map getExternalResourcesMap(Configuration config) {
+return getExternalResourceAmountMap(config).entrySet().stream()
+.collect(
+Collectors.toMap(
+Map.Entry::getKey,
+entry -> new ExternalResource(entry.getKey(), entry.getValue())));
+}
+
+/** Generate the string expression of the given external resources. */
+public static String generateExternalResourcesString(
+Map extendedResources) {
+final StringBuilder extendedResourceStr = new StringBuilder(extendedResources.size() * 10);
+for (Map.Entry resource : extendedResources.entrySet()) {
+extendedResourceStr
+.append(", ")
+.append(resource.getKey())
+.append('=')
+.append(resource.getValue().getValue());
+}
+return extendedResourceStr.toString();
+}

Review comment:
It is counterintuitive that the external resources string starts with a `, `.
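
One possible way to address the leading `, ` is to join the entries with a separator instead of prepending it to each one. The sketch below is an assumption about how that could look, not the change that was eventually made in the PR. It uses a plain `Map<String, Long>` of amounts rather than Flink's resource types, and the class name is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class ExternalResourceStringSketch {

    /** Renders entries as "name=amount", separated by ", ", with no leading separator. */
    static String generateExternalResourcesString(Map<String, Long> amounts) {
        return amounts.entrySet().stream()
                .map(entry -> entry.getKey() + "=" + entry.getValue())
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the output order is deterministic.
        Map<String, Long> amounts = new LinkedHashMap<>();
        amounts.put("gpu", 2L);
        amounts.put("fpga", 1L);
        System.out.println(generateExternalResourcesString(amounts)); // prints "gpu=2, fpga=1"
    }
}
```

A caller such as `toString()` would then add its own separator only when the returned string is non-empty.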

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java ##
@@ -97,7 +105,8 @@ public TaskExecutorProcessSpec(
 MemorySize networkMemSize,
 MemorySize managedMemorySize,
 MemorySize jvmMetaspaceSize,
-MemorySize jvmOverheadSize) {
+MemorySize jvmOverheadSize,
+Map extendedResources) {

Review comment:
Same for
[GitHub] [flink] xintongsong commented on a change in pull request #15112: [FLINK-21480][runtime] Respect external resources from resource requirements
xintongsong commented on a change in pull request #15112:
URL: https://github.com/apache/flink/pull/15112#discussion_r595681050

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java ##
@@ -640,15 +642,18 @@ public Builder setNetworkMemoryMB(int networkMemoryMB) {
 return this;
 }
-public Builder addExtendedResource(String name, Resource extendedResource) {
-this.extendedResources.put(name, extendedResource);
+// Add the given extended resource, the old value with the same resource name will be
+// override if present.
+public Builder setExtendedResource(Resource extendedResource) {
+this.extendedResources.put(extendedResource.getName(), extendedResource);
 return this;
 }
-public Builder addExtendedResources(Map extendedResources) {
-if (extendedResources != null) {
-this.extendedResources.putAll(extendedResources);
-}
+// Add the given extended resources, this will override all the previous records.

Review comment:
```
Add the given extended resources. This will discard all the previously added extended resources.
```

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java ##
@@ -109,18 +120,27 @@ public TaskExecutorProcessSpec(
 networkMemSize,
 managedMemorySize),
 new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize),
-1);
+1,
+extendedResources);
 }
 protected TaskExecutorProcessSpec(
 CPUResource cpuCores,
 TaskExecutorFlinkMemory flinkMemory,
 JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead,
-int numSlots) {
+int numSlots,
+Collection extendedResources) {
 super(flinkMemory, jvmMetaspaceAndOverhead);
 this.cpuCores = cpuCores;
 this.numSlots = numSlots;
+this.extendedResources =
+Preconditions.checkNotNull(extendedResources).stream()
+.filter(resource -> !resource.isZero())
+.collect(Collectors.toMap(ExternalResource::getName, Function.identity()));
+Preconditions.checkState(

Review comment:
Same for `WorkerResourceSpec` and `TaskExecutorResourceSpec`.

## File path: flink-core/src/main/java/org/apache/flink/api/common/resources/Resource.java ##
@@ -51,50 +52,51 @@ protected Resource(String name, BigDecimal value) {
 this.value = value;
 }
-public Resource merge(Resource other) {
+public T merge(T other) {
 checkNotNull(other, "Cannot merge with null resources");
 checkArgument(getClass() == other.getClass(), "Merge with different resource type");
-checkArgument(name.equals(other.name), "Merge with different resource name");
+checkArgument(name.equals(other.getName()), "Merge with different resource name");
-return create(value.add(other.value));
+return create(value.add(other.getValue()));
 }
-public Resource subtract(Resource other) {
+public T subtract(T other) {
 checkNotNull(other, "Cannot subtract null resources");
 checkArgument(getClass() == other.getClass(), "Minus with different resource type");
-checkArgument(name.equals(other.name), "Minus with different resource name");
+checkArgument(name.equals(other.getName()), "Minus with different resource name");
 checkArgument(
-value.compareTo(other.value) >= 0,
+value.compareTo(other.getValue()) >= 0,
 "Try to subtract a larger resource from this one.");
-return create(value.subtract(other.value));
+return create(value.subtract(other.getValue()));
 }
-public Resource multiply(BigDecimal multiplier) {
+public T multiply(BigDecimal multiplier) {
 return create(value.multiply(multiplier));
 }
-public Resource multiply(int multiplier) {
+public T multiply(int multiplier) {
 return multiply(BigDecimal.valueOf(multiplier));
 }
-public Resource divide(BigDecimal by) {
+public T divide(BigDecimal by) {
 return create(value.divide(by, 16, RoundingMode.DOWN));
 }
-public Resource divide(int by) {
+public T divide(int by) {
 return divide(BigDecimal.valueOf(by));
 }
 @Override
+@SuppressWarnings("unchecked")

Review comment:
It would be nice to keep the scope of `@SuppressWarnings` as small as possible. In this case, the single statement `T other = (T) o;` should be enough.
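
To illustrate the narrower `@SuppressWarnings` scope suggested above: Java allows the annotation on a local variable declaration, so only the unchecked cast is covered and the rest of `equals()` still gets compiler warnings. The classes below are hypothetical simplifications written for this example, not the actual Flink code, and the equality rule (name and value both equal) is an assumption.

```java
import java.math.BigDecimal;
import java.util.Objects;

/** Hypothetical, simplified self-bounded resource used only to show the annotation scope. */
abstract class TypedResource<T extends TypedResource<T>> {
    private final String name;
    private final BigDecimal value;

    protected TypedResource(String name, BigDecimal value) {
        this.name = name;
        this.value = value;
    }

    String getName() {
        return name;
    }

    BigDecimal getValue() {
        return value;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        // The suppression applies to this one declaration only, not the whole method.
        @SuppressWarnings("unchecked")
        T other = (T) o;
        return name.equals(other.getName()) && value.equals(other.getValue());
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, value);
    }
}

/** Hypothetical concrete subtype. */
final class SketchExternalResource extends TypedResource<SketchExternalResource> {
    SketchExternalResource(String name, BigDecimal value) {
        super(name, value);
    }
}
```

The `getClass()` check before the cast is what makes the unchecked cast safe in practice, since any object reaching that line has the same runtime class as `this`.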
## File path: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessSpec.java ###