This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3f36bbf79ce9b34228f20f2bba50c85d1b6a29e3
Author: Yangze Guo <[email protected]>
AuthorDate: Fri Jun 25 15:50:06 2021 +0800

    [FLINK-21925][core] Introduce 
StreamExecutionEnvironment#registerSlotSharingGroup
---
 ...st_stream_execution_environment_completeness.py |  3 +-
 .../environment/StreamExecutionEnvironment.java    | 37 +++++++++++++++++-
 .../api/StreamExecutionEnvironmentTest.java        | 44 ++++++++++++++++++++++
 .../api/scala/StreamExecutionEnvironment.scala     | 17 +++++++++
 4 files changed, 99 insertions(+), 2 deletions(-)

diff --git 
a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
 
b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
index 878d042..5f6a1a8 100644
--- 
a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
+++ 
b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py
@@ -39,6 +39,7 @@ class 
StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
         # Currently only the methods for configuration is added.
         # 'isForceCheckpointing', 'getNumberOfExecutionRetries', 
'setNumberOfExecutionRetries'
         # is deprecated, exclude them.
+        # TODO the registerSlotSharingGroup should be removed from this list 
after FLINK-23165.
         return {'getLastJobExecutionResult', 'getId', 'getIdString',
                 'registerCachedFile', 'createCollectionsEnvironment', 
'createLocalEnvironment',
                 'createRemoteEnvironment', 'addOperator', 'fromElements',
@@ -48,7 +49,7 @@ class 
StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
                 'createInput', 'createLocalEnvironmentWithWebUI', 
'fromCollection',
                 'socketTextStream', 'initializeContextEnvironment', 
'readTextFile',
                 'setNumberOfExecutionRetries', 'configure', 'executeAsync', 
'registerJobListener',
-                'clearJobListeners', 'getJobListeners', "fromSequence"}
+                'clearJobListeners', 'getJobListeners', "fromSequence", 
"registerSlotSharingGroup"}
 
 
 if __name__ == '__main__':
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 77efe15..ba5d107 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -31,6 +31,9 @@ import 
org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.io.FileInputFormat;
 import org.apache.flink.api.common.io.FilePathFilter;
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
+import org.apache.flink.api.common.operators.util.SlotSharingGroupUtils;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -53,6 +56,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.configuration.RestOptions;
@@ -64,6 +68,7 @@ import org.apache.flink.core.execution.PipelineExecutor;
 import org.apache.flink.core.execution.PipelineExecutorFactory;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateBackendLoader;
