This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 294f80dfd4 Flink: Use native slot sharing group inheritance for maintenance tasks (#16329)
294f80dfd4 is described below
commit 294f80dfd41c7919a9ed527d6838fa473897cf07
Author: GuoYu <[email protected]>
AuthorDate: Thu May 14 22:02:12 2026 +0800
Flink: Use native slot sharing group inheritance for maintenance tasks (#16329)
---
.../flink/maintenance/api/DeleteOrphanFiles.java | 167 +++++++++++----------
.../flink/maintenance/api/ExpireSnapshots.java | 48 +++---
.../maintenance/api/FlinkMaintenanceConfig.java | 6 +-
.../maintenance/api/MaintenanceTaskBuilder.java | 4 +
.../flink/maintenance/api/RewriteDataFiles.java | 94 ++++++------
.../flink/maintenance/api/TableMaintenance.java | 132 ++++++++--------
.../org/apache/iceberg/flink/sink/IcebergSink.java | 11 +-
.../flink/maintenance/api/TestExpireSnapshots.java | 2 +-
.../maintenance/api/TestRewriteDataFiles.java | 2 +-
.../maintenance/api/TestTableMaintenance.java | 12 +-
.../api/TestTableMaintenanceCoordinationLock.java | 12 +-
.../maintenance/operator/OperatorTestBase.java | 11 +-
12 files changed, 258 insertions(+), 243 deletions(-)
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
index 63a267d16e..377413c3c3 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
@@ -209,72 +209,73 @@ public class DeleteOrphanFiles {
// Collect all data files
SingleOutputStreamOperator<MetadataTablePlanner.SplitInfo> splits =
- trigger
- .process(
- new MetadataTablePlanner(
- taskName(),
- index(),
- tableLoader(),
- FILE_PATH_SCAN_CONTEXT,
- MetadataTableType.ALL_FILES,
- planningWorkerPoolSize))
- .name(operatorName(PLANNER_TASK_NAME))
- .uid(PLANNER_TASK_NAME + uidSuffix())
- .slotSharingGroup(slotSharingGroup())
- .forceNonParallel();
+ setSlotSharingGroup(
+ trigger
+ .process(
+ new MetadataTablePlanner(
+ taskName(),
+ index(),
+ tableLoader(),
+ FILE_PATH_SCAN_CONTEXT,
+ MetadataTableType.ALL_FILES,
+ planningWorkerPoolSize))
+ .name(operatorName(PLANNER_TASK_NAME))
+ .uid(PLANNER_TASK_NAME + uidSuffix())
+ .forceNonParallel());
// Read the records and get all data files
SingleOutputStreamOperator<String> tableDataFiles =
- splits
- .rebalance()
- .process(
- new FileNameReader(
- taskName(),
- index(),
- tableLoader(),
- FILE_PATH_SCHEMA,
- FILE_PATH_SCAN_CONTEXT,
- MetadataTableType.ALL_FILES))
- .name(operatorName(READER_TASK_NAME))
- .uid(READER_TASK_NAME + uidSuffix())
- .slotSharingGroup(slotSharingGroup())
- .setParallelism(parallelism());
+ setSlotSharingGroup(
+ splits
+ .rebalance()
+ .process(
+ new FileNameReader(
+ taskName(),
+ index(),
+ tableLoader(),
+ FILE_PATH_SCHEMA,
+ FILE_PATH_SCAN_CONTEXT,
+ MetadataTableType.ALL_FILES))
+ .name(operatorName(READER_TASK_NAME))
+ .uid(READER_TASK_NAME + uidSuffix())
+ .setParallelism(parallelism()));
// Collect all meta data files
SingleOutputStreamOperator<String> tableMetadataFiles =
- trigger
- .process(new ListMetadataFiles(taskName(), index(),
tableLoader()))
- .name(operatorName(METADATA_FILES_TASK_NAME))
- .uid(METADATA_FILES_TASK_NAME + uidSuffix())
- .slotSharingGroup(slotSharingGroup())
- .forceNonParallel();
+ setSlotSharingGroup(
+ trigger
+ .process(new ListMetadataFiles(taskName(), index(),
tableLoader()))
+ .name(operatorName(METADATA_FILES_TASK_NAME))
+ .uid(METADATA_FILES_TASK_NAME + uidSuffix())
+ .forceNonParallel());
// List the all file system files
SingleOutputStreamOperator<String> allFsFiles =
- trigger
- .process(
- new ListFileSystemFiles(
- taskName(),
- index(),
- tableLoader(),
- location,
- minAge.toMillis(),
- usePrefixListing))
- .name(operatorName(FILESYSTEM_FILES_TASK_NAME))
- .uid(FILESYSTEM_FILES_TASK_NAME + uidSuffix())
- .slotSharingGroup(slotSharingGroup())
- .forceNonParallel();
+ setSlotSharingGroup(
+ trigger
+ .process(
+ new ListFileSystemFiles(
+ taskName(),
+ index(),
+ tableLoader(),
+ location,
+ minAge.toMillis(),
+ usePrefixListing))
+ .name(operatorName(FILESYSTEM_FILES_TASK_NAME))
+ .uid(FILESYSTEM_FILES_TASK_NAME + uidSuffix())
+ .forceNonParallel());
SingleOutputStreamOperator<String> filesToDelete =
- tableMetadataFiles
- .union(tableDataFiles)
- .keyBy(new FileUriKeySelector(equalSchemes, equalAuthorities))
- .connect(allFsFiles.keyBy(new FileUriKeySelector(equalSchemes,
equalAuthorities)))
- .process(new OrphanFilesDetector(prefixMismatchMode,
equalSchemes, equalAuthorities))
- .slotSharingGroup(slotSharingGroup())
- .name(operatorName(FILTER_FILES_TASK_NAME))
- .uid(FILTER_FILES_TASK_NAME + uidSuffix())
- .setParallelism(parallelism());
+ setSlotSharingGroup(
+ tableMetadataFiles
+ .union(tableDataFiles)
+ .keyBy(new FileUriKeySelector(equalSchemes,
equalAuthorities))
+ .connect(allFsFiles.keyBy(new
FileUriKeySelector(equalSchemes, equalAuthorities)))
+ .process(
+ new OrphanFilesDetector(prefixMismatchMode,
equalSchemes, equalAuthorities))
+ .name(operatorName(FILTER_FILES_TASK_NAME))
+ .uid(FILTER_FILES_TASK_NAME + uidSuffix())
+ .setParallelism(parallelism()));
DataStream<Exception> errorStream =
tableMetadataFiles
@@ -287,38 +288,38 @@ public class DeleteOrphanFiles {
// Stop deleting the files if there is an error
SingleOutputStreamOperator<String> filesOrSkip =
- filesToDelete
- .connect(errorStream)
- .transform(
- operatorName(SKIP_ON_ERROR_TASK_NAME),
- TypeInformation.of(String.class),
- new SkipOnError())
- .uid(SKIP_ON_ERROR_TASK_NAME + uidSuffix())
- .slotSharingGroup(slotSharingGroup())
- .forceNonParallel();
+ setSlotSharingGroup(
+ filesToDelete
+ .connect(errorStream)
+ .transform(
+ operatorName(SKIP_ON_ERROR_TASK_NAME),
+ TypeInformation.of(String.class),
+ new SkipOnError())
+ .uid(SKIP_ON_ERROR_TASK_NAME + uidSuffix())
+ .forceNonParallel());
// delete the files
- filesOrSkip
- .rebalance()
- .transform(
- operatorName(DELETE_FILES_TASK_NAME),
- TypeInformation.of(Void.class),
- new DeleteFilesProcessor(
- tableLoader().loadTable(), taskName(), index(),
deleteBatchSize))
- .uid(DELETE_FILES_TASK_NAME + uidSuffix())
- .slotSharingGroup(slotSharingGroup())
- .setParallelism(parallelism());
+ setSlotSharingGroup(
+ filesOrSkip
+ .rebalance()
+ .transform(
+ operatorName(DELETE_FILES_TASK_NAME),
+ TypeInformation.of(Void.class),
+ new DeleteFilesProcessor(
+ tableLoader().loadTable(), taskName(), index(),
deleteBatchSize))
+ .uid(DELETE_FILES_TASK_NAME + uidSuffix())
+ .setParallelism(parallelism()));
// Ignore the file deletion result and return the DataStream<TaskResult>
directly
- return trigger
- .connect(errorStream)
- .transform(
- operatorName(AGGREGATOR_TASK_NAME),
- TypeInformation.of(TaskResult.class),
- new TaskResultAggregator(tableName(), taskName(), index()))
- .uid(AGGREGATOR_TASK_NAME + uidSuffix())
- .slotSharingGroup(slotSharingGroup())
- .forceNonParallel();
+ return setSlotSharingGroup(
+ trigger
+ .connect(errorStream)
+ .transform(
+ operatorName(AGGREGATOR_TASK_NAME),
+ TypeInformation.of(TaskResult.class),
+ new TaskResultAggregator(tableName(), taskName(), index()))
+ .uid(AGGREGATOR_TASK_NAME + uidSuffix())
+ .forceNonParallel());
}
}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
index c84932f96f..7c524175c4 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
@@ -126,30 +126,30 @@ public class ExpireSnapshots {
Preconditions.checkNotNull(tableLoader(), "TableLoader should not be
null");
SingleOutputStreamOperator<TaskResult> result =
- trigger
- .process(
- new ExpireSnapshotsProcessor(
- tableLoader(),
- maxSnapshotAge == null ? null :
maxSnapshotAge.toMillis(),
- numSnapshots,
- planningWorkerPoolSize,
- cleanExpiredMetadata))
- .name(operatorName(EXECUTOR_OPERATOR_NAME))
- .uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
- .slotSharingGroup(slotSharingGroup())
- .forceNonParallel();
-
- result
- .getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM)
- .rebalance()
- .transform(
- operatorName(DELETE_FILES_OPERATOR_NAME),
- TypeInformation.of(Void.class),
- new DeleteFilesProcessor(
- tableLoader().loadTable(), taskName(), index(),
deleteBatchSize))
- .uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
- .slotSharingGroup(slotSharingGroup())
- .setParallelism(parallelism());
+ setSlotSharingGroup(
+ trigger
+ .process(
+ new ExpireSnapshotsProcessor(
+ tableLoader(),
+ maxSnapshotAge == null ? null :
maxSnapshotAge.toMillis(),
+ numSnapshots,
+ planningWorkerPoolSize,
+ cleanExpiredMetadata))
+ .name(operatorName(EXECUTOR_OPERATOR_NAME))
+ .uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
+ .forceNonParallel());
+
+ setSlotSharingGroup(
+ result
+ .getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM)
+ .rebalance()
+ .transform(
+ operatorName(DELETE_FILES_OPERATOR_NAME),
+ TypeInformation.of(Void.class),
+ new DeleteFilesProcessor(
+ tableLoader().loadTable(), taskName(), index(),
deleteBatchSize))
+ .uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
+ .setParallelism(parallelism()));
// Ignore the file deletion result and return the DataStream<TaskResult>
directly
return result;
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
index e6f536273e..34d7330c59 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkConfParser;
@@ -60,7 +59,7 @@ public class FlinkMaintenanceConfig {
public static final ConfigOption<String> SLOT_SHARING_GROUP_OPTION =
ConfigOptions.key(SLOT_SHARING_GROUP)
.stringType()
- .defaultValue(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
+ .noDefaultValue()
.withDescription(
"The slot sharing group for maintenance tasks. "
+ "Determines which operators can share slots in the Flink
execution environment.");
@@ -114,8 +113,7 @@ public class FlinkMaintenanceConfig {
.stringConf()
.option(SLOT_SHARING_GROUP)
.flinkConfig(SLOT_SHARING_GROUP_OPTION)
- .defaultValue(SLOT_SHARING_GROUP_OPTION.defaultValue())
- .parse();
+ .parseOptional();
}
public RewriteDataFilesConfig createRewriteDataFilesConfig() {
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
index 5d5f17b0a8..04df6106bd 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java
@@ -226,4 +226,8 @@ public abstract class MaintenanceTaskBuilder<T extends
MaintenanceTaskBuilder<?>
return append(sourceStream);
}
+
+ <O> SingleOutputStreamOperator<O>
setSlotSharingGroup(SingleOutputStreamOperator<O> operator) {
+ return slotSharingGroup == null ? operator :
operator.slotSharingGroup(slotSharingGroup);
+ }
}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
index f03f33a3fd..be77fda23d 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java
@@ -290,58 +290,58 @@ public class RewriteDataFiles {
@Override
DataStream<TaskResult> append(DataStream<Trigger> trigger) {
SingleOutputStreamOperator<DataFileRewritePlanner.PlannedGroup> planned =
- trigger
- .process(
- new DataFileRewritePlanner(
- tableName(),
- taskName(),
- index(),
- tableLoader(),
- partialProgressEnabled ? partialProgressMaxCommits : 1,
- maxRewriteBytes,
- rewriteOptions,
- filterSupplier,
- branch))
- .name(operatorName(PLANNER_TASK_NAME))
- .uid(PLANNER_TASK_NAME + uidSuffix())
- .slotSharingGroup(slotSharingGroup())
- .forceNonParallel();
+ setSlotSharingGroup(
+ trigger
+ .process(
+ new DataFileRewritePlanner(
+ tableName(),
+ taskName(),
+ index(),
+ tableLoader(),
+ partialProgressEnabled ? partialProgressMaxCommits :
1,
+ maxRewriteBytes,
+ rewriteOptions,
+ filterSupplier,
+ branch))
+ .name(operatorName(PLANNER_TASK_NAME))
+ .uid(PLANNER_TASK_NAME + uidSuffix())
+ .forceNonParallel());
SingleOutputStreamOperator<DataFileRewriteRunner.ExecutedGroup>
rewritten =
- planned
- .rebalance()
- .process(new DataFileRewriteRunner(tableName(), taskName(),
index()))
- .name(operatorName(REWRITE_TASK_NAME))
- .uid(REWRITE_TASK_NAME + uidSuffix())
- .slotSharingGroup(slotSharingGroup())
- .setParallelism(parallelism());
+ setSlotSharingGroup(
+ planned
+ .rebalance()
+ .process(new DataFileRewriteRunner(tableName(), taskName(),
index()))
+ .name(operatorName(REWRITE_TASK_NAME))
+ .uid(REWRITE_TASK_NAME + uidSuffix())
+ .setParallelism(parallelism()));
SingleOutputStreamOperator<Trigger> updated =
- rewritten
- .transform(
- operatorName(COMMIT_TASK_NAME),
- TypeInformation.of(Trigger.class),
- new DataFileRewriteCommitter(
- tableName(), taskName(), index(), tableLoader(), branch))
- .uid(COMMIT_TASK_NAME + uidSuffix())
- .slotSharingGroup(slotSharingGroup())
- .forceNonParallel();
+ setSlotSharingGroup(
+ rewritten
+ .transform(
+ operatorName(COMMIT_TASK_NAME),
+ TypeInformation.of(Trigger.class),
+ new DataFileRewriteCommitter(
+ tableName(), taskName(), index(), tableLoader(),
branch))
+ .uid(COMMIT_TASK_NAME + uidSuffix())
+ .forceNonParallel());
- return trigger
- .union(updated)
- .connect(
- planned
- .getSideOutput(TaskResultAggregator.ERROR_STREAM)
- .union(
-
rewritten.getSideOutput(TaskResultAggregator.ERROR_STREAM),
-
updated.getSideOutput(TaskResultAggregator.ERROR_STREAM)))
- .transform(
- operatorName(AGGREGATOR_TASK_NAME),
- TypeInformation.of(TaskResult.class),
- new TaskResultAggregator(tableName(), taskName(), index()))
- .uid(AGGREGATOR_TASK_NAME + uidSuffix())
- .slotSharingGroup(slotSharingGroup())
- .forceNonParallel();
+ return setSlotSharingGroup(
+ trigger
+ .union(updated)
+ .connect(
+ planned
+ .getSideOutput(TaskResultAggregator.ERROR_STREAM)
+ .union(
+
rewritten.getSideOutput(TaskResultAggregator.ERROR_STREAM),
+
updated.getSideOutput(TaskResultAggregator.ERROR_STREAM)))
+ .transform(
+ operatorName(AGGREGATOR_TASK_NAME),
+ TypeInformation.of(TaskResult.class),
+ new TaskResultAggregator(tableName(), taskName(), index()))
+ .uid(AGGREGATOR_TASK_NAME + uidSuffix())
+ .forceNonParallel());
}
}
}
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
index d98a1c9ab4..025a6d17c0 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.operator.LockRemover;
import
org.apache.iceberg.flink.maintenance.operator.LockRemoverOperatorFactory;
@@ -154,7 +153,7 @@ public class TableMaintenance {
private final TriggerLockFactory lockFactory;
private String uidSuffix = "TableMaintenance-" + UUID.randomUUID();
- private String slotSharingGroup =
StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
+ private String slotSharingGroup = null;
private Duration rateLimit = Duration.ofSeconds(RATE_LIMIT_SECOND_DEFAULT);
private Duration lockCheckDelay =
Duration.ofSeconds(LOCK_CHECK_DELAY_SECOND_DEFAULT);
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
@@ -282,57 +281,57 @@ public class TableMaintenance {
DataStream<Trigger> triggers;
if (lockFactory == null) {
triggers =
- DataStreamUtils.reinterpretAsKeyedStream(
- changeStream(tableName, loader), unused -> true)
- .transform(
- TRIGGER_MANAGER_OPERATOR_NAME,
- TypeInformation.of(Trigger.class),
- new TriggerManagerOperatorFactory(
- tableName,
- taskNames,
- evaluators,
- rateLimit.toMillis(),
- lockCheckDelay.toMillis()))
- .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
- .slotSharingGroup(slotSharingGroup)
- .forceNonParallel();
+ setSlotSharingGroup(
+ DataStreamUtils.reinterpretAsKeyedStream(
+ changeStream(tableName, loader), unused -> true)
+ .transform(
+ TRIGGER_MANAGER_OPERATOR_NAME,
+ TypeInformation.of(Trigger.class),
+ new TriggerManagerOperatorFactory(
+ tableName,
+ taskNames,
+ evaluators,
+ rateLimit.toMillis(),
+ lockCheckDelay.toMillis()))
+ .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+ .forceNonParallel());
} else {
triggers =
- DataStreamUtils.reinterpretAsKeyedStream(
- changeStream(tableName, loader), unused -> true)
- .process(
- new TriggerManager(
- loader,
- lockFactory,
- taskNames,
- evaluators,
- rateLimit.toMillis(),
- lockCheckDelay.toMillis()))
- .name(TRIGGER_MANAGER_OPERATOR_NAME)
- .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
- .slotSharingGroup(slotSharingGroup)
- .forceNonParallel();
+ setSlotSharingGroup(
+ DataStreamUtils.reinterpretAsKeyedStream(
+ changeStream(tableName, loader), unused -> true)
+ .process(
+ new TriggerManager(
+ loader,
+ lockFactory,
+ taskNames,
+ evaluators,
+ rateLimit.toMillis(),
+ lockCheckDelay.toMillis()))
+ .name(TRIGGER_MANAGER_OPERATOR_NAME)
+ .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+ .forceNonParallel());
}
triggers =
- triggers
- .assignTimestampsAndWatermarks(new
PunctuatedWatermarkStrategy())
- .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
- .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
- .slotSharingGroup(slotSharingGroup)
- .forceNonParallel();
+ setSlotSharingGroup(
+ triggers
+ .assignTimestampsAndWatermarks(new
PunctuatedWatermarkStrategy())
+ .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
+ .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
+ .forceNonParallel());
// Add the specific tasks
DataStream<TaskResult> unioned = null;
for (int i = 0; i < taskBuilders.size(); ++i) {
int taskIndex = i;
DataStream<Trigger> filtered =
- triggers
- .filter(t -> t.taskId() != null && t.taskId() == taskIndex)
- .name(FILTER_OPERATOR_NAME_PREFIX + taskIndex)
- .forceNonParallel()
- .uid(FILTER_OPERATOR_NAME_PREFIX + taskIndex + "-" +
uidSuffix)
- .slotSharingGroup(slotSharingGroup);
+ setSlotSharingGroup(
+ triggers
+ .filter(t -> t.taskId() != null && t.taskId() ==
taskIndex)
+ .name(FILTER_OPERATOR_NAME_PREFIX + taskIndex)
+ .forceNonParallel()
+ .uid(FILTER_OPERATOR_NAME_PREFIX + taskIndex + "-" +
uidSuffix));
MaintenanceTaskBuilder<?> builder = taskBuilders.get(taskIndex);
DataStream<TaskResult> result =
builder.append(
@@ -353,23 +352,23 @@ public class TableMaintenance {
// Add the LockRemover to the end
if (lockFactory == null) {
- unioned
- .transform(
- LOCK_REMOVER_OPERATOR_NAME,
- TypeInformation.of(Void.class),
- new LockRemoverOperatorFactory(tableName, taskNames))
- .uid("lock-remover-" + uidSuffix)
- .forceNonParallel()
- .slotSharingGroup(slotSharingGroup);
+ setSlotSharingGroup(
+ unioned
+ .transform(
+ LOCK_REMOVER_OPERATOR_NAME,
+ TypeInformation.of(Void.class),
+ new LockRemoverOperatorFactory(tableName, taskNames))
+ .uid("lock-remover-" + uidSuffix)
+ .forceNonParallel());
} else {
- unioned
- .transform(
- LOCK_REMOVER_OPERATOR_NAME,
- TypeInformation.of(Void.class),
- new LockRemover(tableName, lockFactory, taskNames))
- .forceNonParallel()
- .uid("lock-remover-" + uidSuffix)
- .slotSharingGroup(slotSharingGroup);
+ setSlotSharingGroup(
+ unioned
+ .transform(
+ LOCK_REMOVER_OPERATOR_NAME,
+ TypeInformation.of(Void.class),
+ new LockRemover(tableName, lockFactory, taskNames))
+ .forceNonParallel()
+ .uid("lock-remover-" + uidSuffix));
}
}
}
@@ -380,11 +379,13 @@ public class TableMaintenance {
MonitorSource source =
new MonitorSource(
loader, RateLimiterStrategy.perSecond(1.0 /
rateLimit.getSeconds()), maxReadBack);
- return env.fromSource(
- source, WatermarkStrategy.noWatermarks(),
SOURCE_OPERATOR_NAME_PREFIX + tableName)
- .uid(SOURCE_OPERATOR_NAME_PREFIX + uidSuffix)
- .slotSharingGroup(slotSharingGroup)
- .forceNonParallel();
+ return setSlotSharingGroup(
+ env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
+ SOURCE_OPERATOR_NAME_PREFIX + tableName)
+ .uid(SOURCE_OPERATOR_NAME_PREFIX + uidSuffix)
+ .forceNonParallel());
} else {
return inputStream.global();
}
@@ -393,6 +394,11 @@ public class TableMaintenance {
private static String nameFor(MaintenanceTaskBuilder<?> streamBuilder, int
taskIndex) {
return String.format(Locale.ROOT, "%s [%d]",
streamBuilder.maintenanceTaskName(), taskIndex);
}
+
+ private <T> SingleOutputStreamOperator<T> setSlotSharingGroup(
+ SingleOutputStreamOperator<T> operator) {
+ return slotSharingGroup == null ? operator :
operator.slotSharingGroup(slotSharingGroup);
+ }
}
@Internal
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index eaaf4ea6e4..440fdb278b 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -290,9 +290,14 @@ public class IcebergSink
.add(maintenanceTasks)
.rateLimit(Duration.ofSeconds(flinkMaintenanceConfig.rateLimit()))
.lockCheckDelay(Duration.ofSeconds(flinkMaintenanceConfig.lockCheckDelay()))
- .slotSharingGroup(flinkMaintenanceConfig.slotSharingGroup())
- .parallelism(flinkMaintenanceConfig.parallelism())
- .append();
+ .parallelism(flinkMaintenanceConfig.parallelism());
+
+ String slotSharingGroup = flinkMaintenanceConfig.slotSharingGroup();
+ if (slotSharingGroup != null) {
+ builder.slotSharingGroup(slotSharingGroup);
+ }
+
+ builder.append();
} catch (IOException e) {
throw new UncheckedIOException("Failed to create tableMaintenance ", e);
}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
index b8aa259e2f..a23850d823 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java
@@ -164,7 +164,7 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase {
0,
tableLoader(),
UID_SUFFIX,
- StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
+ null,
1)
.sinkTo(infra.sink());
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
index 88b949a9a7..c27f6081af 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestRewriteDataFiles.java
@@ -340,7 +340,7 @@ class TestRewriteDataFiles extends MaintenanceTaskTestBase {
0,
tableLoader(),
UID_SUFFIX,
- StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
+ null,
1)
.sinkTo(infra.sink());
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java
index eaa5b5e1b5..49219d5b46 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java
@@ -424,12 +424,12 @@ class TestTableMaintenance extends OperatorTestBase {
@Override
DataStream<TaskResult> append(DataStream<Trigger> trigger) {
String name = TASKS[id];
- return trigger
- .map(new DummyMaintenanceTask(success))
- .name(name)
- .uid(uidSuffix() + "-test-mapper-" + name + "-" + id)
- .slotSharingGroup(slotSharingGroup())
- .forceNonParallel();
+ return setSlotSharingGroup(
+ trigger
+ .map(new DummyMaintenanceTask(success))
+ .name(name)
+ .uid(uidSuffix() + "-test-mapper-" + name + "-" + id)
+ .forceNonParallel());
}
}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
index eb5479045b..9b7941c2d3 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
@@ -313,12 +313,12 @@ class TestTableMaintenanceCoordinationLock extends
OperatorTestBase {
@Override
DataStream<TaskResult> append(DataStream<Trigger> trigger) {
String name = TASKS[id];
- return trigger
- .map(new DummyMaintenanceTask(success))
- .name(name)
- .uid(uidSuffix() + "-test-mapper-" + name + "-" + id)
- .slotSharingGroup(slotSharingGroup())
- .forceNonParallel();
+ return setSlotSharingGroup(
+ trigger
+ .map(new DummyMaintenanceTask(success))
+ .name(name)
+ .uid(uidSuffix() + "-test-mapper-" + name + "-" + id)
+ .forceNonParallel());
}
}
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index d6563e782e..4f394be76f 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -35,7 +35,6 @@ import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -413,15 +412,17 @@ public class OperatorTestBase {
}
protected static void
checkSlotSharingGroupsAreSet(StreamExecutionEnvironment env, String name) {
- String nameToCheck = name != null ? name :
StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
-
env.getTransformations().stream()
.filter(
t -> !(t instanceof SinkTransformation) &&
!(t.getName().equals(IGNORED_OPERATOR_NAME)))
.forEach(
t -> {
- assertThat(t.getSlotSharingGroup()).isPresent();
-
assertThat(t.getSlotSharingGroup().get().getName()).isEqualTo(nameToCheck);
+ if (name == null) {
+ assertThat(t.getSlotSharingGroup()).isNotPresent();
+ } else {
+ assertThat(t.getSlotSharingGroup()).isPresent();
+
assertThat(t.getSlotSharingGroup().get().getName()).isEqualTo(name);
+ }
});
}