This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new bf340affbf Flink: Backport Use native slot sharing group inheritance 
for maintenance tasks to 2.0 and 1.20 (#16337)
bf340affbf is described below

commit bf340affbf2d166395df35b8e982625134c315e6
Author: GuoYu <[email protected]>
AuthorDate: Fri May 15 01:01:31 2026 +0800

    Flink: Backport Use native slot sharing group inheritance for maintenance 
tasks to 2.0 and 1.20 (#16337)
    
    Backports #16329
---
 .../flink/maintenance/api/DeleteOrphanFiles.java   | 167 +++++++++++----------
 .../flink/maintenance/api/ExpireSnapshots.java     |  48 +++---
 .../maintenance/api/FlinkMaintenanceConfig.java    |   6 +-
 .../maintenance/api/MaintenanceTaskBuilder.java    |   4 +
 .../flink/maintenance/api/RewriteDataFiles.java    |  94 ++++++------
 .../flink/maintenance/api/TableMaintenance.java    | 132 ++++++++--------
 .../org/apache/iceberg/flink/sink/IcebergSink.java |  11 +-
 .../flink/maintenance/api/TestExpireSnapshots.java |   2 +-
 .../maintenance/api/TestRewriteDataFiles.java      |   2 +-
 .../maintenance/api/TestTableMaintenance.java      |  12 +-
 .../api/TestTableMaintenanceCoordinationLock.java  |  12 +-
 .../maintenance/operator/OperatorTestBase.java     |  11 +-
 .../flink/maintenance/api/DeleteOrphanFiles.java   | 167 +++++++++++----------
 .../flink/maintenance/api/ExpireSnapshots.java     |  48 +++---
 .../maintenance/api/FlinkMaintenanceConfig.java    |   6 +-
 .../maintenance/api/MaintenanceTaskBuilder.java    |   4 +
 .../flink/maintenance/api/RewriteDataFiles.java    |  94 ++++++------
 .../flink/maintenance/api/TableMaintenance.java    | 132 ++++++++--------
 .../org/apache/iceberg/flink/sink/IcebergSink.java |  11 +-
 .../flink/maintenance/api/TestExpireSnapshots.java |   2 +-
 .../maintenance/api/TestRewriteDataFiles.java      |   2 +-
 .../maintenance/api/TestTableMaintenance.java      |  12 +-
 .../api/TestTableMaintenanceCoordinationLock.java  |  12 +-
 .../maintenance/operator/OperatorTestBase.java     |  11 +-
 24 files changed, 516 insertions(+), 486 deletions(-)

diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
index 63a267d16e..377413c3c3 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
@@ -209,72 +209,73 @@ public class DeleteOrphanFiles {
 
       // Collect all data files
       SingleOutputStreamOperator<MetadataTablePlanner.SplitInfo> splits =
-          trigger
-              .process(
-                  new MetadataTablePlanner(
-                      taskName(),
-                      index(),
-                      tableLoader(),
-                      FILE_PATH_SCAN_CONTEXT,
-                      MetadataTableType.ALL_FILES,
-                      planningWorkerPoolSize))
-              .name(operatorName(PLANNER_TASK_NAME))
-              .uid(PLANNER_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .forceNonParallel();
+          setSlotSharingGroup(
+              trigger
+                  .process(
+                      new MetadataTablePlanner(
+                          taskName(),
+                          index(),
+                          tableLoader(),
+                          FILE_PATH_SCAN_CONTEXT,
+                          MetadataTableType.ALL_FILES,
+                          planningWorkerPoolSize))
+                  .name(operatorName(PLANNER_TASK_NAME))
+                  .uid(PLANNER_TASK_NAME + uidSuffix())
+                  .forceNonParallel());
 
       // Read the records and get all data files
       SingleOutputStreamOperator<String> tableDataFiles =
-          splits
-              .rebalance()
-              .process(
-                  new FileNameReader(
-                      taskName(),
-                      index(),
-                      tableLoader(),
-                      FILE_PATH_SCHEMA,
-                      FILE_PATH_SCAN_CONTEXT,
-                      MetadataTableType.ALL_FILES))
-              .name(operatorName(READER_TASK_NAME))
-              .uid(READER_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .setParallelism(parallelism());
+          setSlotSharingGroup(
+              splits
+                  .rebalance()
+                  .process(
+                      new FileNameReader(
+                          taskName(),
+                          index(),
+                          tableLoader(),
+                          FILE_PATH_SCHEMA,
+                          FILE_PATH_SCAN_CONTEXT,
+                          MetadataTableType.ALL_FILES))
+                  .name(operatorName(READER_TASK_NAME))
+                  .uid(READER_TASK_NAME + uidSuffix())
+                  .setParallelism(parallelism()));
 
       // Collect all meta data files
       SingleOutputStreamOperator<String> tableMetadataFiles =
-          trigger
-              .process(new ListMetadataFiles(taskName(), index(), 
tableLoader()))
-              .name(operatorName(METADATA_FILES_TASK_NAME))
-              .uid(METADATA_FILES_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .forceNonParallel();
+          setSlotSharingGroup(
+              trigger
+                  .process(new ListMetadataFiles(taskName(), index(), 
tableLoader()))
+                  .name(operatorName(METADATA_FILES_TASK_NAME))
+                  .uid(METADATA_FILES_TASK_NAME + uidSuffix())
+                  .forceNonParallel());
 
       // List the all file system files
       SingleOutputStreamOperator<String> allFsFiles =
-          trigger
-              .process(
-                  new ListFileSystemFiles(
-                      taskName(),
-                      index(),
-                      tableLoader(),
-                      location,
-                      minAge.toMillis(),
-                      usePrefixListing))
-              .name(operatorName(FILESYSTEM_FILES_TASK_NAME))
-              .uid(FILESYSTEM_FILES_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .forceNonParallel();
+          setSlotSharingGroup(
+              trigger
+                  .process(
+                      new ListFileSystemFiles(
+                          taskName(),
+                          index(),
+                          tableLoader(),
+                          location,
+                          minAge.toMillis(),
+                          usePrefixListing))
+                  .name(operatorName(FILESYSTEM_FILES_TASK_NAME))
+                  .uid(FILESYSTEM_FILES_TASK_NAME + uidSuffix())
+                  .forceNonParallel());
 
       SingleOutputStreamOperator<String> filesToDelete =
-          tableMetadataFiles
-              .union(tableDataFiles)
-              .keyBy(new FileUriKeySelector(equalSchemes, equalAuthorities))
-              .connect(allFsFiles.keyBy(new FileUriKeySelector(equalSchemes, 
equalAuthorities)))
-              .process(new OrphanFilesDetector(prefixMismatchMode, 
equalSchemes, equalAuthorities))
-              .slotSharingGroup(slotSharingGroup())
-              .name(operatorName(FILTER_FILES_TASK_NAME))
-              .uid(FILTER_FILES_TASK_NAME + uidSuffix())
-              .setParallelism(parallelism());
+          setSlotSharingGroup(
+              tableMetadataFiles
+                  .union(tableDataFiles)
+                  .keyBy(new FileUriKeySelector(equalSchemes, 
equalAuthorities))
+                  .connect(allFsFiles.keyBy(new 
FileUriKeySelector(equalSchemes, equalAuthorities)))
+                  .process(
+                      new OrphanFilesDetector(prefixMismatchMode, 
equalSchemes, equalAuthorities))
+                  .name(operatorName(FILTER_FILES_TASK_NAME))
+                  .uid(FILTER_FILES_TASK_NAME + uidSuffix())
+                  .setParallelism(parallelism()));
 
       DataStream<Exception> errorStream =
           tableMetadataFiles
@@ -287,38 +288,38 @@ public class DeleteOrphanFiles {
 
       // Stop deleting the files if there is an error
       SingleOutputStreamOperator<String> filesOrSkip =
-          filesToDelete
-              .connect(errorStream)
-              .transform(
-                  operatorName(SKIP_ON_ERROR_TASK_NAME),
-                  TypeInformation.of(String.class),
-                  new SkipOnError())
-              .uid(SKIP_ON_ERROR_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .forceNonParallel();
+          setSlotSharingGroup(
+              filesToDelete
+                  .connect(errorStream)
+                  .transform(
+                      operatorName(SKIP_ON_ERROR_TASK_NAME),
+                      TypeInformation.of(String.class),
+                      new SkipOnError())
+                  .uid(SKIP_ON_ERROR_TASK_NAME + uidSuffix())
+                  .forceNonParallel());
 
       // delete the files
-      filesOrSkip
-          .rebalance()
-          .transform(
-              operatorName(DELETE_FILES_TASK_NAME),
-              TypeInformation.of(Void.class),
-              new DeleteFilesProcessor(
-                  tableLoader().loadTable(), taskName(), index(), 
deleteBatchSize))
-          .uid(DELETE_FILES_TASK_NAME + uidSuffix())
-          .slotSharingGroup(slotSharingGroup())
-          .setParallelism(parallelism());
+      setSlotSharingGroup(
+          filesOrSkip
+              .rebalance()
+              .transform(
+                  operatorName(DELETE_FILES_TASK_NAME),
+                  TypeInformation.of(Void.class),
+                  new DeleteFilesProcessor(
+                      tableLoader().loadTable(), taskName(), index(), 
deleteBatchSize))
+              .uid(DELETE_FILES_TASK_NAME + uidSuffix())
+              .setParallelism(parallelism()));
 
       // Ignore the file deletion result and return the DataStream<TaskResult> 
directly
-      return trigger
-          .connect(errorStream)
-          .transform(
-              operatorName(AGGREGATOR_TASK_NAME),
-              TypeInformation.of(TaskResult.class),
-              new TaskResultAggregator(tableName(), taskName(), index()))
-          .uid(AGGREGATOR_TASK_NAME + uidSuffix())
-          .slotSharingGroup(slotSharingGroup())
-          .forceNonParallel();
+      return setSlotSharingGroup(
+          trigger
+              .connect(errorStream)
+              .transform(
+                  operatorName(AGGREGATOR_TASK_NAME),
+                  TypeInformation.of(TaskResult.class),
+                  new TaskResultAggregator(tableName(), taskName(), index()))
+              .uid(AGGREGATOR_TASK_NAME + uidSuffix())
+              .forceNonParallel());
     }
   }
 
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
index c84932f96f..7c524175c4 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
@@ -126,30 +126,30 @@ public class ExpireSnapshots {
       Preconditions.checkNotNull(tableLoader(), "TableLoader should not be 
null");
 
       SingleOutputStreamOperator<TaskResult> result =
-          trigger
-              .process(
-                  new ExpireSnapshotsProcessor(
-                      tableLoader(),
-                      maxSnapshotAge == null ? null : 
maxSnapshotAge.toMillis(),
-                      numSnapshots,
-                      planningWorkerPoolSize,
-                      cleanExpiredMetadata))
-              .name(operatorName(EXECUTOR_OPERATOR_NAME))
-              .uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .forceNonParallel();
-
-      result
-          .getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM)
-          .rebalance()
-          .transform(
-              operatorName(DELETE_FILES_OPERATOR_NAME),
-              TypeInformation.of(Void.class),
-              new DeleteFilesProcessor(
-                  tableLoader().loadTable(), taskName(), index(), 
deleteBatchSize))
-          .uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
-          .slotSharingGroup(slotSharingGroup())
-          .setParallelism(parallelism());
+          setSlotSharingGroup(
+              trigger
+                  .process(
+                      new ExpireSnapshotsProcessor(
+                          tableLoader(),
+                          maxSnapshotAge == null ? null : 
maxSnapshotAge.toMillis(),
+                          numSnapshots,
+                          planningWorkerPoolSize,
+                          cleanExpiredMetadata))
+                  .name(operatorName(EXECUTOR_OPERATOR_NAME))
+                  .uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
+                  .forceNonParallel());
+
+      setSlotSharingGroup(
+          result
+              .getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM)
+              .rebalance()
+              .transform(
+                  operatorName(DELETE_FILES_OPERATOR_NAME),
+                  TypeInformation.of(Void.class),
+                  new DeleteFilesProcessor(
+                      tableLoader().loadTable(), taskName(), index(), 
deleteBatchSize))
+              .uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
+              .setParallelism(parallelism()));
 
       // Ignore the file deletion result and return the DataStream<TaskResult> 
directly
       return result;
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
index e6f536273e..34d7330c59 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.FlinkConfParser;
 
@@ -60,7 +59,7 @@ public class FlinkMaintenanceConfig {
   public static final ConfigOption<String> SLOT_SHARING_GROUP_OPTION =
       ConfigOptions.key(SLOT_SHARING_GROUP)
           .stringType()
-          .defaultValue(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+          .noDefaultValue()
           .withDescription(
               "The slot sharing group for maintenance tasks. "
                   + "Determines which operators can share slots in the Flink 
execution environment.");
@@ -114,8 +113,7 @@ public class FlinkMaintenanceConfig {
         .stringConf()
         .option(SLOT_SHARING_GROUP)
         .flinkConfig(SLOT_SHARING_GROUP_OPTION)
-        .defaultValue(SLOT_SHARING_GROUP_OPTION.defaultValue())
-        .parse();
+        .parseOptional();
   }
 
   public RewriteDataFilesConfig createRewriteDataFilesConfig() {
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
index 5d5f17b0a8..04df6106bd 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
@@ -226,4 +226,8 @@ public abstract class MaintenanceTaskBuilder<T extends 
MaintenanceTaskBuilder<?>
 
     return append(sourceStream);
   }
+
+  <O> SingleOutputStreamOperator<O> 
setSlotSharingGroup(SingleOutputStreamOperator<O> operator) {
+    return slotSharingGroup == null ? operator : 
operator.slotSharingGroup(slotSharingGroup);
+  }
 }
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index f03f33a3fd..be77fda23d 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -290,58 +290,58 @@ public class RewriteDataFiles {
     @Override
     DataStream<TaskResult> append(DataStream<Trigger> trigger) {
       SingleOutputStreamOperator<DataFileRewritePlanner.PlannedGroup> planned =
-          trigger
-              .process(
-                  new DataFileRewritePlanner(
-                      tableName(),
-                      taskName(),
-                      index(),
-                      tableLoader(),
-                      partialProgressEnabled ? partialProgressMaxCommits : 1,
-                      maxRewriteBytes,
-                      rewriteOptions,
-                      filterSupplier,
-                      branch))
-              .name(operatorName(PLANNER_TASK_NAME))
-              .uid(PLANNER_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .forceNonParallel();
+          setSlotSharingGroup(
+              trigger
+                  .process(
+                      new DataFileRewritePlanner(
+                          tableName(),
+                          taskName(),
+                          index(),
+                          tableLoader(),
+                          partialProgressEnabled ? partialProgressMaxCommits : 
1,
+                          maxRewriteBytes,
+                          rewriteOptions,
+                          filterSupplier,
+                          branch))
+                  .name(operatorName(PLANNER_TASK_NAME))
+                  .uid(PLANNER_TASK_NAME + uidSuffix())
+                  .forceNonParallel());
 
       SingleOutputStreamOperator<DataFileRewriteRunner.ExecutedGroup> 
rewritten =
-          planned
-              .rebalance()
-              .process(new DataFileRewriteRunner(tableName(), taskName(), 
index()))
-              .name(operatorName(REWRITE_TASK_NAME))
-              .uid(REWRITE_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .setParallelism(parallelism());
+          setSlotSharingGroup(
+              planned
+                  .rebalance()
+                  .process(new DataFileRewriteRunner(tableName(), taskName(), 
index()))
+                  .name(operatorName(REWRITE_TASK_NAME))
+                  .uid(REWRITE_TASK_NAME + uidSuffix())
+                  .setParallelism(parallelism()));
 
       SingleOutputStreamOperator<Trigger> updated =
-          rewritten
-              .transform(
-                  operatorName(COMMIT_TASK_NAME),
-                  TypeInformation.of(Trigger.class),
-                  new DataFileRewriteCommitter(
-                      tableName(), taskName(), index(), tableLoader(), branch))
-              .uid(COMMIT_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .forceNonParallel();
+          setSlotSharingGroup(
+              rewritten
+                  .transform(
+                      operatorName(COMMIT_TASK_NAME),
+                      TypeInformation.of(Trigger.class),
+                      new DataFileRewriteCommitter(
+                          tableName(), taskName(), index(), tableLoader(), 
branch))
+                  .uid(COMMIT_TASK_NAME + uidSuffix())
+                  .forceNonParallel());
 
-      return trigger
-          .union(updated)
-          .connect(
-              planned
-                  .getSideOutput(TaskResultAggregator.ERROR_STREAM)
-                  .union(
-                      
rewritten.getSideOutput(TaskResultAggregator.ERROR_STREAM),
-                      
updated.getSideOutput(TaskResultAggregator.ERROR_STREAM)))
-          .transform(
-              operatorName(AGGREGATOR_TASK_NAME),
-              TypeInformation.of(TaskResult.class),
-              new TaskResultAggregator(tableName(), taskName(), index()))
-          .uid(AGGREGATOR_TASK_NAME + uidSuffix())
-          .slotSharingGroup(slotSharingGroup())
-          .forceNonParallel();
+      return setSlotSharingGroup(
+          trigger
+              .union(updated)
+              .connect(
+                  planned
+                      .getSideOutput(TaskResultAggregator.ERROR_STREAM)
+                      .union(
+                          
rewritten.getSideOutput(TaskResultAggregator.ERROR_STREAM),
+                          
updated.getSideOutput(TaskResultAggregator.ERROR_STREAM)))
+              .transform(
+                  operatorName(AGGREGATOR_TASK_NAME),
+                  TypeInformation.of(TaskResult.class),
+                  new TaskResultAggregator(tableName(), taskName(), index()))
+              .uid(AGGREGATOR_TASK_NAME + uidSuffix())
+              .forceNonParallel());
     }
   }
 }
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
index d98a1c9ab4..025a6d17c0 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.maintenance.operator.LockRemover;
 import 
org.apache.iceberg.flink.maintenance.operator.LockRemoverOperatorFactory;
@@ -154,7 +153,7 @@ public class TableMaintenance {
     private final TriggerLockFactory lockFactory;
 
     private String uidSuffix = "TableMaintenance-" + UUID.randomUUID();
-    private String slotSharingGroup = 
StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
+    private String slotSharingGroup = null;
     private Duration rateLimit = Duration.ofSeconds(RATE_LIMIT_SECOND_DEFAULT);
     private Duration lockCheckDelay = 
Duration.ofSeconds(LOCK_CHECK_DELAY_SECOND_DEFAULT);
     private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
@@ -282,57 +281,57 @@ public class TableMaintenance {
         DataStream<Trigger> triggers;
         if (lockFactory == null) {
           triggers =
-              DataStreamUtils.reinterpretAsKeyedStream(
-                      changeStream(tableName, loader), unused -> true)
-                  .transform(
-                      TRIGGER_MANAGER_OPERATOR_NAME,
-                      TypeInformation.of(Trigger.class),
-                      new TriggerManagerOperatorFactory(
-                          tableName,
-                          taskNames,
-                          evaluators,
-                          rateLimit.toMillis(),
-                          lockCheckDelay.toMillis()))
-                  .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
-                  .slotSharingGroup(slotSharingGroup)
-                  .forceNonParallel();
+              setSlotSharingGroup(
+                  DataStreamUtils.reinterpretAsKeyedStream(
+                          changeStream(tableName, loader), unused -> true)
+                      .transform(
+                          TRIGGER_MANAGER_OPERATOR_NAME,
+                          TypeInformation.of(Trigger.class),
+                          new TriggerManagerOperatorFactory(
+                              tableName,
+                              taskNames,
+                              evaluators,
+                              rateLimit.toMillis(),
+                              lockCheckDelay.toMillis()))
+                      .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+                      .forceNonParallel());
         } else {
           triggers =
-              DataStreamUtils.reinterpretAsKeyedStream(
-                      changeStream(tableName, loader), unused -> true)
-                  .process(
-                      new TriggerManager(
-                          loader,
-                          lockFactory,
-                          taskNames,
-                          evaluators,
-                          rateLimit.toMillis(),
-                          lockCheckDelay.toMillis()))
-                  .name(TRIGGER_MANAGER_OPERATOR_NAME)
-                  .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
-                  .slotSharingGroup(slotSharingGroup)
-                  .forceNonParallel();
+              setSlotSharingGroup(
+                  DataStreamUtils.reinterpretAsKeyedStream(
+                          changeStream(tableName, loader), unused -> true)
+                      .process(
+                          new TriggerManager(
+                              loader,
+                              lockFactory,
+                              taskNames,
+                              evaluators,
+                              rateLimit.toMillis(),
+                              lockCheckDelay.toMillis()))
+                      .name(TRIGGER_MANAGER_OPERATOR_NAME)
+                      .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+                      .forceNonParallel());
         }
 
         triggers =
-            triggers
-                .assignTimestampsAndWatermarks(new 
PunctuatedWatermarkStrategy())
-                .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
-                .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
-                .slotSharingGroup(slotSharingGroup)
-                .forceNonParallel();
+            setSlotSharingGroup(
+                triggers
+                    .assignTimestampsAndWatermarks(new 
PunctuatedWatermarkStrategy())
+                    .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
+                    .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
+                    .forceNonParallel());
 
         // Add the specific tasks
         DataStream<TaskResult> unioned = null;
         for (int i = 0; i < taskBuilders.size(); ++i) {
           int taskIndex = i;
           DataStream<Trigger> filtered =
-              triggers
-                  .filter(t -> t.taskId() != null && t.taskId() == taskIndex)
-                  .name(FILTER_OPERATOR_NAME_PREFIX + taskIndex)
-                  .forceNonParallel()
-                  .uid(FILTER_OPERATOR_NAME_PREFIX + taskIndex + "-" + 
uidSuffix)
-                  .slotSharingGroup(slotSharingGroup);
+              setSlotSharingGroup(
+                  triggers
+                      .filter(t -> t.taskId() != null && t.taskId() == 
taskIndex)
+                      .name(FILTER_OPERATOR_NAME_PREFIX + taskIndex)
+                      .forceNonParallel()
+                      .uid(FILTER_OPERATOR_NAME_PREFIX + taskIndex + "-" + 
uidSuffix));
           MaintenanceTaskBuilder<?> builder = taskBuilders.get(taskIndex);
           DataStream<TaskResult> result =
               builder.append(
@@ -353,23 +352,23 @@ public class TableMaintenance {
 
         // Add the LockRemover to the end
         if (lockFactory == null) {
-          unioned
-              .transform(
-                  LOCK_REMOVER_OPERATOR_NAME,
-                  TypeInformation.of(Void.class),
-                  new LockRemoverOperatorFactory(tableName, taskNames))
-              .uid("lock-remover-" + uidSuffix)
-              .forceNonParallel()
-              .slotSharingGroup(slotSharingGroup);
+          setSlotSharingGroup(
+              unioned
+                  .transform(
+                      LOCK_REMOVER_OPERATOR_NAME,
+                      TypeInformation.of(Void.class),
+                      new LockRemoverOperatorFactory(tableName, taskNames))
+                  .uid("lock-remover-" + uidSuffix)
+                  .forceNonParallel());
         } else {
-          unioned
-              .transform(
-                  LOCK_REMOVER_OPERATOR_NAME,
-                  TypeInformation.of(Void.class),
-                  new LockRemover(tableName, lockFactory, taskNames))
-              .forceNonParallel()
-              .uid("lock-remover-" + uidSuffix)
-              .slotSharingGroup(slotSharingGroup);
+          setSlotSharingGroup(
+              unioned
+                  .transform(
+                      LOCK_REMOVER_OPERATOR_NAME,
+                      TypeInformation.of(Void.class),
+                      new LockRemover(tableName, lockFactory, taskNames))
+                  .forceNonParallel()
+                  .uid("lock-remover-" + uidSuffix));
         }
       }
     }
@@ -380,11 +379,13 @@ public class TableMaintenance {
         MonitorSource source =
             new MonitorSource(
                 loader, RateLimiterStrategy.perSecond(1.0 / 
rateLimit.getSeconds()), maxReadBack);
-        return env.fromSource(
-                source, WatermarkStrategy.noWatermarks(), 
SOURCE_OPERATOR_NAME_PREFIX + tableName)
-            .uid(SOURCE_OPERATOR_NAME_PREFIX + uidSuffix)
-            .slotSharingGroup(slotSharingGroup)
-            .forceNonParallel();
+        return setSlotSharingGroup(
+            env.fromSource(
+                    source,
+                    WatermarkStrategy.noWatermarks(),
+                    SOURCE_OPERATOR_NAME_PREFIX + tableName)
+                .uid(SOURCE_OPERATOR_NAME_PREFIX + uidSuffix)
+                .forceNonParallel());
       } else {
         return inputStream.global();
       }
@@ -393,6 +394,11 @@ public class TableMaintenance {
     private static String nameFor(MaintenanceTaskBuilder<?> streamBuilder, int 
taskIndex) {
       return String.format(Locale.ROOT, "%s [%d]", 
streamBuilder.maintenanceTaskName(), taskIndex);
     }
+
+    private <T> SingleOutputStreamOperator<T> setSlotSharingGroup(
+        SingleOutputStreamOperator<T> operator) {
+      return slotSharingGroup == null ? operator : 
operator.slotSharingGroup(slotSharingGroup);
+    }
   }
 
   @Internal
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index b1a58a6d3a..d5f3114d0b 100644
--- 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -290,9 +290,14 @@ public class IcebergSink
           .add(maintenanceTasks)
           .rateLimit(Duration.ofSeconds(flinkMaintenanceConfig.rateLimit()))
           
.lockCheckDelay(Duration.ofSeconds(flinkMaintenanceConfig.lockCheckDelay()))
-          .slotSharingGroup(flinkMaintenanceConfig.slotSharingGroup())
-          .parallelism(flinkMaintenanceConfig.parallelism())
-          .append();
+          .parallelism(flinkMaintenanceConfig.parallelism());
+
+      String slotSharingGroup = flinkMaintenanceConfig.slotSharingGroup();
+      if (slotSharingGroup != null) {
+        builder.slotSharingGroup(slotSharingGroup);
+      }
+
+      builder.append();
     } catch (IOException e) {
       throw new UncheckedIOException("Failed to create tableMaintenance ", e);
     }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
index b8aa259e2f..a23850d823 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
@@ -164,7 +164,7 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase {
             0,
             tableLoader(),
             UID_SUFFIX,
-            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
+            null,
             1)
         .sinkTo(infra.sink());
 
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 88b949a9a7..c27f6081af 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -340,7 +340,7 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
             0,
             tableLoader(),
             UID_SUFFIX,
-            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
+            null,
             1)
         .sinkTo(infra.sink());
 
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java
index eaa5b5e1b5..49219d5b46 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java
@@ -424,12 +424,12 @@ class TestTableMaintenance extends OperatorTestBase {
     @Override
     DataStream<TaskResult> append(DataStream<Trigger> trigger) {
       String name = TASKS[id];
-      return trigger
-          .map(new DummyMaintenanceTask(success))
-          .name(name)
-          .uid(uidSuffix() + "-test-mapper-" + name + "-" + id)
-          .slotSharingGroup(slotSharingGroup())
-          .forceNonParallel();
+      return setSlotSharingGroup(
+          trigger
+              .map(new DummyMaintenanceTask(success))
+              .name(name)
+              .uid(uidSuffix() + "-test-mapper-" + name + "-" + id)
+              .forceNonParallel());
     }
   }
 
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
index eb5479045b..9b7941c2d3 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
@@ -313,12 +313,12 @@ class TestTableMaintenanceCoordinationLock extends 
OperatorTestBase {
     @Override
     DataStream<TaskResult> append(DataStream<Trigger> trigger) {
       String name = TASKS[id];
-      return trigger
-          .map(new DummyMaintenanceTask(success))
-          .name(name)
-          .uid(uidSuffix() + "-test-mapper-" + name + "-" + id)
-          .slotSharingGroup(slotSharingGroup())
-          .forceNonParallel();
+      return setSlotSharingGroup(
+          trigger
+              .map(new DummyMaintenanceTask(success))
+              .name(name)
+              .uid(uidSuffix() + "-test-mapper-" + name + "-" + id)
+              .forceNonParallel());
     }
   }
 
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index 06ab7861c0..638da61949 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -35,7 +35,6 @@ import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -416,15 +415,17 @@ public class OperatorTestBase {
   }
 
   protected static void 
checkSlotSharingGroupsAreSet(StreamExecutionEnvironment env, String name) {
-    String nameToCheck = name != null ? name : 
StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
-
     env.getTransformations().stream()
         .filter(
             t -> !(t instanceof SinkTransformation) && 
!(t.getName().equals(IGNORED_OPERATOR_NAME)))
         .forEach(
             t -> {
-              assertThat(t.getSlotSharingGroup()).isPresent();
-              
assertThat(t.getSlotSharingGroup().get().getName()).isEqualTo(nameToCheck);
+              if (name == null) {
+                assertThat(t.getSlotSharingGroup()).isNotPresent();
+              } else {
+                assertThat(t.getSlotSharingGroup()).isPresent();
+                
assertThat(t.getSlotSharingGroup().get().getName()).isEqualTo(name);
+              }
             });
   }
 
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
index 63a267d16e..377413c3c3 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
@@ -209,72 +209,73 @@ public class DeleteOrphanFiles {
 
       // Collect all data files
       SingleOutputStreamOperator<MetadataTablePlanner.SplitInfo> splits =
-          trigger
-              .process(
-                  new MetadataTablePlanner(
-                      taskName(),
-                      index(),
-                      tableLoader(),
-                      FILE_PATH_SCAN_CONTEXT,
-                      MetadataTableType.ALL_FILES,
-                      planningWorkerPoolSize))
-              .name(operatorName(PLANNER_TASK_NAME))
-              .uid(PLANNER_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .forceNonParallel();
+          setSlotSharingGroup(
+              trigger
+                  .process(
+                      new MetadataTablePlanner(
+                          taskName(),
+                          index(),
+                          tableLoader(),
+                          FILE_PATH_SCAN_CONTEXT,
+                          MetadataTableType.ALL_FILES,
+                          planningWorkerPoolSize))
+                  .name(operatorName(PLANNER_TASK_NAME))
+                  .uid(PLANNER_TASK_NAME + uidSuffix())
+                  .forceNonParallel());
 
       // Read the records and get all data files
       SingleOutputStreamOperator<String> tableDataFiles =
-          splits
-              .rebalance()
-              .process(
-                  new FileNameReader(
-                      taskName(),
-                      index(),
-                      tableLoader(),
-                      FILE_PATH_SCHEMA,
-                      FILE_PATH_SCAN_CONTEXT,
-                      MetadataTableType.ALL_FILES))
-              .name(operatorName(READER_TASK_NAME))
-              .uid(READER_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .setParallelism(parallelism());
+          setSlotSharingGroup(
+              splits
+                  .rebalance()
+                  .process(
+                      new FileNameReader(
+                          taskName(),
+                          index(),
+                          tableLoader(),
+                          FILE_PATH_SCHEMA,
+                          FILE_PATH_SCAN_CONTEXT,
+                          MetadataTableType.ALL_FILES))
+                  .name(operatorName(READER_TASK_NAME))
+                  .uid(READER_TASK_NAME + uidSuffix())
+                  .setParallelism(parallelism()));
 
       // Collect all meta data files
       SingleOutputStreamOperator<String> tableMetadataFiles =
-          trigger
-              .process(new ListMetadataFiles(taskName(), index(), 
tableLoader()))
-              .name(operatorName(METADATA_FILES_TASK_NAME))
-              .uid(METADATA_FILES_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .forceNonParallel();
+          setSlotSharingGroup(
+              trigger
+                  .process(new ListMetadataFiles(taskName(), index(), 
tableLoader()))
+                  .name(operatorName(METADATA_FILES_TASK_NAME))
+                  .uid(METADATA_FILES_TASK_NAME + uidSuffix())
+                  .forceNonParallel());
 
       // List the all file system files
       SingleOutputStreamOperator<String> allFsFiles =
-          trigger
-              .process(
-                  new ListFileSystemFiles(
-                      taskName(),
-                      index(),
-                      tableLoader(),
-                      location,
-                      minAge.toMillis(),
-                      usePrefixListing))
-              .name(operatorName(FILESYSTEM_FILES_TASK_NAME))
-              .uid(FILESYSTEM_FILES_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .forceNonParallel();
+          setSlotSharingGroup(
+              trigger
+                  .process(
+                      new ListFileSystemFiles(
+                          taskName(),
+                          index(),
+                          tableLoader(),
+                          location,
+                          minAge.toMillis(),
+                          usePrefixListing))
+                  .name(operatorName(FILESYSTEM_FILES_TASK_NAME))
+                  .uid(FILESYSTEM_FILES_TASK_NAME + uidSuffix())
+                  .forceNonParallel());
 
       SingleOutputStreamOperator<String> filesToDelete =
-          tableMetadataFiles
-              .union(tableDataFiles)
-              .keyBy(new FileUriKeySelector(equalSchemes, equalAuthorities))
-              .connect(allFsFiles.keyBy(new FileUriKeySelector(equalSchemes, 
equalAuthorities)))
-              .process(new OrphanFilesDetector(prefixMismatchMode, 
equalSchemes, equalAuthorities))
-              .slotSharingGroup(slotSharingGroup())
-              .name(operatorName(FILTER_FILES_TASK_NAME))
-              .uid(FILTER_FILES_TASK_NAME + uidSuffix())
-              .setParallelism(parallelism());
+          setSlotSharingGroup(
+              tableMetadataFiles
+                  .union(tableDataFiles)
+                  .keyBy(new FileUriKeySelector(equalSchemes, 
equalAuthorities))
+                  .connect(allFsFiles.keyBy(new 
FileUriKeySelector(equalSchemes, equalAuthorities)))
+                  .process(
+                      new OrphanFilesDetector(prefixMismatchMode, 
equalSchemes, equalAuthorities))
+                  .name(operatorName(FILTER_FILES_TASK_NAME))
+                  .uid(FILTER_FILES_TASK_NAME + uidSuffix())
+                  .setParallelism(parallelism()));
 
       DataStream<Exception> errorStream =
           tableMetadataFiles
@@ -287,38 +288,38 @@ public class DeleteOrphanFiles {
 
       // Stop deleting the files if there is an error
       SingleOutputStreamOperator<String> filesOrSkip =
-          filesToDelete
-              .connect(errorStream)
-              .transform(
-                  operatorName(SKIP_ON_ERROR_TASK_NAME),
-                  TypeInformation.of(String.class),
-                  new SkipOnError())
-              .uid(SKIP_ON_ERROR_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .forceNonParallel();
+          setSlotSharingGroup(
+              filesToDelete
+                  .connect(errorStream)
+                  .transform(
+                      operatorName(SKIP_ON_ERROR_TASK_NAME),
+                      TypeInformation.of(String.class),
+                      new SkipOnError())
+                  .uid(SKIP_ON_ERROR_TASK_NAME + uidSuffix())
+                  .forceNonParallel());
 
       // delete the files
-      filesOrSkip
-          .rebalance()
-          .transform(
-              operatorName(DELETE_FILES_TASK_NAME),
-              TypeInformation.of(Void.class),
-              new DeleteFilesProcessor(
-                  tableLoader().loadTable(), taskName(), index(), 
deleteBatchSize))
-          .uid(DELETE_FILES_TASK_NAME + uidSuffix())
-          .slotSharingGroup(slotSharingGroup())
-          .setParallelism(parallelism());
+      setSlotSharingGroup(
+          filesOrSkip
+              .rebalance()
+              .transform(
+                  operatorName(DELETE_FILES_TASK_NAME),
+                  TypeInformation.of(Void.class),
+                  new DeleteFilesProcessor(
+                      tableLoader().loadTable(), taskName(), index(), 
deleteBatchSize))
+              .uid(DELETE_FILES_TASK_NAME + uidSuffix())
+              .setParallelism(parallelism()));
 
       // Ignore the file deletion result and return the DataStream<TaskResult> 
directly
-      return trigger
-          .connect(errorStream)
-          .transform(
-              operatorName(AGGREGATOR_TASK_NAME),
-              TypeInformation.of(TaskResult.class),
-              new TaskResultAggregator(tableName(), taskName(), index()))
-          .uid(AGGREGATOR_TASK_NAME + uidSuffix())
-          .slotSharingGroup(slotSharingGroup())
-          .forceNonParallel();
+      return setSlotSharingGroup(
+          trigger
+              .connect(errorStream)
+              .transform(
+                  operatorName(AGGREGATOR_TASK_NAME),
+                  TypeInformation.of(TaskResult.class),
+                  new TaskResultAggregator(tableName(), taskName(), index()))
+              .uid(AGGREGATOR_TASK_NAME + uidSuffix())
+              .forceNonParallel());
     }
   }
 
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
index c84932f96f..7c524175c4 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
@@ -126,30 +126,30 @@ public class ExpireSnapshots {
       Preconditions.checkNotNull(tableLoader(), "TableLoader should not be 
null");
 
       SingleOutputStreamOperator<TaskResult> result =
-          trigger
-              .process(
-                  new ExpireSnapshotsProcessor(
-                      tableLoader(),
-                      maxSnapshotAge == null ? null : 
maxSnapshotAge.toMillis(),
-                      numSnapshots,
-                      planningWorkerPoolSize,
-                      cleanExpiredMetadata))
-              .name(operatorName(EXECUTOR_OPERATOR_NAME))
-              .uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .forceNonParallel();
-
-      result
-          .getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM)
-          .rebalance()
-          .transform(
-              operatorName(DELETE_FILES_OPERATOR_NAME),
-              TypeInformation.of(Void.class),
-              new DeleteFilesProcessor(
-                  tableLoader().loadTable(), taskName(), index(), 
deleteBatchSize))
-          .uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
-          .slotSharingGroup(slotSharingGroup())
-          .setParallelism(parallelism());
+          setSlotSharingGroup(
+              trigger
+                  .process(
+                      new ExpireSnapshotsProcessor(
+                          tableLoader(),
+                          maxSnapshotAge == null ? null : 
maxSnapshotAge.toMillis(),
+                          numSnapshots,
+                          planningWorkerPoolSize,
+                          cleanExpiredMetadata))
+                  .name(operatorName(EXECUTOR_OPERATOR_NAME))
+                  .uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
+                  .forceNonParallel());
+
+      setSlotSharingGroup(
+          result
+              .getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM)
+              .rebalance()
+              .transform(
+                  operatorName(DELETE_FILES_OPERATOR_NAME),
+                  TypeInformation.of(Void.class),
+                  new DeleteFilesProcessor(
+                      tableLoader().loadTable(), taskName(), index(), 
deleteBatchSize))
+              .uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
+              .setParallelism(parallelism()));
 
       // Ignore the file deletion result and return the DataStream<TaskResult> 
directly
       return result;
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
index e6f536273e..34d7330c59 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.FlinkConfParser;
 
@@ -60,7 +59,7 @@ public class FlinkMaintenanceConfig {
   public static final ConfigOption<String> SLOT_SHARING_GROUP_OPTION =
       ConfigOptions.key(SLOT_SHARING_GROUP)
           .stringType()
-          .defaultValue(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+          .noDefaultValue()
           .withDescription(
               "The slot sharing group for maintenance tasks. "
                   + "Determines which operators can share slots in the Flink 
execution environment.");
@@ -114,8 +113,7 @@ public class FlinkMaintenanceConfig {
         .stringConf()
         .option(SLOT_SHARING_GROUP)
         .flinkConfig(SLOT_SHARING_GROUP_OPTION)
-        .defaultValue(SLOT_SHARING_GROUP_OPTION.defaultValue())
-        .parse();
+        .parseOptional();
   }
 
   public RewriteDataFilesConfig createRewriteDataFilesConfig() {
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
index 5d5f17b0a8..04df6106bd 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
@@ -226,4 +226,8 @@ public abstract class MaintenanceTaskBuilder<T extends 
MaintenanceTaskBuilder<?>
 
     return append(sourceStream);
   }
+
+  <O> SingleOutputStreamOperator<O> 
setSlotSharingGroup(SingleOutputStreamOperator<O> operator) {
+    return slotSharingGroup == null ? operator : 
operator.slotSharingGroup(slotSharingGroup);
+  }
 }
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index f03f33a3fd..be77fda23d 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -290,58 +290,58 @@ public class RewriteDataFiles {
     @Override
     DataStream<TaskResult> append(DataStream<Trigger> trigger) {
       SingleOutputStreamOperator<DataFileRewritePlanner.PlannedGroup> planned =
-          trigger
-              .process(
-                  new DataFileRewritePlanner(
-                      tableName(),
-                      taskName(),
-                      index(),
-                      tableLoader(),
-                      partialProgressEnabled ? partialProgressMaxCommits : 1,
-                      maxRewriteBytes,
-                      rewriteOptions,
-                      filterSupplier,
-                      branch))
-              .name(operatorName(PLANNER_TASK_NAME))
-              .uid(PLANNER_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .forceNonParallel();
+          setSlotSharingGroup(
+              trigger
+                  .process(
+                      new DataFileRewritePlanner(
+                          tableName(),
+                          taskName(),
+                          index(),
+                          tableLoader(),
+                          partialProgressEnabled ? partialProgressMaxCommits : 
1,
+                          maxRewriteBytes,
+                          rewriteOptions,
+                          filterSupplier,
+                          branch))
+                  .name(operatorName(PLANNER_TASK_NAME))
+                  .uid(PLANNER_TASK_NAME + uidSuffix())
+                  .forceNonParallel());
 
       SingleOutputStreamOperator<DataFileRewriteRunner.ExecutedGroup> 
rewritten =
-          planned
-              .rebalance()
-              .process(new DataFileRewriteRunner(tableName(), taskName(), 
index()))
-              .name(operatorName(REWRITE_TASK_NAME))
-              .uid(REWRITE_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .setParallelism(parallelism());
+          setSlotSharingGroup(
+              planned
+                  .rebalance()
+                  .process(new DataFileRewriteRunner(tableName(), taskName(), 
index()))
+                  .name(operatorName(REWRITE_TASK_NAME))
+                  .uid(REWRITE_TASK_NAME + uidSuffix())
+                  .setParallelism(parallelism()));
 
       SingleOutputStreamOperator<Trigger> updated =
-          rewritten
-              .transform(
-                  operatorName(COMMIT_TASK_NAME),
-                  TypeInformation.of(Trigger.class),
-                  new DataFileRewriteCommitter(
-                      tableName(), taskName(), index(), tableLoader(), branch))
-              .uid(COMMIT_TASK_NAME + uidSuffix())
-              .slotSharingGroup(slotSharingGroup())
-              .forceNonParallel();
+          setSlotSharingGroup(
+              rewritten
+                  .transform(
+                      operatorName(COMMIT_TASK_NAME),
+                      TypeInformation.of(Trigger.class),
+                      new DataFileRewriteCommitter(
+                          tableName(), taskName(), index(), tableLoader(), 
branch))
+                  .uid(COMMIT_TASK_NAME + uidSuffix())
+                  .forceNonParallel());
 
-      return trigger
-          .union(updated)
-          .connect(
-              planned
-                  .getSideOutput(TaskResultAggregator.ERROR_STREAM)
-                  .union(
-                      
rewritten.getSideOutput(TaskResultAggregator.ERROR_STREAM),
-                      
updated.getSideOutput(TaskResultAggregator.ERROR_STREAM)))
-          .transform(
-              operatorName(AGGREGATOR_TASK_NAME),
-              TypeInformation.of(TaskResult.class),
-              new TaskResultAggregator(tableName(), taskName(), index()))
-          .uid(AGGREGATOR_TASK_NAME + uidSuffix())
-          .slotSharingGroup(slotSharingGroup())
-          .forceNonParallel();
+      return setSlotSharingGroup(
+          trigger
+              .union(updated)
+              .connect(
+                  planned
+                      .getSideOutput(TaskResultAggregator.ERROR_STREAM)
+                      .union(
+                          
rewritten.getSideOutput(TaskResultAggregator.ERROR_STREAM),
+                          
updated.getSideOutput(TaskResultAggregator.ERROR_STREAM)))
+              .transform(
+                  operatorName(AGGREGATOR_TASK_NAME),
+                  TypeInformation.of(TaskResult.class),
+                  new TaskResultAggregator(tableName(), taskName(), index()))
+              .uid(AGGREGATOR_TASK_NAME + uidSuffix())
+              .forceNonParallel());
     }
   }
 }
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
index d98a1c9ab4..025a6d17c0 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.maintenance.operator.LockRemover;
 import 
org.apache.iceberg.flink.maintenance.operator.LockRemoverOperatorFactory;
@@ -154,7 +153,7 @@ public class TableMaintenance {
     private final TriggerLockFactory lockFactory;
 
     private String uidSuffix = "TableMaintenance-" + UUID.randomUUID();
-    private String slotSharingGroup = 
StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
+    private String slotSharingGroup = null;
     private Duration rateLimit = Duration.ofSeconds(RATE_LIMIT_SECOND_DEFAULT);
     private Duration lockCheckDelay = 
Duration.ofSeconds(LOCK_CHECK_DELAY_SECOND_DEFAULT);
     private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
@@ -282,57 +281,57 @@ public class TableMaintenance {
         DataStream<Trigger> triggers;
         if (lockFactory == null) {
           triggers =
-              DataStreamUtils.reinterpretAsKeyedStream(
-                      changeStream(tableName, loader), unused -> true)
-                  .transform(
-                      TRIGGER_MANAGER_OPERATOR_NAME,
-                      TypeInformation.of(Trigger.class),
-                      new TriggerManagerOperatorFactory(
-                          tableName,
-                          taskNames,
-                          evaluators,
-                          rateLimit.toMillis(),
-                          lockCheckDelay.toMillis()))
-                  .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
-                  .slotSharingGroup(slotSharingGroup)
-                  .forceNonParallel();
+              setSlotSharingGroup(
+                  DataStreamUtils.reinterpretAsKeyedStream(
+                          changeStream(tableName, loader), unused -> true)
+                      .transform(
+                          TRIGGER_MANAGER_OPERATOR_NAME,
+                          TypeInformation.of(Trigger.class),
+                          new TriggerManagerOperatorFactory(
+                              tableName,
+                              taskNames,
+                              evaluators,
+                              rateLimit.toMillis(),
+                              lockCheckDelay.toMillis()))
+                      .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+                      .forceNonParallel());
         } else {
           triggers =
-              DataStreamUtils.reinterpretAsKeyedStream(
-                      changeStream(tableName, loader), unused -> true)
-                  .process(
-                      new TriggerManager(
-                          loader,
-                          lockFactory,
-                          taskNames,
-                          evaluators,
-                          rateLimit.toMillis(),
-                          lockCheckDelay.toMillis()))
-                  .name(TRIGGER_MANAGER_OPERATOR_NAME)
-                  .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
-                  .slotSharingGroup(slotSharingGroup)
-                  .forceNonParallel();
+              setSlotSharingGroup(
+                  DataStreamUtils.reinterpretAsKeyedStream(
+                          changeStream(tableName, loader), unused -> true)
+                      .process(
+                          new TriggerManager(
+                              loader,
+                              lockFactory,
+                              taskNames,
+                              evaluators,
+                              rateLimit.toMillis(),
+                              lockCheckDelay.toMillis()))
+                      .name(TRIGGER_MANAGER_OPERATOR_NAME)
+                      .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+                      .forceNonParallel());
         }
 
         triggers =
-            triggers
-                .assignTimestampsAndWatermarks(new 
PunctuatedWatermarkStrategy())
-                .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
-                .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
-                .slotSharingGroup(slotSharingGroup)
-                .forceNonParallel();
+            setSlotSharingGroup(
+                triggers
+                    .assignTimestampsAndWatermarks(new 
PunctuatedWatermarkStrategy())
+                    .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
+                    .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
+                    .forceNonParallel());
 
         // Add the specific tasks
         DataStream<TaskResult> unioned = null;
         for (int i = 0; i < taskBuilders.size(); ++i) {
           int taskIndex = i;
           DataStream<Trigger> filtered =
-              triggers
-                  .filter(t -> t.taskId() != null && t.taskId() == taskIndex)
-                  .name(FILTER_OPERATOR_NAME_PREFIX + taskIndex)
-                  .forceNonParallel()
-                  .uid(FILTER_OPERATOR_NAME_PREFIX + taskIndex + "-" + 
uidSuffix)
-                  .slotSharingGroup(slotSharingGroup);
+              setSlotSharingGroup(
+                  triggers
+                      .filter(t -> t.taskId() != null && t.taskId() == 
taskIndex)
+                      .name(FILTER_OPERATOR_NAME_PREFIX + taskIndex)
+                      .forceNonParallel()
+                      .uid(FILTER_OPERATOR_NAME_PREFIX + taskIndex + "-" + 
uidSuffix));
           MaintenanceTaskBuilder<?> builder = taskBuilders.get(taskIndex);
           DataStream<TaskResult> result =
               builder.append(
@@ -353,23 +352,23 @@ public class TableMaintenance {
 
         // Add the LockRemover to the end
         if (lockFactory == null) {
-          unioned
-              .transform(
-                  LOCK_REMOVER_OPERATOR_NAME,
-                  TypeInformation.of(Void.class),
-                  new LockRemoverOperatorFactory(tableName, taskNames))
-              .uid("lock-remover-" + uidSuffix)
-              .forceNonParallel()
-              .slotSharingGroup(slotSharingGroup);
+          setSlotSharingGroup(
+              unioned
+                  .transform(
+                      LOCK_REMOVER_OPERATOR_NAME,
+                      TypeInformation.of(Void.class),
+                      new LockRemoverOperatorFactory(tableName, taskNames))
+                  .uid("lock-remover-" + uidSuffix)
+                  .forceNonParallel());
         } else {
-          unioned
-              .transform(
-                  LOCK_REMOVER_OPERATOR_NAME,
-                  TypeInformation.of(Void.class),
-                  new LockRemover(tableName, lockFactory, taskNames))
-              .forceNonParallel()
-              .uid("lock-remover-" + uidSuffix)
-              .slotSharingGroup(slotSharingGroup);
+          setSlotSharingGroup(
+              unioned
+                  .transform(
+                      LOCK_REMOVER_OPERATOR_NAME,
+                      TypeInformation.of(Void.class),
+                      new LockRemover(tableName, lockFactory, taskNames))
+                  .forceNonParallel()
+                  .uid("lock-remover-" + uidSuffix));
         }
       }
     }
@@ -380,11 +379,13 @@ public class TableMaintenance {
         MonitorSource source =
             new MonitorSource(
                 loader, RateLimiterStrategy.perSecond(1.0 / 
rateLimit.getSeconds()), maxReadBack);
-        return env.fromSource(
-                source, WatermarkStrategy.noWatermarks(), 
SOURCE_OPERATOR_NAME_PREFIX + tableName)
-            .uid(SOURCE_OPERATOR_NAME_PREFIX + uidSuffix)
-            .slotSharingGroup(slotSharingGroup)
-            .forceNonParallel();
+        return setSlotSharingGroup(
+            env.fromSource(
+                    source,
+                    WatermarkStrategy.noWatermarks(),
+                    SOURCE_OPERATOR_NAME_PREFIX + tableName)
+                .uid(SOURCE_OPERATOR_NAME_PREFIX + uidSuffix)
+                .forceNonParallel());
       } else {
         return inputStream.global();
       }
@@ -393,6 +394,11 @@ public class TableMaintenance {
     private static String nameFor(MaintenanceTaskBuilder<?> streamBuilder, int 
taskIndex) {
       return String.format(Locale.ROOT, "%s [%d]", 
streamBuilder.maintenanceTaskName(), taskIndex);
     }
+
+    private <T> SingleOutputStreamOperator<T> setSlotSharingGroup(
+        SingleOutputStreamOperator<T> operator) {
+      return slotSharingGroup == null ? operator : 
operator.slotSharingGroup(slotSharingGroup);
+    }
   }
 
   @Internal
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index eaaf4ea6e4..440fdb278b 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -290,9 +290,14 @@ public class IcebergSink
           .add(maintenanceTasks)
           .rateLimit(Duration.ofSeconds(flinkMaintenanceConfig.rateLimit()))
           
.lockCheckDelay(Duration.ofSeconds(flinkMaintenanceConfig.lockCheckDelay()))
-          .slotSharingGroup(flinkMaintenanceConfig.slotSharingGroup())
-          .parallelism(flinkMaintenanceConfig.parallelism())
-          .append();
+          .parallelism(flinkMaintenanceConfig.parallelism());
+
+      String slotSharingGroup = flinkMaintenanceConfig.slotSharingGroup();
+      if (slotSharingGroup != null) {
+        builder.slotSharingGroup(slotSharingGroup);
+      }
+
+      builder.append();
     } catch (IOException e) {
       throw new UncheckedIOException("Failed to create tableMaintenance ", e);
     }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
index b8aa259e2f..a23850d823 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
@@ -164,7 +164,7 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase {
             0,
             tableLoader(),
             UID_SUFFIX,
-            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
+            null,
             1)
         .sinkTo(infra.sink());
 
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 88b949a9a7..c27f6081af 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -340,7 +340,7 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
             0,
             tableLoader(),
             UID_SUFFIX,
-            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
+            null,
             1)
         .sinkTo(infra.sink());
 
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java
index eaa5b5e1b5..49219d5b46 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java
@@ -424,12 +424,12 @@ class TestTableMaintenance extends OperatorTestBase {
     @Override
     DataStream<TaskResult> append(DataStream<Trigger> trigger) {
       String name = TASKS[id];
-      return trigger
-          .map(new DummyMaintenanceTask(success))
-          .name(name)
-          .uid(uidSuffix() + "-test-mapper-" + name + "-" + id)
-          .slotSharingGroup(slotSharingGroup())
-          .forceNonParallel();
+      return setSlotSharingGroup(
+          trigger
+              .map(new DummyMaintenanceTask(success))
+              .name(name)
+              .uid(uidSuffix() + "-test-mapper-" + name + "-" + id)
+              .forceNonParallel());
     }
   }
 
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
index eb5479045b..9b7941c2d3 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
@@ -313,12 +313,12 @@ class TestTableMaintenanceCoordinationLock extends 
OperatorTestBase {
     @Override
     DataStream<TaskResult> append(DataStream<Trigger> trigger) {
       String name = TASKS[id];
-      return trigger
-          .map(new DummyMaintenanceTask(success))
-          .name(name)
-          .uid(uidSuffix() + "-test-mapper-" + name + "-" + id)
-          .slotSharingGroup(slotSharingGroup())
-          .forceNonParallel();
+      return setSlotSharingGroup(
+          trigger
+              .map(new DummyMaintenanceTask(success))
+              .name(name)
+              .uid(uidSuffix() + "-test-mapper-" + name + "-" + id)
+              .forceNonParallel());
     }
   }
 
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index d6563e782e..4f394be76f 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -35,7 +35,6 @@ import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -413,15 +412,17 @@ public class OperatorTestBase {
   }
 
   protected static void 
checkSlotSharingGroupsAreSet(StreamExecutionEnvironment env, String name) {
-    String nameToCheck = name != null ? name : 
StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
-
     env.getTransformations().stream()
         .filter(
             t -> !(t instanceof SinkTransformation) && 
!(t.getName().equals(IGNORED_OPERATOR_NAME)))
         .forEach(
             t -> {
-              assertThat(t.getSlotSharingGroup()).isPresent();
-              
assertThat(t.getSlotSharingGroup().get().getName()).isEqualTo(nameToCheck);
+              if (name == null) {
+                assertThat(t.getSlotSharingGroup()).isNotPresent();
+              } else {
+                assertThat(t.getSlotSharingGroup()).isPresent();
+                
assertThat(t.getSlotSharingGroup().get().getName()).isEqualTo(name);
+              }
             });
   }
 

Reply via email to