This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new e2569ef893 Backport #16065 to Flink v2.0 and v1.20 (#16429)
e2569ef893 is described below

commit e2569ef893e8f9acbea22798cc938ac3b885bd02
Author: Han You <[email protected]>
AuthorDate: Wed May 20 11:13:52 2026 -0500

    Backport #16065 to Flink v2.0 and v1.20 (#16429)
    
    Co-authored-by: Han You <[email protected]>
---
 .../flink/sink/dynamic/DynamicIcebergSink.java     | 37 ++++++++++++-
 .../flink/sink/dynamic/FlinkDynamicSinkConf.java   | 16 ++++++
 .../sink/dynamic/FlinkDynamicSinkOptions.java      | 20 +++++++
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 63 ++++++++++++++++++++++
 .../flink/sink/dynamic/DynamicIcebergSink.java     | 37 ++++++++++++-
 .../flink/sink/dynamic/FlinkDynamicSinkConf.java   | 16 ++++++
 .../sink/dynamic/FlinkDynamicSinkOptions.java      | 20 +++++++
 .../flink/sink/dynamic/TestDynamicIcebergSink.java | 63 ++++++++++++++++++++++
 8 files changed, 270 insertions(+), 2 deletions(-)

diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index e7cd2c1645..eab3d81c44 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -312,6 +312,28 @@ public class DynamicIcebergSink
       return this;
     }
 
