This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new e2569ef893 Backport #16065 to Flink v2.0 and v1.20 (#16429)
e2569ef893 is described below
commit e2569ef893e8f9acbea22798cc938ac3b885bd02
Author: Han You <[email protected]>
AuthorDate: Wed May 20 11:13:52 2026 -0500
Backport #16065 to Flink v2.0 and v1.20 (#16429)
Co-authored-by: Han You <[email protected]>
---
.../flink/sink/dynamic/DynamicIcebergSink.java | 37 ++++++++++++-
.../flink/sink/dynamic/FlinkDynamicSinkConf.java | 16 ++++++
.../sink/dynamic/FlinkDynamicSinkOptions.java | 20 +++++++
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 63 ++++++++++++++++++++++
.../flink/sink/dynamic/DynamicIcebergSink.java | 37 ++++++++++++-
.../flink/sink/dynamic/FlinkDynamicSinkConf.java | 16 ++++++
.../sink/dynamic/FlinkDynamicSinkOptions.java | 20 +++++++
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 63 ++++++++++++++++++++++
8 files changed, 270 insertions(+), 2 deletions(-)
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index e7cd2c1645..eab3d81c44 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -312,6 +312,28 @@ public class DynamicIcebergSink
return this;
}
+ /**
+ * Set the slot sharing group for the generator (and the forward-writer
chained to it) by name.
+ * Register the slot sharing group with its resource spec on the {@code
+ * StreamExecutionEnvironment} via {@code
env.registerSlotSharingGroup(...)}. If unset, Flink
+ * inherits the slot sharing group from the upstream operator.
+ */
+ public Builder<T> generatorSlotSharingGroup(String ssg) {
+
writeOptions.put(FlinkDynamicSinkOptions.GENERATOR_SLOT_SHARING_GROUP.key(),
ssg);
+ return this;
+ }
+
+ /**
+ * Set the slot sharing group for the shuffling sink (writer plus
committer) by name. Register
+ * the slot sharing group with its resource spec on the {@code
StreamExecutionEnvironment} via
+ * {@code env.registerSlotSharingGroup(...)}. If unset, Flink inherits the
slot sharing group
+ * from the upstream operator.
+ */
+ public Builder<T> shuffleSinkSlotSharingGroup(String ssg) {
+
writeOptions.put(FlinkDynamicSinkOptions.SHUFFLE_SINK_SLOT_SHARING_GROUP.key(),
ssg);
+ return this;
+ }
+
/**
* Set the uid prefix for IcebergSink operators. Note that IcebergSink
internally consists of
* multiple operators (like writer, committer, aggregator) Actual operator
uid will be appended
@@ -438,7 +460,7 @@ public class DynamicIcebergSink
TypeInformation<CommittableMessage<DynamicWriteResult>>
writeResultTypeInfo =
CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new);
- DataStream<CommittableMessage<DynamicWriteResult>> forwardWriteResults =
+ SingleOutputStreamOperator<CommittableMessage<DynamicWriteResult>>
forwardWriteResults =
converted
.getSideOutput(
new
OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType))
@@ -449,6 +471,11 @@ public class DynamicIcebergSink
.setParallelism(converted.getParallelism())
.uid(prefixIfNotNull(uidPrefix, "-forward-writer"));
+ String generatorSsg = flinkDynamicSinkConf.generatorSlotSharingGroup();
+ if (generatorSsg != null) {
+ forwardWriteResults.slotSharingGroup(generatorSsg);
+ }
+
// Inject forward write results into sink — they'll be unioned in
addPreCommitTopology
return instantiateSink(writeOptions, flinkConfig, forwardWriteResults);
}
@@ -513,6 +540,10 @@ public class DynamicIcebergSink
.uid(prefixIfNotNull(uidPrefix, "-generator"))
.name(operatorName("generator"))
.returns(type);
+ String generatorSsg = flinkDynamicSinkConf.generatorSlotSharingGroup();
+ if (generatorSsg != null) {
+ converted.slotSharingGroup(generatorSsg);
+ }
DynamicIcebergSink sink = build(converted, sideOutputType);
@@ -534,6 +565,10 @@ public class DynamicIcebergSink
shuffleInput
.sinkTo(sink) // Forward write results are implicitly injected
here
.uid(prefixIfNotNull(uidPrefix, "-sink"));
+ String shuffleSinkSsg =
flinkDynamicSinkConf.shuffleSinkSlotSharingGroup();
+ if (shuffleSinkSsg != null) {
+ result.slotSharingGroup(shuffleSinkSsg);
+ }
FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions,
readableConfig);
if (flinkWriteConf.writeParallelism() != null) {
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
index 75b169c4b5..23c57535cd 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
@@ -99,4 +99,20 @@ class FlinkDynamicSinkConf {
.defaultValue(FlinkDynamicSinkOptions.CASE_SENSITIVE.defaultValue())
.parse();
}
+
+ String generatorSlotSharingGroup() {
+ return confParser
+ .stringConf()
+ .option(FlinkDynamicSinkOptions.GENERATOR_SLOT_SHARING_GROUP.key())
+ .flinkConfig(FlinkDynamicSinkOptions.GENERATOR_SLOT_SHARING_GROUP)
+ .parseOptional();
+ }
+
+ String shuffleSinkSlotSharingGroup() {
+ return confParser
+ .stringConf()
+ .option(FlinkDynamicSinkOptions.SHUFFLE_SINK_SLOT_SHARING_GROUP.key())
+ .flinkConfig(FlinkDynamicSinkOptions.SHUFFLE_SINK_SLOT_SHARING_GROUP)
+ .parseOptional();
+ }
}
diff --git
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
index 7a4f038219..7c0f7c70d5 100644
---
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
+++
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
@@ -68,4 +68,24 @@ public class FlinkDynamicSinkOptions {
.defaultValue(true)
.withDescription(
"Controls whether schema field name matching should be
case-sensitive in Dynamic Sink.");
+
+ public static final ConfigOption<String> GENERATOR_SLOT_SHARING_GROUP =
+ ConfigOptions.key("dynamic-sink.generator-slot-sharing-group")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the slot sharing group for the generator and the
forward-writer chained to it. "
+ + "Register the slot sharing group with its resource spec on
the StreamExecutionEnvironment "
+ + "via env.registerSlotSharingGroup(...). If unset, Flink
inherits the slot sharing group "
+ + "from the upstream operator.");
+
+ public static final ConfigOption<String> SHUFFLE_SINK_SLOT_SHARING_GROUP =
+ ConfigOptions.key("dynamic-sink.shuffle-sink-slot-sharing-group")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the slot sharing group for the shuffling sink (writer
plus committer). "
+ + "Register the slot sharing group with its resource spec on
the StreamExecutionEnvironment "
+ + "via env.registerSlotSharingGroup(...). If unset, Flink
inherits the slot sharing group "
+ + "from the upstream operator.");
}
diff --git
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index bafd0276b7..355bc6805f 100644
---
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -35,12 +35,14 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.client.JobExecutionException;
@@ -326,6 +328,67 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
assertThat(generatorAndSinkChained).isTrue();
}
+ @Test
+ void testSlotSharingGroup() {
+ DataStream<DynamicIcebergDataImpl> dataStream =
+ env.fromData(Collections.emptyList(), TypeInformation.of(new
TypeHint<>() {}));
+
+ // Slot sharing group resource specs are registered on the env by name and
referenced from
+ // operators via the String overload of .slotSharingGroup(...). We verify
the effect by reading
+ // back the resource profile from the resulting job graph vertices.
+ String shuffleSinkSsgName = "shuffle-sink-ssg";
+ MemorySize shuffleSinkMemorySize = new MemorySize(123);
+ env.registerSlotSharingGroup(
+ SlotSharingGroup.newBuilder(shuffleSinkSsgName)
+ .setCpuCores(123)
+ .setTaskHeapMemory(shuffleSinkMemorySize)
+ .build());
+
+ String generatorSsgName = "generator-ssg";
+ MemorySize generatorMemorySize = new MemorySize(456);
+ env.registerSlotSharingGroup(
+ SlotSharingGroup.newBuilder(generatorSsgName)
+ .setCpuCores(456)
+ .setTaskHeapMemory(generatorMemorySize)
+ .build());
+
+ DynamicIcebergSink.forInput(dataStream)
+ .generator(new ForwardGenerator())
+ .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+ .immediateTableUpdate(false)
+ .shuffleSinkSlotSharingGroup(shuffleSinkSsgName)
+ .generatorSlotSharingGroup(generatorSsgName)
+ .append();
+
+ List<JobVertex> vertices =
+
StreamSupport.stream(env.getStreamGraph().getJobGraph().getVertices().spliterator(),
false)
+ .toList();
+
+ boolean shufflingWriterSSGApplied =
+ vertices.stream()
+ .filter(vertex -> vertex.getName() != null &&
vertex.getName().contains("Sink: Writer"))
+ .anyMatch(
+ vertex ->
+ vertex
+ .getSlotSharingGroup()
+ .getResourceProfile()
+ .getTaskHeapMemory()
+ .equals(shuffleSinkMemorySize));
+ boolean generatorSSGApplied =
+ vertices.stream()
+ .filter(vertex -> vertex.getName() != null &&
vertex.getName().contains("generator"))
+ .anyMatch(
+ vertex ->
+ vertex
+ .getSlotSharingGroup()
+ .getResourceProfile()
+ .getTaskHeapMemory()
+ .equals(generatorMemorySize));
+
+ assertThat(shufflingWriterSSGApplied).isTrue();
+ assertThat(generatorSSGApplied).isTrue();
+ }
+
@Test
void testForwardWrite() throws Exception {
runForwardWriteTest(new ForwardGenerator());
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index ad430cbf13..c8f9fece23 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -310,6 +310,28 @@ public class DynamicIcebergSink
return this;
}
+ /**
+ * Set the slot sharing group for the generator (and the forward-writer
chained to it) by name.
+ * Register the slot sharing group with its resource spec on the {@code
+ * StreamExecutionEnvironment} via {@code
env.registerSlotSharingGroup(...)}. If unset, Flink
+ * inherits the slot sharing group from the upstream operator.
+ */
+ public Builder<T> generatorSlotSharingGroup(String ssg) {
+
writeOptions.put(FlinkDynamicSinkOptions.GENERATOR_SLOT_SHARING_GROUP.key(),
ssg);
+ return this;
+ }
+
+ /**
+ * Set the slot sharing group for the shuffling sink (writer plus
committer) by name. Register
+ * the slot sharing group with its resource spec on the {@code
StreamExecutionEnvironment} via
+ * {@code env.registerSlotSharingGroup(...)}. If unset, Flink inherits the
slot sharing group
+ * from the upstream operator.
+ */
+ public Builder<T> shuffleSinkSlotSharingGroup(String ssg) {
+
writeOptions.put(FlinkDynamicSinkOptions.SHUFFLE_SINK_SLOT_SHARING_GROUP.key(),
ssg);
+ return this;
+ }
+
/**
* Set the uid prefix for IcebergSink operators. Note that IcebergSink
internally consists of
* multiple operators (like writer, committer, aggregator) Actual operator
uid will be appended
@@ -436,7 +458,7 @@ public class DynamicIcebergSink
TypeInformation<CommittableMessage<DynamicWriteResult>>
writeResultTypeInfo =
CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new);
- DataStream<CommittableMessage<DynamicWriteResult>> forwardWriteResults =
+ SingleOutputStreamOperator<CommittableMessage<DynamicWriteResult>>
forwardWriteResults =
converted
.getSideOutput(
new
OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType))
@@ -447,6 +469,11 @@ public class DynamicIcebergSink
.setParallelism(converted.getParallelism())
.uid(prefixIfNotNull(uidPrefix, "-forward-writer"));
+ String generatorSsg = flinkDynamicSinkConf.generatorSlotSharingGroup();
+ if (generatorSsg != null) {
+ forwardWriteResults.slotSharingGroup(generatorSsg);
+ }
+
// Inject forward write results into sink — they'll be unioned in
addPreCommitTopology
return instantiateSink(writeOptions, flinkConfig, forwardWriteResults);
}
@@ -511,6 +538,10 @@ public class DynamicIcebergSink
.uid(prefixIfNotNull(uidPrefix, "-generator"))
.name(operatorName("generator"))
.returns(type);
+ String generatorSsg = flinkDynamicSinkConf.generatorSlotSharingGroup();
+ if (generatorSsg != null) {
+ converted.slotSharingGroup(generatorSsg);
+ }
DynamicIcebergSink sink = build(converted, sideOutputType);
@@ -532,6 +563,10 @@ public class DynamicIcebergSink
shuffleInput
.sinkTo(sink) // Forward write results are implicitly injected
here
.uid(prefixIfNotNull(uidPrefix, "-sink"));
+ String shuffleSinkSsg =
flinkDynamicSinkConf.shuffleSinkSlotSharingGroup();
+ if (shuffleSinkSsg != null) {
+ result.slotSharingGroup(shuffleSinkSsg);
+ }
FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions,
readableConfig);
if (flinkWriteConf.writeParallelism() != null) {
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
index 75b169c4b5..23c57535cd 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
@@ -99,4 +99,20 @@ class FlinkDynamicSinkConf {
.defaultValue(FlinkDynamicSinkOptions.CASE_SENSITIVE.defaultValue())
.parse();
}
+
+ String generatorSlotSharingGroup() {
+ return confParser
+ .stringConf()
+ .option(FlinkDynamicSinkOptions.GENERATOR_SLOT_SHARING_GROUP.key())
+ .flinkConfig(FlinkDynamicSinkOptions.GENERATOR_SLOT_SHARING_GROUP)
+ .parseOptional();
+ }
+
+ String shuffleSinkSlotSharingGroup() {
+ return confParser
+ .stringConf()
+ .option(FlinkDynamicSinkOptions.SHUFFLE_SINK_SLOT_SHARING_GROUP.key())
+ .flinkConfig(FlinkDynamicSinkOptions.SHUFFLE_SINK_SLOT_SHARING_GROUP)
+ .parseOptional();
+ }
}
diff --git
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
index 7a4f038219..7c0f7c70d5 100644
---
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
+++
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
@@ -68,4 +68,24 @@ public class FlinkDynamicSinkOptions {
.defaultValue(true)
.withDescription(
"Controls whether schema field name matching should be
case-sensitive in Dynamic Sink.");
+
+ public static final ConfigOption<String> GENERATOR_SLOT_SHARING_GROUP =
+ ConfigOptions.key("dynamic-sink.generator-slot-sharing-group")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the slot sharing group for the generator and the
forward-writer chained to it. "
+ + "Register the slot sharing group with its resource spec on
the StreamExecutionEnvironment "
+ + "via env.registerSlotSharingGroup(...). If unset, Flink
inherits the slot sharing group "
+ + "from the upstream operator.");
+
+ public static final ConfigOption<String> SHUFFLE_SINK_SLOT_SHARING_GROUP =
+ ConfigOptions.key("dynamic-sink.shuffle-sink-slot-sharing-group")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Name of the slot sharing group for the shuffling sink (writer
plus committer). "
+ + "Register the slot sharing group with its resource spec on
the StreamExecutionEnvironment "
+ + "via env.registerSlotSharingGroup(...). If unset, Flink
inherits the slot sharing group "
+ + "from the upstream operator.");
}
diff --git
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 89befb9e8e..3ba579df49 100644
---
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -35,12 +35,14 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.OperatorIDPair;
@@ -338,6 +340,67 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
assertThat(generatorAndSinkChained).isTrue();
}
+ @Test
+ void testSlotSharingGroup() {
+ DataStream<DynamicIcebergDataImpl> dataStream =
+ env.fromData(Collections.emptyList(), TypeInformation.of(new
TypeHint<>() {}));
+
+ // Slot sharing group resource specs are registered on the env by name and
referenced from
+ // operators via the String overload of .slotSharingGroup(...). We verify
the effect by reading
+ // back the resource profile from the resulting job graph vertices.
+ String shuffleSinkSsgName = "shuffle-sink-ssg";
+ MemorySize shuffleSinkMemorySize = new MemorySize(123);
+ env.registerSlotSharingGroup(
+ SlotSharingGroup.newBuilder(shuffleSinkSsgName)
+ .setCpuCores(123)
+ .setTaskHeapMemory(shuffleSinkMemorySize)
+ .build());
+
+ String generatorSsgName = "generator-ssg";
+ MemorySize generatorMemorySize = new MemorySize(456);
+ env.registerSlotSharingGroup(
+ SlotSharingGroup.newBuilder(generatorSsgName)
+ .setCpuCores(456)
+ .setTaskHeapMemory(generatorMemorySize)
+ .build());
+
+ DynamicIcebergSink.forInput(dataStream)
+ .generator(new ForwardGenerator())
+ .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+ .immediateTableUpdate(false)
+ .shuffleSinkSlotSharingGroup(shuffleSinkSsgName)
+ .generatorSlotSharingGroup(generatorSsgName)
+ .append();
+
+ List<JobVertex> vertices =
+
StreamSupport.stream(env.getStreamGraph().getJobGraph().getVertices().spliterator(),
false)
+ .toList();
+
+ boolean shufflingWriterSSGApplied =
+ vertices.stream()
+ .filter(vertex -> vertex.getName() != null &&
vertex.getName().contains("Sink: Writer"))
+ .anyMatch(
+ vertex ->
+ vertex
+ .getSlotSharingGroup()
+ .getResourceProfile()
+ .getTaskHeapMemory()
+ .equals(shuffleSinkMemorySize));
+ boolean generatorSSGApplied =
+ vertices.stream()
+ .filter(vertex -> vertex.getName() != null &&
vertex.getName().contains("generator"))
+ .anyMatch(
+ vertex ->
+ vertex
+ .getSlotSharingGroup()
+ .getResourceProfile()
+ .getTaskHeapMemory()
+ .equals(generatorMemorySize));
+
+ assertThat(shufflingWriterSSGApplied).isTrue();
+ assertThat(generatorSSGApplied).isTrue();
+ }
+
@Test
void testForwardWrite() throws Exception {
runForwardWriteTest(new ForwardGenerator());