@@ -110,8 +115,10 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -189,6 +196,9 @@ public class StreamExecutionEnvironment {
 
     private final List<JobListener> jobListeners = new ArrayList<>();
 
+    // Records the slot sharing groups and their corresponding fine-grained 
ResourceProfile
+    private final Map<String, ResourceProfile> slotSharingGroupResources = new 
HashMap<>();
+
     // 
--------------------------------------------------------------------------------------------
     // Constructor and Properties
     // 
--------------------------------------------------------------------------------------------
@@ -337,6 +347,30 @@ public class StreamExecutionEnvironment {
     }
 
     /**
+     * Register a slot sharing group with its resource spec.
+     *
+     * <p>Note that a slot sharing group hints to the scheduler that the grouped 
operators CAN be
+     * deployed into a shared slot. There's no guarantee that the scheduler 
always deploys the
+     * grouped operators together. In cases where grouped operators are deployed 
into separate slots, the
+     * slot resources will be derived from the specified group requirements.
+     *
+     * @param slotSharingGroup which contains name and its resource spec.
+     */
+    @PublicEvolving
+    public StreamExecutionEnvironment 
registerSlotSharingGroup(SlotSharingGroup slotSharingGroup) {
+        final ResourceSpec resourceSpec =
+                SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
+        if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) {
+            this.slotSharingGroupResources.put(
+                    slotSharingGroup.getName(),
+                    ResourceProfile.fromResourceSpec(
+                            
SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup),
+                            MemorySize.ZERO));
+        }
+        return this;
+    }
+
+    /**
      * Gets the parallelism with which operation are executed by default. 
Operations can
      * individually override this value to use a specific parallelism.
      *
@@ -2087,7 +2121,8 @@ public class StreamExecutionEnvironment {
                 .setChaining(isChainingEnabled)
                 .setUserArtifacts(cacheFile)
                 .setTimeCharacteristic(timeCharacteristic)
-                .setDefaultBufferTimeout(bufferTimeout);
+                .setDefaultBufferTimeout(bufferTimeout)
+                .setSlotSharingGroupResource(slotSharingGroupResources);
     }
 
     /**
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index f86dad2..683e31f 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -25,6 +26,7 @@ import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -48,7 +50,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -275,6 +280,45 @@ public class StreamExecutionEnvironmentTest {
     }
 
     @Test
+    public void testRegisterSlotSharingGroup() {
+        final SlotSharingGroup ssg1 =
+                
SlotSharingGroup.newBuilder("ssg1").setCpuCores(1).setTaskHeapMemoryMB(100).build();
+        final SlotSharingGroup ssg2 =
+                
SlotSharingGroup.newBuilder("ssg2").setCpuCores(2).setTaskHeapMemoryMB(200).build();
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.registerSlotSharingGroup(ssg1);
+        env.registerSlotSharingGroup(ssg2);
+        
env.registerSlotSharingGroup(SlotSharingGroup.newBuilder("ssg3").build());
+
+        final DataStream<Integer> source = 
env.fromElements(1).slotSharingGroup("ssg1");
+        source.map(value -> value).slotSharingGroup(ssg2).addSink(new 
DiscardingSink<>());
+
+        final StreamGraph streamGraph = env.getStreamGraph();
+        assertThat(
+                streamGraph.getSlotSharingGroupResource("ssg1").get(),
+                is(ResourceProfile.fromResources(1, 100)));
+        assertThat(
+                streamGraph.getSlotSharingGroupResource("ssg2").get(),
+                is(ResourceProfile.fromResources(2, 200)));
+        
assertFalse(streamGraph.getSlotSharingGroupResource("ssg3").isPresent());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testRegisterSlotSharingGroupConflict() {
+        final SlotSharingGroup ssg =
+                
SlotSharingGroup.newBuilder("ssg1").setCpuCores(1).setTaskHeapMemoryMB(100).build();
+        final SlotSharingGroup ssgConflict =
+                
SlotSharingGroup.newBuilder("ssg1").setCpuCores(2).setTaskHeapMemoryMB(200).build();
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.registerSlotSharingGroup(ssg);
+
+        final DataStream<Integer> source = 
env.fromElements(1).slotSharingGroup("ssg1");
+        source.map(value -> value).slotSharingGroup(ssgConflict).addSink(new 
DiscardingSink<>());
+
+        env.getStreamGraph();
+    }
+
+    @Test
     public void testGetStreamGraph() {
         try {
             TypeInformation<Integer> typeInfo = BasicTypeInfo.INT_TYPE_INFO;
diff --git 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 3ee460c..d0551c3 100644
--- 
a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ 
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.{Experimental, Internal, 
Public, PublicEvolvi
 import org.apache.flink.api.common.RuntimeExecutionMode
 import org.apache.flink.api.common.eventtime.WatermarkStrategy
 import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, 
InputFormat}
+import org.apache.flink.api.common.operators.SlotSharingGroup
 import 
org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.connector.source.{Source, SourceSplit}
@@ -107,6 +108,22 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   }
 
   /**
+   * Register a slot sharing group with its resource spec.
+   *
+   * <p>Note that a slot sharing group hints to the scheduler that the grouped 
operators CAN be
+   * deployed into a shared slot. There's no guarantee that the scheduler 
always deploys the
+   * grouped operators together. In cases where grouped operators are deployed into 
separate slots, the
+   * slot resources will be derived from the specified group requirements.
+   *
+   * @param slotSharingGroup which contains name and its resource spec.
+   */
+  @PublicEvolving
+  def registerSlotSharingGroup(slotSharingGroup: SlotSharingGroup): 
StreamExecutionEnvironment = {
+    javaEnv.registerSlotSharingGroup(slotSharingGroup)
+    this
+  }
+
+  /**
    * Returns the default parallelism for this execution environment. Note that 
this
    * value can be overridden by individual operations using 
[[DataStream#setParallelism(int)]]
    */

Reply via email to