This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0544ddd0e643abced35c353f8810db01f2da5090
Author: Yangze Guo <[email protected]>
AuthorDate: Wed Jun 9 11:01:20 2021 +0800

    [FLINK-21925][core] Introduce public SlotSharingGroup
    
    Users can use this class to describe the name and the different 
resource components of a slot sharing group.
---
 .../api/common/operators/SlotSharingGroup.java     | 235 +++++++++++++++++++++
 .../api/common/operators/SlotSharingGroupTest.java |  84 ++++++++
 2 files changed, 319 insertions(+)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroup.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroup.java
new file mode 100644
index 0000000..029788f
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/SlotSharingGroup.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Describe the name and the different resource components of a slot 
sharing group. */
+@PublicEvolving
+public class SlotSharingGroup implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String name;
+
+    /** How many cpu cores are needed. Can be null only if it is unknown. */
+    @Nullable // can be null only for UNKNOWN
+    private final CPUResource cpuCores;
+
+    /** How much task heap memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize taskHeapMemory;
+
+    /** How much task off-heap memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize taskOffHeapMemory;
+
+    /** How much managed memory is needed. */
+    @Nullable // can be null only for UNKNOWN
+    private final MemorySize managedMemory;
+
+    /** An extensible field for user-specified resources from {@link 
SlotSharingGroup}. */
+    private final Map<String, Double> externalResources = new HashMap<>();
+
+    private SlotSharingGroup(
+            String name,
+            CPUResource cpuCores,
+            MemorySize taskHeapMemory,
+            MemorySize taskOffHeapMemory,
+            MemorySize managedMemory,
+            Map<String, Double> extendedResources) {
+        this.name = checkNotNull(name);
+        this.cpuCores = checkNotNull(cpuCores);
+        this.taskHeapMemory = checkNotNull(taskHeapMemory);
+        this.taskOffHeapMemory = checkNotNull(taskOffHeapMemory);
+        this.managedMemory = checkNotNull(managedMemory);
+        this.externalResources.putAll(checkNotNull(extendedResources));
+    }
+
+    private SlotSharingGroup(String name) {
+        this.name = checkNotNull(name);
+        this.cpuCores = null;
+        this.taskHeapMemory = null;
+        this.taskOffHeapMemory = null;
+        this.managedMemory = null;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public Optional<MemorySize> getManagedMemory() {
+        return Optional.ofNullable(managedMemory);
+    }
+
+    public Optional<MemorySize> getTaskHeapMemory() {
+        return Optional.ofNullable(taskHeapMemory);
+    }
+
+    public Optional<MemorySize> getTaskOffHeapMemory() {
+        return Optional.ofNullable(taskOffHeapMemory);
+    }
+
+    public Optional<Double> getCpuCores() {
+        return Optional.ofNullable(cpuCores)
+                .map(cpuResource -> cpuResource.getValue().doubleValue());
+    }
+
+    public Map<String, Double> getExternalResources() {
+        return Collections.unmodifiableMap(externalResources);
+    }
+
+    public static Builder newBuilder(String name) {
+        return new Builder(name);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        } else if (obj != null && obj.getClass() == SlotSharingGroup.class) {
+            SlotSharingGroup that = (SlotSharingGroup) obj;
+            return Objects.equals(this.cpuCores, that.cpuCores)
+                    && Objects.equals(taskHeapMemory, that.taskHeapMemory)
+                    && Objects.equals(taskOffHeapMemory, 
that.taskOffHeapMemory)
+                    && Objects.equals(managedMemory, that.managedMemory)
+                    && Objects.equals(externalResources, 
that.externalResources);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hashCode(cpuCores);
+        result = 31 * result + Objects.hashCode(taskHeapMemory);
+        result = 31 * result + Objects.hashCode(taskOffHeapMemory);
+        result = 31 * result + Objects.hashCode(managedMemory);
+        result = 31 * result + externalResources.hashCode();
+        return result;
+    }
+
+    /** Builder for the {@link SlotSharingGroup}. */
+    public static class Builder {
+
+        private String name;
+        private CPUResource cpuCores;
+        private MemorySize taskHeapMemory;
+        private MemorySize taskOffHeapMemory;
+        private MemorySize managedMemory;
+        private Map<String, Double> externalResources = new HashMap<>();
+
+        private Builder(String name) {
+            this.name = name;
+        }
+
+        /** Set the CPU cores for this SlotSharingGroup. */
+        public Builder setCpuCores(double cpuCores) {
+            checkArgument(cpuCores > 0, "The cpu cores should be positive.");
+            this.cpuCores = new CPUResource(cpuCores);
+            return this;
+        }
+
+        /** Set the task heap memory for this SlotSharingGroup. */
+        public Builder setTaskHeapMemory(MemorySize taskHeapMemory) {
+            checkArgument(
+                    taskHeapMemory.compareTo(MemorySize.ZERO) > 0,
+                    "The task heap memory should be positive.");
+            this.taskHeapMemory = taskHeapMemory;
+            return this;
+        }
+
+        /** Set the task heap memory for this SlotSharingGroup in MB. */
+        public Builder setTaskHeapMemoryMB(int taskHeapMemoryMB) {
+            checkArgument(taskHeapMemoryMB > 0, "The task heap memory should 
be positive.");
+            this.taskHeapMemory = MemorySize.ofMebiBytes(taskHeapMemoryMB);
+            return this;
+        }
+
+        /** Set the task off-heap memory for this SlotSharingGroup. */
+        public Builder setTaskOffHeapMemory(MemorySize taskOffHeapMemory) {
+            this.taskOffHeapMemory = taskOffHeapMemory;
+            return this;
+        }
+
+        /** Set the task off-heap memory for this SlotSharingGroup in MB. */
+        public Builder setTaskOffHeapMemoryMB(int taskOffHeapMemoryMB) {
+            this.taskOffHeapMemory = 
MemorySize.ofMebiBytes(taskOffHeapMemoryMB);
+            return this;
+        }
+
+        /** Set the task managed memory for this SlotSharingGroup. */
+        public Builder setManagedMemory(MemorySize managedMemory) {
+            this.managedMemory = managedMemory;
+            return this;
+        }
+
+        /** Set the task managed memory for this SlotSharingGroup in MB. */
+        public Builder setManagedMemoryMB(int managedMemoryMB) {
+            this.managedMemory = MemorySize.ofMebiBytes(managedMemoryMB);
+            return this;
+        }
+
+        /**
+         * Add the given external resource. The old value with the same 
resource name will be
+         * replaced if present.
+         */
+        public Builder setExternalResource(String name, double value) {
+            this.externalResources.put(name, value);
+            return this;
+        }
+
+        /** Build the SlotSharingGroup. */
+        public SlotSharingGroup build() {
+            if (cpuCores != null && taskHeapMemory != null) {
+                taskOffHeapMemory = 
Optional.ofNullable(taskOffHeapMemory).orElse(MemorySize.ZERO);
+                managedMemory = 
Optional.ofNullable(managedMemory).orElse(MemorySize.ZERO);
+                return new SlotSharingGroup(
+                        name,
+                        cpuCores,
+                        taskHeapMemory,
+                        taskOffHeapMemory,
+                        managedMemory,
+                        externalResources);
+            } else if (cpuCores != null
+                    || taskHeapMemory != null
+                    || taskOffHeapMemory != null
+                    || managedMemory != null
+                    || !externalResources.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "The cpu cores and task heap memory are required when 
specifying the resource of a slot sharing group. "
+                                + "You need to explicitly configure them with 
positive value.");
+            } else {
+                return new SlotSharingGroup(name);
+            }
+        }
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/SlotSharingGroupTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/SlotSharingGroupTest.java
new file mode 100644
index 0000000..e48920d
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/SlotSharingGroupTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.operators;
+
+import org.apache.flink.configuration.MemorySize;
+
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link SlotSharingGroup}. */
+public class SlotSharingGroupTest {
+    @Test
+    public void testBuildSlotSharingGroupWithSpecificResource() {
+        final String name = "ssg";
+        final MemorySize heap = MemorySize.ofMebiBytes(100);
+        final MemorySize offHeap = MemorySize.ofMebiBytes(200);
+        final MemorySize managed = MemorySize.ofMebiBytes(300);
+        final SlotSharingGroup slotSharingGroup =
+                SlotSharingGroup.newBuilder(name)
+                        .setCpuCores(1)
+                        .setTaskHeapMemory(heap)
+                        .setTaskOffHeapMemory(offHeap)
+                        .setManagedMemory(managed)
+                        .setExternalResource("gpu", 1)
+                        .build();
+
+        assertThat(slotSharingGroup.getName(), is(name));
+        assertThat(slotSharingGroup.getCpuCores().get(), is(1.0));
+        assertThat(slotSharingGroup.getTaskHeapMemory().get(), is(heap));
+        assertThat(slotSharingGroup.getTaskOffHeapMemory().get(), is(offHeap));
+        assertThat(slotSharingGroup.getManagedMemory().get(), is(managed));
+        assertThat(
+                slotSharingGroup.getExternalResources(), 
is(Collections.singletonMap("gpu", 1.0)));
+    }
+
+    @Test
+    public void testBuildSlotSharingGroupWithUnknownResource() {
+        final String name = "ssg";
+        final SlotSharingGroup slotSharingGroup = 
SlotSharingGroup.newBuilder(name).build();
+
+        assertThat(slotSharingGroup.getName(), is(name));
+        assertFalse(slotSharingGroup.getCpuCores().isPresent());
+        assertFalse(slotSharingGroup.getTaskHeapMemory().isPresent());
+        assertFalse(slotSharingGroup.getManagedMemory().isPresent());
+        assertFalse(slotSharingGroup.getTaskOffHeapMemory().isPresent());
+        assertTrue(slotSharingGroup.getExternalResources().isEmpty());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBuildSlotSharingGroupWithIllegalConfig() {
+        SlotSharingGroup.newBuilder("ssg")
+                .setCpuCores(1)
+                .setTaskHeapMemory(MemorySize.ZERO)
+                .setTaskOffHeapMemoryMB(10)
+                .build();
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBuildSlotSharingGroupWithoutAllRequiredConfig() {
+        
SlotSharingGroup.newBuilder("ssg").setCpuCores(1).setTaskOffHeapMemoryMB(10).build();
+    }
+}

Reply via email to