This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2e86133703bbc9f7876a0534c584be11ef120fd8 Author: Yangze Guo <[email protected]> AuthorDate: Fri Jun 25 14:53:24 2021 +0800 [FLINK-21925][core] Introduce #slotSharingGroup(SlotSharingGroup) for configuring slot sharing group with its resource --- .../org/apache/flink/api/dag/Transformation.java | 31 +++++++++--- .../apache/flink/python/util/PythonConfigUtil.java | 4 +- .../streaming/api/datastream/DataStreamSink.java | 19 ++++++++ .../api/datastream/SingleOutputStreamOperator.java | 19 ++++++++ .../streaming/api/graph/StreamGraphGenerator.java | 46 +++++++++++++++-- .../api/graph/StreamGraphGeneratorTest.java | 57 ++++++++++++++++++++++ 6 files changed, 163 insertions(+), 13 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java index 0334b07..915bafb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java +++ b/flink-core/src/main/java/org/apache/flink/api/dag/Transformation.java @@ -21,6 +21,7 @@ package org.apache.flink.api.dag; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.common.operators.util.OperatorValidationUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.MissingTypeInfo; @@ -167,7 +168,7 @@ public abstract class Transformation<T> { protected long bufferTimeout = -1; - private String slotSharingGroup; + private Optional<SlotSharingGroup> slotSharingGroup; @Nullable private String coLocationGroupKey; @@ -184,7 +185,7 @@ public abstract class Transformation<T> { this.name = Preconditions.checkNotNull(name); this.outputType = outputType; this.parallelism = parallelism; - this.slotSharingGroup = null; + this.slotSharingGroup = Optional.empty(); } /** Returns the unique ID of this {@code Transformation}. */ @@ -390,11 +391,11 @@ public abstract class Transformation<T> { } /** - * Returns the slot sharing group of this transformation. + * Returns the slot sharing group of this transformation if present. * - * @see #setSlotSharingGroup(String) + * @see #setSlotSharingGroup(SlotSharingGroup) */ - public String getSlotSharingGroup() { + public Optional<SlotSharingGroup> getSlotSharingGroup() { return slotSharingGroup; } @@ -405,10 +406,24 @@ public abstract class Transformation<T> { * <p>Initially, an operation is in the default slot sharing group. This can be explicitly set * using {@code setSlotSharingGroup("default")}. * - * @param slotSharingGroup The slot sharing group name. + * @param slotSharingGroupName The slot sharing group's name. */ - public void setSlotSharingGroup(String slotSharingGroup) { - this.slotSharingGroup = slotSharingGroup; + public void setSlotSharingGroup(String slotSharingGroupName) { + this.slotSharingGroup = + Optional.of(SlotSharingGroup.newBuilder(slotSharingGroupName).build()); + } + + /** + * Sets the slot sharing group of this transformation. Parallel instances of operations that are + * in the same slot sharing group will be co-located in the same TaskManager slot, if possible. + * + * <p>Initially, an operation is in the default slot sharing group. This can be explicitly set + * with constructing a {@link SlotSharingGroup} with name {@code "default"}. + * + * @param slotSharingGroup which contains name and its resource spec. + */ + public void setSlotSharingGroup(SlotSharingGroup slotSharingGroup) { + this.slotSharingGroup = Optional.of(slotSharingGroup); } /** diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java index 8181266..cdba06e 100644 --- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java +++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java @@ -215,7 +215,9 @@ public class PythonConfigUtil { private static void chainTransformation( Transformation<?> firstTransformation, Transformation<?> secondTransformation) { - firstTransformation.setSlotSharingGroup(secondTransformation.getSlotSharingGroup()); + secondTransformation + .getSlotSharingGroup() + .ifPresent(firstTransformation::setSlotSharingGroup); firstTransformation.setCoLocationGroupKey(secondTransformation.getCoLocationGroupKey()); firstTransformation.setParallelism(secondTransformation.getParallelism()); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java index a3da7c6..eea988e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.StreamSink; @@ -205,4 +206,22 @@ public class DataStreamSink<T> { transformation.setSlotSharingGroup(slotSharingGroup); return this; } + + /** + * Sets the slot sharing group of this operation. Parallel instances of operations that are in + * the same slot sharing group will be co-located in the same TaskManager slot, if possible. + * + * <p>Operations inherit the slot sharing group of input operations if all input operations are + * in the same slot sharing group and no slot sharing group was explicitly specified. + * + * <p>Initially an operation is in the default slot sharing group. An operation can be put into + * the default group explicitly by setting the slot sharing group with name {@code "default"}. + * + * @param slotSharingGroup which contains name and its resource spec. + */ + @PublicEvolving + public DataStreamSink<T> slotSharingGroup(SlotSharingGroup slotSharingGroup) { + transformation.setSlotSharingGroup(slotSharingGroup); + return this; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index 6b77c24..7b64d21 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.common.operators.util.OperatorValidationUtils; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -374,6 +375,24 @@ public class SingleOutputStreamOperator<T> extends DataStream<T> { } /** + * Sets the slot sharing group of this operation. Parallel instances of operations that are in + * the same slot sharing group will be co-located in the same TaskManager slot, if possible. + * + * <p>Operations inherit the slot sharing group of input operations if all input operations are + * in the same slot sharing group and no slot sharing group was explicitly specified. + * + * <p>Initially an operation is in the default slot sharing group. An operation can be put into + * the default group explicitly by setting the slot sharing group with name {@code "default"}. + * + * @param slotSharingGroup which contains name and its resource spec. + */ + @PublicEvolving + public SingleOutputStreamOperator<T> slotSharingGroup(SlotSharingGroup slotSharingGroup) { + transformation.setSlotSharingGroup(slotSharingGroup); + return this; + } + + /** * Gets the {@link DataStream} that contains the elements that are emitted from an operation * into the side output with the given {@link OutputTag}. * diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 5a1d26c..d39fd1f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -22,11 +22,14 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.common.operators.util.SlotSharingGroupUtils; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -141,7 +144,7 @@ public class StreamGraphGenerator { private final ReadableConfig configuration; - // Records the slot sharing groups and their corresponding ResourceProfile + // Records the slot sharing groups and their corresponding fine-grained ResourceProfile private final Map<String, ResourceProfile> slotSharingGroupResources = new HashMap<>(); private Path savepointDir; @@ -293,7 +296,12 @@ public class StreamGraphGenerator { */ public StreamGraphGenerator setSlotSharingGroupResource( Map<String, ResourceProfile> slotSharingGroupResources) { - this.slotSharingGroupResources.putAll(slotSharingGroupResources); + slotSharingGroupResources.forEach( + (name, profile) -> { + if (!profile.equals(ResourceProfile.UNKNOWN)) { + this.slotSharingGroupResources.put(name, profile); + } + }); return this; } @@ -312,6 +320,8 @@ public class StreamGraphGenerator { transform(transformation); } + streamGraph.setSlotSharingGroupResource(slotSharingGroupResources); + for (StreamNode node : streamGraph.getStreamNodes()) { if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) { for (StreamEdge edge : node.getInEdges()) { @@ -342,7 +352,6 @@ public class StreamGraphGenerator { graph.setTimeCharacteristic(timeCharacteristic); graph.setJobName(jobName); graph.setJobType(shouldExecuteInBatchMode ? JobType.BATCH : JobType.STREAMING); - graph.setSlotSharingGroupResource(slotSharingGroupResources); if (shouldExecuteInBatchMode) { @@ -452,6 +461,33 @@ public class StreamGraphGenerator { } } + transform + .getSlotSharingGroup() + .ifPresent( + slotSharingGroup -> { + final ResourceSpec resourceSpec = + SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup); + if (!resourceSpec.equals(ResourceSpec.UNKNOWN)) { + slotSharingGroupResources.compute( + slotSharingGroup.getName(), + (name, profile) -> { + if (profile == null) { + return ResourceProfile.fromResourceSpec( + resourceSpec, MemorySize.ZERO); + } else if (!ResourceProfile.fromResourceSpec( + resourceSpec, MemorySize.ZERO) + .equals(profile)) { + throw new IllegalArgumentException( + "The slot sharing group " + + slotSharingGroup.getName() + + " has been configured with two different resource spec."); + } else { + return profile; + } + }); + } + }); + // call at least once to trigger exceptions about MissingTypeInfo transform.getOutputType(); @@ -720,7 +756,9 @@ public class StreamGraphGenerator { final String slotSharingGroup = determineSlotSharingGroup( - transform.getSlotSharingGroup(), + transform.getSlotSharingGroup().isPresent() + ? transform.getSlotSharingGroup().get().getName() + : null, allInputIds.stream() .flatMap(Collection::stream) .collect(Collectors.toList())); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 23e3807..4a59ec4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.common.operators.SlotSharingGroup; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -749,6 +750,62 @@ public class StreamGraphGeneratorTest extends TestLogger { equalTo(SavepointRestoreSettings.forPath("/tmp/savepoint1"))); } + @Test + public void testConfigureSlotSharingGroupResource() { + final SlotSharingGroup ssg1 = + SlotSharingGroup.newBuilder("ssg1").setCpuCores(1).setTaskHeapMemoryMB(100).build(); + final SlotSharingGroup ssg2 = + SlotSharingGroup.newBuilder("ssg2").setCpuCores(2).setTaskHeapMemoryMB(200).build(); + final SlotSharingGroup ssg3 = + SlotSharingGroup.newBuilder(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP) + .setCpuCores(3) + .setTaskHeapMemoryMB(300) + .build(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final DataStream<Integer> source = env.fromElements(1).slotSharingGroup("ssg1"); + source.map(value -> value) + .slotSharingGroup(ssg2) + .map(value -> value * 2) + .map(value -> value * 3) + .slotSharingGroup(SlotSharingGroup.newBuilder("ssg4").build()) + .map(value -> value * 4) + .slotSharingGroup(ssg3) + .addSink(new DiscardingSink<>()) + .slotSharingGroup(ssg1); + + final StreamGraph streamGraph = env.getStreamGraph(); + assertThat( + streamGraph.getSlotSharingGroupResource("ssg1").get(), + is(ResourceProfile.fromResources(1, 100))); + assertThat( + streamGraph.getSlotSharingGroupResource("ssg2").get(), + is(ResourceProfile.fromResources(2, 200))); + assertThat( + streamGraph + .getSlotSharingGroupResource( + StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP) + .get(), + is(ResourceProfile.fromResources(3, 300))); + } + + @Test(expected = IllegalArgumentException.class) + public void testConflictSlotSharingGroup() { + final SlotSharingGroup ssg = + SlotSharingGroup.newBuilder("ssg").setCpuCores(1).setTaskHeapMemoryMB(100).build(); + final SlotSharingGroup ssgConflict = + SlotSharingGroup.newBuilder("ssg").setCpuCores(2).setTaskHeapMemoryMB(200).build(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final DataStream<Integer> source = env.fromElements(1).slotSharingGroup(ssg); + source.map(value -> value) + .slotSharingGroup(ssgConflict) + .addSink(new DiscardingSink<>()) + .slotSharingGroup(ssgConflict); + + env.getStreamGraph(); + } + private static class OutputTypeConfigurableFunction<T> implements OutputTypeConfigurable<T>, Function { private TypeInformation<T> typeInformation;