+    /**
+     * Set the slot sharing group for the generator (and the forward-writer 
chained to it) by name.
+     * Register the slot sharing group with its resource spec on the {@code
+     * StreamExecutionEnvironment} via {@code 
env.registerSlotSharingGroup(...)}. If unset, Flink
+     * inherits the slot sharing group from the upstream operator.
+     */
+    public Builder<T> generatorSlotSharingGroup(String ssg) {
+      
writeOptions.put(FlinkDynamicSinkOptions.GENERATOR_SLOT_SHARING_GROUP.key(), 
ssg);
+      return this;
+    }
+
+    /**
+     * Set the slot sharing group for the shuffling sink (writer plus 
committer) by name. Register
+     * the slot sharing group with its resource spec on the {@code 
StreamExecutionEnvironment} via
+     * {@code env.registerSlotSharingGroup(...)}. If unset, Flink inherits the 
slot sharing group
+     * from the upstream operator.
+     */
+    public Builder<T> shuffleSinkSlotSharingGroup(String ssg) {
+      
writeOptions.put(FlinkDynamicSinkOptions.SHUFFLE_SINK_SLOT_SHARING_GROUP.key(), 
ssg);
+      return this;
+    }
+
     /**
      * Set the uid prefix for IcebergSink operators. Note that IcebergSink 
internally consists of
      * multiple operators (like writer, committer, aggregator) Actual operator 
uid will be appended
@@ -438,7 +460,7 @@ public class DynamicIcebergSink
       TypeInformation<CommittableMessage<DynamicWriteResult>> 
writeResultTypeInfo =
           CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new);
 
-      DataStream<CommittableMessage<DynamicWriteResult>> forwardWriteResults =
+      SingleOutputStreamOperator<CommittableMessage<DynamicWriteResult>> 
forwardWriteResults =
           converted
               .getSideOutput(
                   new 
OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType))
@@ -449,6 +471,11 @@ public class DynamicIcebergSink
               .setParallelism(converted.getParallelism())
               .uid(prefixIfNotNull(uidPrefix, "-forward-writer"));
 
+      String generatorSsg = flinkDynamicSinkConf.generatorSlotSharingGroup();
+      if (generatorSsg != null) {
+        forwardWriteResults.slotSharingGroup(generatorSsg);
+      }
+
       // Inject forward write results into sink — they'll be unioned in 
addPreCommitTopology
       return instantiateSink(writeOptions, flinkConfig, forwardWriteResults);
     }
@@ -513,6 +540,10 @@ public class DynamicIcebergSink
               .uid(prefixIfNotNull(uidPrefix, "-generator"))
               .name(operatorName("generator"))
               .returns(type);
+      String generatorSsg = flinkDynamicSinkConf.generatorSlotSharingGroup();
+      if (generatorSsg != null) {
+        converted.slotSharingGroup(generatorSsg);
+      }
 
       DynamicIcebergSink sink = build(converted, sideOutputType);
 
@@ -534,6 +565,10 @@ public class DynamicIcebergSink
           shuffleInput
               .sinkTo(sink) // Forward write results are implicitly injected 
here
               .uid(prefixIfNotNull(uidPrefix, "-sink"));
+      String shuffleSinkSsg = 
flinkDynamicSinkConf.shuffleSinkSlotSharingGroup();
+      if (shuffleSinkSsg != null) {
+        result.slotSharingGroup(shuffleSinkSsg);
+      }
 
       FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, 
readableConfig);
       if (flinkWriteConf.writeParallelism() != null) {
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
index 75b169c4b5..23c57535cd 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
@@ -99,4 +99,20 @@ class FlinkDynamicSinkConf {
         .defaultValue(FlinkDynamicSinkOptions.CASE_SENSITIVE.defaultValue())
         .parse();
   }
+
+  String generatorSlotSharingGroup() {
+    return confParser
+        .stringConf()
+        .option(FlinkDynamicSinkOptions.GENERATOR_SLOT_SHARING_GROUP.key())
+        .flinkConfig(FlinkDynamicSinkOptions.GENERATOR_SLOT_SHARING_GROUP)
+        .parseOptional();
+  }
+
+  String shuffleSinkSlotSharingGroup() {
+    return confParser
+        .stringConf()
+        .option(FlinkDynamicSinkOptions.SHUFFLE_SINK_SLOT_SHARING_GROUP.key())
+        .flinkConfig(FlinkDynamicSinkOptions.SHUFFLE_SINK_SLOT_SHARING_GROUP)
+        .parseOptional();
+  }
 }
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
index 7a4f038219..7c0f7c70d5 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
@@ -68,4 +68,24 @@ public class FlinkDynamicSinkOptions {
           .defaultValue(true)
           .withDescription(
               "Controls whether schema field name matching should be 
case-sensitive in Dynamic Sink.");
+
+  public static final ConfigOption<String> GENERATOR_SLOT_SHARING_GROUP =
+      ConfigOptions.key("dynamic-sink.generator-slot-sharing-group")
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              "Name of the slot sharing group for the generator and the 
forward-writer chained to it. "
+                  + "Register the slot sharing group with its resource spec on 
the StreamExecutionEnvironment "
+                  + "via env.registerSlotSharingGroup(...). If unset, Flink 
inherits the slot sharing group "
+                  + "from the upstream operator.");
+
+  public static final ConfigOption<String> SHUFFLE_SINK_SLOT_SHARING_GROUP =
+      ConfigOptions.key("dynamic-sink.shuffle-sink-slot-sharing-group")
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              "Name of the slot sharing group for the shuffling sink (writer 
plus committer). "
+                  + "Register the slot sharing group with its resource spec on 
the StreamExecutionEnvironment "
+                  + "via env.registerSlotSharingGroup(...). If unset, Flink 
inherits the slot sharing group "
+                  + "from the upstream operator.");
 }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index bafd0276b7..355bc6805f 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -35,12 +35,14 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import javax.annotation.Nullable;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -326,6 +328,67 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     assertThat(generatorAndSinkChained).isTrue();
   }
 
+  @Test
+  void testSlotSharingGroup() {
+    DataStream<DynamicIcebergDataImpl> dataStream =
+        env.fromData(Collections.emptyList(), TypeInformation.of(new 
TypeHint<>() {}));
+
+    // Slot sharing group resource specs are registered on the env by name and 
referenced from
+    // operators via the String overload of .slotSharingGroup(...). We verify 
the effect by reading
+    // back the resource profile from the resulting job graph vertices.
+    String shuffleSinkSsgName = "shuffle-sink-ssg";
+    MemorySize shuffleSinkMemorySize = new MemorySize(123);
+    env.registerSlotSharingGroup(
+        SlotSharingGroup.newBuilder(shuffleSinkSsgName)
+            .setCpuCores(123)
+            .setTaskHeapMemory(shuffleSinkMemorySize)
+            .build());
+
+    String generatorSsgName = "generator-ssg";
+    MemorySize generatorMemorySize = new MemorySize(456);
+    env.registerSlotSharingGroup(
+        SlotSharingGroup.newBuilder(generatorSsgName)
+            .setCpuCores(456)
+            .setTaskHeapMemory(generatorMemorySize)
+            .build());
+
+    DynamicIcebergSink.forInput(dataStream)
+        .generator(new ForwardGenerator())
+        .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+        .immediateTableUpdate(false)
+        .shuffleSinkSlotSharingGroup(shuffleSinkSsgName)
+        .generatorSlotSharingGroup(generatorSsgName)
+        .append();
+
+    List<JobVertex> vertices =
+        
StreamSupport.stream(env.getStreamGraph().getJobGraph().getVertices().spliterator(),
 false)
+            .toList();
+
+    boolean shufflingWriterSSGApplied =
+        vertices.stream()
+            .filter(vertex -> vertex.getName() != null && 
vertex.getName().contains("Sink: Writer"))
+            .anyMatch(
+                vertex ->
+                    vertex
+                        .getSlotSharingGroup()
+                        .getResourceProfile()
+                        .getTaskHeapMemory()
+                        .equals(shuffleSinkMemorySize));
+    boolean generatorSSGApplied =
+        vertices.stream()
+            .filter(vertex -> vertex.getName() != null && 
vertex.getName().contains("generator"))
+            .anyMatch(
+                vertex ->
+                    vertex
+                        .getSlotSharingGroup()
+                        .getResourceProfile()
+                        .getTaskHeapMemory()
+                        .equals(generatorMemorySize));
+
+    assertThat(shufflingWriterSSGApplied).isTrue();
+    assertThat(generatorSSGApplied).isTrue();
+  }
+
   @Test
   void testForwardWrite() throws Exception {
     runForwardWriteTest(new ForwardGenerator());
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index ad430cbf13..c8f9fece23 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -310,6 +310,28 @@ public class DynamicIcebergSink
       return this;
     }
 
+    /**
+     * Set the slot sharing group for the generator (and the forward-writer 
chained to it) by name.
+     * Register the slot sharing group with its resource spec on the {@code
+     * StreamExecutionEnvironment} via {@code 
env.registerSlotSharingGroup(...)}. If unset, Flink
+     * inherits the slot sharing group from the upstream operator.
+     */
+    public Builder<T> generatorSlotSharingGroup(String ssg) {
+      
writeOptions.put(FlinkDynamicSinkOptions.GENERATOR_SLOT_SHARING_GROUP.key(), 
ssg);
+      return this;
+    }
+
+    /**
+     * Set the slot sharing group for the shuffling sink (writer plus 
committer) by name. Register
+     * the slot sharing group with its resource spec on the {@code 
StreamExecutionEnvironment} via
+     * {@code env.registerSlotSharingGroup(...)}. If unset, Flink inherits the 
slot sharing group
+     * from the upstream operator.
+     */
+    public Builder<T> shuffleSinkSlotSharingGroup(String ssg) {
+      
writeOptions.put(FlinkDynamicSinkOptions.SHUFFLE_SINK_SLOT_SHARING_GROUP.key(), 
ssg);
+      return this;
+    }
+
     /**
      * Set the uid prefix for IcebergSink operators. Note that IcebergSink 
internally consists of
      * multiple operators (like writer, committer, aggregator) Actual operator 
uid will be appended
@@ -436,7 +458,7 @@ public class DynamicIcebergSink
       TypeInformation<CommittableMessage<DynamicWriteResult>> 
writeResultTypeInfo =
           CommittableMessageTypeInfo.of(DynamicWriteResultSerializer::new);
 
-      DataStream<CommittableMessage<DynamicWriteResult>> forwardWriteResults =
+      SingleOutputStreamOperator<CommittableMessage<DynamicWriteResult>> 
forwardWriteResults =
           converted
               .getSideOutput(
                   new 
OutputTag<>(DynamicRecordProcessor.DYNAMIC_FORWARD_STREAM, sideOutputType))
@@ -447,6 +469,11 @@ public class DynamicIcebergSink
               .setParallelism(converted.getParallelism())
               .uid(prefixIfNotNull(uidPrefix, "-forward-writer"));
 
+      String generatorSsg = flinkDynamicSinkConf.generatorSlotSharingGroup();
+      if (generatorSsg != null) {
+        forwardWriteResults.slotSharingGroup(generatorSsg);
+      }
+
       // Inject forward write results into sink — they'll be unioned in 
addPreCommitTopology
       return instantiateSink(writeOptions, flinkConfig, forwardWriteResults);
     }
@@ -511,6 +538,10 @@ public class DynamicIcebergSink
               .uid(prefixIfNotNull(uidPrefix, "-generator"))
               .name(operatorName("generator"))
               .returns(type);
+      String generatorSsg = flinkDynamicSinkConf.generatorSlotSharingGroup();
+      if (generatorSsg != null) {
+        converted.slotSharingGroup(generatorSsg);
+      }
 
       DynamicIcebergSink sink = build(converted, sideOutputType);
 
@@ -532,6 +563,10 @@ public class DynamicIcebergSink
           shuffleInput
               .sinkTo(sink) // Forward write results are implicitly injected 
here
               .uid(prefixIfNotNull(uidPrefix, "-sink"));
+      String shuffleSinkSsg = 
flinkDynamicSinkConf.shuffleSinkSlotSharingGroup();
+      if (shuffleSinkSsg != null) {
+        result.slotSharingGroup(shuffleSinkSsg);
+      }
 
       FlinkWriteConf flinkWriteConf = new FlinkWriteConf(writeOptions, 
readableConfig);
       if (flinkWriteConf.writeParallelism() != null) {
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
index 75b169c4b5..23c57535cd 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkConf.java
@@ -99,4 +99,20 @@ class FlinkDynamicSinkConf {
         .defaultValue(FlinkDynamicSinkOptions.CASE_SENSITIVE.defaultValue())
         .parse();
   }
+
+  String generatorSlotSharingGroup() {
+    return confParser
+        .stringConf()
+        .option(FlinkDynamicSinkOptions.GENERATOR_SLOT_SHARING_GROUP.key())
+        .flinkConfig(FlinkDynamicSinkOptions.GENERATOR_SLOT_SHARING_GROUP)
+        .parseOptional();
+  }
+
+  String shuffleSinkSlotSharingGroup() {
+    return confParser
+        .stringConf()
+        .option(FlinkDynamicSinkOptions.SHUFFLE_SINK_SLOT_SHARING_GROUP.key())
+        .flinkConfig(FlinkDynamicSinkOptions.SHUFFLE_SINK_SLOT_SHARING_GROUP)
+        .parseOptional();
+  }
 }
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
index 7a4f038219..7c0f7c70d5 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/FlinkDynamicSinkOptions.java
@@ -68,4 +68,24 @@ public class FlinkDynamicSinkOptions {
           .defaultValue(true)
           .withDescription(
               "Controls whether schema field name matching should be 
case-sensitive in Dynamic Sink.");
+
+  public static final ConfigOption<String> GENERATOR_SLOT_SHARING_GROUP =
+      ConfigOptions.key("dynamic-sink.generator-slot-sharing-group")
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              "Name of the slot sharing group for the generator and the 
forward-writer chained to it. "
+                  + "Register the slot sharing group with its resource spec on 
the StreamExecutionEnvironment "
+                  + "via env.registerSlotSharingGroup(...). If unset, Flink 
inherits the slot sharing group "
+                  + "from the upstream operator.");
+
+  public static final ConfigOption<String> SHUFFLE_SINK_SLOT_SHARING_GROUP =
+      ConfigOptions.key("dynamic-sink.shuffle-sink-slot-sharing-group")
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              "Name of the slot sharing group for the shuffling sink (writer 
plus committer). "
+                  + "Register the slot sharing group with its resource spec on 
the StreamExecutionEnvironment "
+                  + "via env.registerSlotSharingGroup(...). If unset, Flink 
inherits the slot sharing group "
+                  + "from the upstream operator.");
 }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 89befb9e8e..3ba579df49 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -35,12 +35,14 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import javax.annotation.Nullable;
+import org.apache.flink.api.common.operators.SlotSharingGroup;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.OperatorIDPair;
@@ -338,6 +340,67 @@ class TestDynamicIcebergSink extends 
TestFlinkIcebergSinkBase {
     assertThat(generatorAndSinkChained).isTrue();
   }
 
+  @Test
+  void testSlotSharingGroup() {
+    DataStream<DynamicIcebergDataImpl> dataStream =
+        env.fromData(Collections.emptyList(), TypeInformation.of(new 
TypeHint<>() {}));
+
+    // Slot sharing group resource specs are registered on the env by name and 
referenced from
+    // operators via the String overload of .slotSharingGroup(...). We verify 
the effect by reading
+    // back the resource profile from the resulting job graph vertices.
+    String shuffleSinkSsgName = "shuffle-sink-ssg";
+    MemorySize shuffleSinkMemorySize = new MemorySize(123);
+    env.registerSlotSharingGroup(
+        SlotSharingGroup.newBuilder(shuffleSinkSsgName)
+            .setCpuCores(123)
+            .setTaskHeapMemory(shuffleSinkMemorySize)
+            .build());
+
+    String generatorSsgName = "generator-ssg";
+    MemorySize generatorMemorySize = new MemorySize(456);
+    env.registerSlotSharingGroup(
+        SlotSharingGroup.newBuilder(generatorSsgName)
+            .setCpuCores(456)
+            .setTaskHeapMemory(generatorMemorySize)
+            .build());
+
+    DynamicIcebergSink.forInput(dataStream)
+        .generator(new ForwardGenerator())
+        .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+        .immediateTableUpdate(false)
+        .shuffleSinkSlotSharingGroup(shuffleSinkSsgName)
+        .generatorSlotSharingGroup(generatorSsgName)
+        .append();
+
+    List<JobVertex> vertices =
+        
StreamSupport.stream(env.getStreamGraph().getJobGraph().getVertices().spliterator(),
 false)
+            .toList();
+
+    boolean shufflingWriterSSGApplied =
+        vertices.stream()
+            .filter(vertex -> vertex.getName() != null && 
vertex.getName().contains("Sink: Writer"))
+            .anyMatch(
+                vertex ->
+                    vertex
+                        .getSlotSharingGroup()
+                        .getResourceProfile()
+                        .getTaskHeapMemory()
+                        .equals(shuffleSinkMemorySize));
+    boolean generatorSSGApplied =
+        vertices.stream()
+            .filter(vertex -> vertex.getName() != null && 
vertex.getName().contains("generator"))
+            .anyMatch(
+                vertex ->
+                    vertex
+                        .getSlotSharingGroup()
+                        .getResourceProfile()
+                        .getTaskHeapMemory()
+                        .equals(generatorMemorySize));
+
+    assertThat(shufflingWriterSSGApplied).isTrue();
+    assertThat(generatorSSGApplied).isTrue();
+  }
+
   @Test
   void testForwardWrite() throws Exception {
     runForwardWriteTest(new ForwardGenerator());

Reply via email to