This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 652691c2ab Flink: Backport: Allow arbitrary post-commit maintenance tasks via IcebergSink Builder (#15566) (#15667)
652691c2ab is described below

commit 652691c2ab06176f16f1db418d87d70cd94b176a
Author: Maximilian Michels <[email protected]>
AuthorDate: Tue Mar 17 18:31:34 2026 +0100

    Flink: Backport: Allow arbitrary post-commit maintenance tasks via IcebergSink Builder (#15566) (#15667)
---
 .../org/apache/iceberg/flink/FlinkWriteConf.java   |  18 ++
 .../apache/iceberg/flink/FlinkWriteOptions.java    |  16 +-
 .../flink/maintenance/api/DeleteOrphanFiles.java   |  26 ++-
 .../maintenance/api/DeleteOrphanFilesConfig.java   | 216 +++++++++++++++++++++
 .../flink/maintenance/api/ExpireSnapshots.java     |  14 ++
 .../maintenance/api/ExpireSnapshotsConfig.java     | 151 ++++++++++++++
 .../maintenance/api/FlinkMaintenanceConfig.java    |   8 +
 .../flink/maintenance/api/TableMaintenance.java    |  12 ++
 .../iceberg/flink/sink/IcebergCommitter.java       |   8 +-
 .../org/apache/iceberg/flink/sink/IcebergSink.java | 147 +++++++++++---
 .../api/TestDeleteOrphanFilesConfig.java           |  91 +++++++++
 .../maintenance/api/TestExpireSnapshotsConfig.java |  84 ++++++++
 ...t.java => TestIcebergSinkTableMaintenance.java} | 167 +++++++++++++++-
 .../org/apache/iceberg/flink/FlinkWriteConf.java   |  18 ++
 .../apache/iceberg/flink/FlinkWriteOptions.java    |  16 +-
 .../flink/maintenance/api/DeleteOrphanFiles.java   |  26 ++-
 .../maintenance/api/DeleteOrphanFilesConfig.java   | 216 +++++++++++++++++++++
 .../flink/maintenance/api/ExpireSnapshots.java     |  14 ++
 .../maintenance/api/ExpireSnapshotsConfig.java     | 151 ++++++++++++++
 .../maintenance/api/FlinkMaintenanceConfig.java    |   8 +
 .../flink/maintenance/api/TableMaintenance.java    |  12 ++
 .../iceberg/flink/sink/IcebergCommitter.java       |   8 +-
 .../org/apache/iceberg/flink/sink/IcebergSink.java | 147 +++++++++++---
 .../api/TestDeleteOrphanFilesConfig.java           |  91 +++++++++
 .../maintenance/api/TestExpireSnapshotsConfig.java |  84 ++++++++
 ...t.java => TestIcebergSinkTableMaintenance.java} | 167 +++++++++++++++-
 26 files changed, 1842 insertions(+), 74 deletions(-)

diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 66fd098077..990d23f2aa 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -220,6 +220,24 @@ public class FlinkWriteConf {
     return 
confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional();
   }
 
+  public boolean expireSnapshotsMode() {
+    return confParser
+        .booleanConf()
+        .option(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.key())
+        .flinkConfig(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE)
+        .defaultValue(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.defaultValue())
+        .parse();
+  }
+
+  public boolean deleteOrphanFilesMode() {
+    return confParser
+        .booleanConf()
+        .option(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.key())
+        .flinkConfig(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE)
+        
.defaultValue(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.defaultValue())
+        .parse();
+  }
+
   public boolean compactMode() {
     return confParser
         .booleanConf()
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index e68e64ac57..ee2aeaa450 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -23,6 +23,9 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFilesConfig;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshotsConfig;
+import org.apache.iceberg.flink.maintenance.api.RewriteDataFilesConfig;
 import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
 
 /** Flink sink write options */
@@ -82,7 +85,18 @@ public class FlinkWriteOptions {
       ConfigOptions.key("write-parallelism").intType().noDefaultValue();
 
   public static final ConfigOption<Boolean> COMPACTION_ENABLE =
-      
ConfigOptions.key("compaction-enabled").booleanType().defaultValue(false);
+      ConfigOptions.key(RewriteDataFilesConfig.PREFIX + "enabled")
+          .booleanType()
+          .defaultValue(false)
+          .withDeprecatedKeys("compaction-enabled");
+
+  public static final ConfigOption<Boolean> EXPIRE_SNAPSHOTS_ENABLE =
+      ConfigOptions.key(ExpireSnapshotsConfig.PREFIX + 
"enabled").booleanType().defaultValue(false);
+
+  public static final ConfigOption<Boolean> DELETE_ORPHAN_FILES_ENABLE =
+      ConfigOptions.key(DeleteOrphanFilesConfig.PREFIX + "enabled")
+          .booleanType()
+          .defaultValue(false);
 
   @Experimental
   public static final ConfigOption<Duration> TABLE_REFRESH_INTERVAL =
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
index 2fce5e0b3e..63a267d16e 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
@@ -53,6 +53,11 @@ public class DeleteOrphanFiles {
       ScanContext.builder().streaming(true).project(FILE_PATH_SCHEMA).build();
   private static final Splitter COMMA_SPLITTER = Splitter.on(",");
 
+  static final Map<String, String> DEFAULT_EQUAL_SCHEMES =
+      ImmutableMap.of(
+          "s3n", "s3",
+          "s3a", "s3");
+
   @Internal
   public static final OutputTag<Exception> ERROR_STREAM =
       new OutputTag<>("error-stream", TypeInformation.of(Exception.class));
@@ -79,12 +84,8 @@ public class DeleteOrphanFiles {
     private Duration minAge = Duration.ofDays(3);
     private int planningWorkerPoolSize = ThreadPools.WORKER_THREAD_POOL_SIZE;
     private int deleteBatchSize = 1000;
-    private boolean usePrefixListing = false;
-    private Map<String, String> equalSchemes =
-        Maps.newHashMap(
-            ImmutableMap.of(
-                "s3n", "s3",
-                "s3a", "s3"));
+    private boolean usePrefixListing = true;
+    private Map<String, String> equalSchemes = 
Maps.newHashMap(DEFAULT_EQUAL_SCHEMES);
     private final Map<String, String> equalAuthorities = Maps.newHashMap();
     private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
 
@@ -189,6 +190,19 @@ public class DeleteOrphanFiles {
       return this;
     }
 
+    public Builder config(DeleteOrphanFilesConfig deleteOrphanFilesConfig) {
+      return this.scheduleOnInterval(
+              
Duration.ofSeconds(deleteOrphanFilesConfig.scheduleOnIntervalSecond()))
+          .minAge(Duration.ofSeconds(deleteOrphanFilesConfig.minAgeSeconds()))
+          .deleteBatchSize(deleteOrphanFilesConfig.deleteBatchSize())
+          .usePrefixListing(deleteOrphanFilesConfig.usePrefixListing())
+          .prefixMismatchMode(deleteOrphanFilesConfig.prefixMismatchMode())
+          .location(deleteOrphanFilesConfig.location())
+          
.planningWorkerPoolSize(deleteOrphanFilesConfig.planningWorkerPoolSize())
+          .equalSchemes(deleteOrphanFilesConfig.equalSchemes())
+          .equalAuthorities(deleteOrphanFilesConfig.equalAuthorities());
+    }
+
     @Override
     DataStream<TaskResult> append(DataStream<Trigger> trigger) {
       tableLoader().open();
diff --git 
a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFilesConfig.java
 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFilesConfig.java
new file mode 100644
index 0000000000..af34735781
--- /dev/null
+++ 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFilesConfig.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
+import org.apache.iceberg.flink.FlinkConfParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ThreadPools;
+
+public class DeleteOrphanFilesConfig {
+  public static final String PREFIX = FlinkMaintenanceConfig.PREFIX + 
"delete-orphan-files.";
+
+  private static final Splitter COMMA_SPLITTER = Splitter.on(",");
+  private static final Splitter EQUALS_SPLITTER = Splitter.on("=").limit(2);
+
+  public static final String SCHEDULE_ON_INTERVAL_SECOND = PREFIX + 
"schedule.interval-second";
+  public static final ConfigOption<Long> SCHEDULE_ON_INTERVAL_SECOND_OPTION =
+      ConfigOptions.key(SCHEDULE_ON_INTERVAL_SECOND)
+          .longType()
+          .defaultValue(60 * 60L) // Default is 1 hour
+          .withDescription(
+              "The time interval (in seconds) between two consecutive delete 
orphan files operations.");
+
+  public static final String MIN_AGE_SECONDS = PREFIX + "min-age-seconds";
+  public static final ConfigOption<Long> MIN_AGE_SECONDS_OPTION =
+      ConfigOptions.key(MIN_AGE_SECONDS)
+          .longType()
+          .defaultValue(3L * 24 * 60 * 60) // Default is 3 days
+          .withDescription(
+              "The minimum age (in seconds) of files to be considered for 
deletion. "
+                  + "Files newer than this will not be removed.");
+
+  public static final String DELETE_BATCH_SIZE = PREFIX + "delete-batch-size";
+  public static final ConfigOption<Integer> DELETE_BATCH_SIZE_OPTION =
+      ConfigOptions.key(DELETE_BATCH_SIZE)
+          .intType()
+          .defaultValue(1000)
+          .withDescription("The batch size used for deleting orphan files.");
+
+  public static final String LOCATION = PREFIX + "location";
+  public static final ConfigOption<String> LOCATION_OPTION =
+      ConfigOptions.key(LOCATION)
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              "The location to start recursive listing of candidate files for 
removal. "
+                  + "By default, the table location is used.");
+
+  public static final String USE_PREFIX_LISTING = PREFIX + 
"use-prefix-listing";
+  public static final ConfigOption<Boolean> USE_PREFIX_LISTING_OPTION =
+      ConfigOptions.key(USE_PREFIX_LISTING)
+          .booleanType()
+          .defaultValue(true)
+          .withDescription(
+              "Whether to use prefix listing when listing files from the file 
system.");
+
+  public static final String PLANNING_WORKER_POOL_SIZE = PREFIX + 
"planning-worker-pool-size";
+  public static final ConfigOption<Integer> PLANNING_WORKER_POOL_SIZE_OPTION =
+      ConfigOptions.key(PLANNING_WORKER_POOL_SIZE)
+          .intType()
+          .noDefaultValue()
+          .withDescription(
+              "The worker pool size used for planning the scan of the 
ALL_FILES table. "
+                  + "If not set, the shared worker pool is used.");
+
+  public static final String EQUAL_SCHEMES = PREFIX + "equal-schemes";
+  public static final ConfigOption<String> EQUAL_SCHEMES_OPTION =
+      ConfigOptions.key(EQUAL_SCHEMES)
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              "Schemes that should be considered equal, in the format 
'scheme1=scheme2,scheme3=scheme4'.");
+
+  public static final String EQUAL_AUTHORITIES = PREFIX + "equal-authorities";
+  public static final ConfigOption<String> EQUAL_AUTHORITIES_OPTION =
+      ConfigOptions.key(EQUAL_AUTHORITIES)
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              "Authorities that should be considered equal, in the format 
'auth1=auth2,auth3=auth4'.");
+
+  public static final String PREFIX_MISMATCH_MODE = PREFIX + 
"prefix-mismatch-mode";
+  public static final ConfigOption<String> PREFIX_MISMATCH_MODE_OPTION =
+      ConfigOptions.key(PREFIX_MISMATCH_MODE)
+          .stringType()
+          .defaultValue(PrefixMismatchMode.ERROR.name())
+          .withDescription(
+              "Action behavior when location prefixes (schemes/authorities) 
mismatch. "
+                  + "Valid values: ERROR, IGNORE, DELETE.");
+
+  private final FlinkConfParser confParser;
+
+  public DeleteOrphanFilesConfig(
+      Table table, Map<String, String> writeOptions, ReadableConfig 
readableConfig) {
+    this.confParser = new FlinkConfParser(table, writeOptions, readableConfig);
+  }
+
+  public long scheduleOnIntervalSecond() {
+    return confParser
+        .longConf()
+        .option(SCHEDULE_ON_INTERVAL_SECOND)
+        .flinkConfig(SCHEDULE_ON_INTERVAL_SECOND_OPTION)
+        .defaultValue(SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue())
+        .parse();
+  }
+
+  public long minAgeSeconds() {
+    return confParser
+        .longConf()
+        .option(MIN_AGE_SECONDS)
+        .flinkConfig(MIN_AGE_SECONDS_OPTION)
+        .defaultValue(MIN_AGE_SECONDS_OPTION.defaultValue())
+        .parse();
+  }
+
+  public int deleteBatchSize() {
+    return confParser
+        .intConf()
+        .option(DELETE_BATCH_SIZE)
+        .flinkConfig(DELETE_BATCH_SIZE_OPTION)
+        .defaultValue(DELETE_BATCH_SIZE_OPTION.defaultValue())
+        .parse();
+  }
+
+  public String location() {
+    return 
confParser.stringConf().option(LOCATION).flinkConfig(LOCATION_OPTION).parseOptional();
+  }
+
+  public boolean usePrefixListing() {
+    return confParser
+        .booleanConf()
+        .option(USE_PREFIX_LISTING)
+        .flinkConfig(USE_PREFIX_LISTING_OPTION)
+        .defaultValue(USE_PREFIX_LISTING_OPTION.defaultValue())
+        .parse();
+  }
+
+  public Integer planningWorkerPoolSize() {
+    return confParser
+        .intConf()
+        .option(PLANNING_WORKER_POOL_SIZE)
+        .flinkConfig(PLANNING_WORKER_POOL_SIZE_OPTION)
+        .defaultValue(ThreadPools.WORKER_THREAD_POOL_SIZE)
+        .parse();
+  }
+
+  public Map<String, String> equalSchemes() {
+    String equalSchemes =
+        confParser
+            .stringConf()
+            .option(EQUAL_SCHEMES)
+            .flinkConfig(EQUAL_SCHEMES_OPTION)
+            .parseOptional();
+
+    return equalSchemes != null
+        ? parseKeyValuePairs(equalSchemes)
+        : Maps.newHashMap(DeleteOrphanFiles.DEFAULT_EQUAL_SCHEMES);
+  }
+
+  public Map<String, String> equalAuthorities() {
+    String equalAuthorities =
+        confParser
+            .stringConf()
+            .option(EQUAL_AUTHORITIES)
+            .flinkConfig(EQUAL_AUTHORITIES_OPTION)
+            .parseOptional();
+
+    return equalAuthorities != null ? parseKeyValuePairs(equalAuthorities) : 
Map.of();
+  }
+
+  public PrefixMismatchMode prefixMismatchMode() {
+    String value =
+        confParser
+            .stringConf()
+            .option(PREFIX_MISMATCH_MODE)
+            .flinkConfig(PREFIX_MISMATCH_MODE_OPTION)
+            .defaultValue(PREFIX_MISMATCH_MODE_OPTION.defaultValue())
+            .parse();
+    return PrefixMismatchMode.valueOf(value);
+  }
+
+  private static Map<String, String> parseKeyValuePairs(String value) {
+    Map<String, String> result = Maps.newHashMap();
+    for (String pair : COMMA_SPLITTER.split(value)) {
+      List<String> parts = EQUALS_SPLITTER.splitToList(pair);
+      Preconditions.checkArgument(parts.size() == 2, "Invalid key-value pair: 
%s", pair);
+      result.put(parts.get(0).trim(), parts.get(1).trim());
+    }
+
+    return result;
+  }
+}
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
index 628a911414..c84932f96f 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.maintenance.api;
 
 import java.time.Duration;
+import java.util.Optional;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -107,6 +108,19 @@ public class ExpireSnapshots {
       return this;
     }
 
+    public Builder config(ExpireSnapshotsConfig expireSnapshotsConfig) {
+      return 
this.scheduleOnCommitCount(expireSnapshotsConfig.scheduleOnCommitCount())
+          
.scheduleOnInterval(Duration.ofSeconds(expireSnapshotsConfig.scheduleOnIntervalSecond()))
+          .deleteBatchSize(expireSnapshotsConfig.deleteBatchSize())
+          .maxSnapshotAge(
+              
Optional.ofNullable(expireSnapshotsConfig.maxSnapshotAgeSeconds())
+                  .map(Duration::ofSeconds)
+                  .orElse(null))
+          .retainLast(expireSnapshotsConfig.retainLast())
+          .cleanExpiredMetadata(expireSnapshotsConfig.cleanExpiredMetadata())
+          
.planningWorkerPoolSize(expireSnapshotsConfig.planningWorkerPoolSize());
+    }
+
     @Override
     DataStream<TaskResult> append(DataStream<Trigger> trigger) {
       Preconditions.checkNotNull(tableLoader(), "TableLoader should not be 
null");
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshotsConfig.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshotsConfig.java
new file mode 100644
index 0000000000..13436975e1
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshotsConfig.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkConfParser;
+import org.apache.iceberg.util.ThreadPools;
+
+public class ExpireSnapshotsConfig {
+  public static final String PREFIX = FlinkMaintenanceConfig.PREFIX + 
"expire-snapshots.";
+
+  public static final String SCHEDULE_ON_COMMIT_COUNT = PREFIX + 
"schedule.commit-count";
+  public static final ConfigOption<Integer> SCHEDULE_ON_COMMIT_COUNT_OPTION =
+      ConfigOptions.key(SCHEDULE_ON_COMMIT_COUNT)
+          .intType()
+          .defaultValue(10)
+          .withDescription(
+              "The number of commits after which to trigger a new expire 
snapshots operation.");
+
+  public static final String SCHEDULE_ON_INTERVAL_SECOND = PREFIX + 
"schedule.interval-second";
+  public static final ConfigOption<Long> SCHEDULE_ON_INTERVAL_SECOND_OPTION =
+      ConfigOptions.key(SCHEDULE_ON_INTERVAL_SECOND)
+          .longType()
+          .defaultValue(60 * 60L) // Default is 1 hour
+          .withDescription(
+              "The time interval (in seconds) between two consecutive expire 
snapshots operations.");
+
+  public static final String MAX_SNAPSHOT_AGE_SECONDS = PREFIX + 
"max-snapshot-age-seconds";
+  public static final ConfigOption<Long> MAX_SNAPSHOT_AGE_SECONDS_OPTION =
+      ConfigOptions.key(MAX_SNAPSHOT_AGE_SECONDS)
+          .longType()
+          .noDefaultValue()
+          .withDescription(
+              "The maximum age (in seconds) of snapshots to retain. "
+                  + "Snapshots older than this will be expired.");
+
+  public static final String RETAIN_LAST = PREFIX + "retain-last";
+  public static final ConfigOption<Integer> RETAIN_LAST_OPTION =
+      ConfigOptions.key(RETAIN_LAST)
+          .intType()
+          .noDefaultValue()
+          .withDescription("The minimum number of snapshots to retain.");
+
+  public static final String DELETE_BATCH_SIZE = PREFIX + "delete-batch-size";
+  public static final ConfigOption<Integer> DELETE_BATCH_SIZE_OPTION =
+      ConfigOptions.key(DELETE_BATCH_SIZE)
+          .intType()
+          .defaultValue(1000)
+          .withDescription("The batch size used for deleting expired files.");
+
+  public static final String CLEAN_EXPIRED_METADATA = PREFIX + 
"clean-expired-metadata";
+  public static final ConfigOption<Boolean> CLEAN_EXPIRED_METADATA_OPTION =
+      ConfigOptions.key(CLEAN_EXPIRED_METADATA)
+          .booleanType()
+          .defaultValue(true)
+          .withDescription(
+              "Whether to clean expired metadata such as partition specs and 
schemas.");
+
+  public static final String PLANNING_WORKER_POOL_SIZE = PREFIX + 
"planning-worker-pool-size";
+  public static final ConfigOption<Integer> PLANNING_WORKER_POOL_SIZE_OPTION =
+      ConfigOptions.key(PLANNING_WORKER_POOL_SIZE)
+          .intType()
+          .noDefaultValue()
+          .withDescription(
+              "The worker pool size used to calculate the files to delete. "
+                  + "If not set, the shared worker pool is used.");
+
+  private final FlinkConfParser confParser;
+
+  public ExpireSnapshotsConfig(
+      Table table, Map<String, String> writeOptions, ReadableConfig 
readableConfig) {
+    this.confParser = new FlinkConfParser(table, writeOptions, readableConfig);
+  }
+
+  public int scheduleOnCommitCount() {
+    return confParser
+        .intConf()
+        .option(SCHEDULE_ON_COMMIT_COUNT)
+        .flinkConfig(SCHEDULE_ON_COMMIT_COUNT_OPTION)
+        .defaultValue(SCHEDULE_ON_COMMIT_COUNT_OPTION.defaultValue())
+        .parse();
+  }
+
+  public long scheduleOnIntervalSecond() {
+    return confParser
+        .longConf()
+        .option(SCHEDULE_ON_INTERVAL_SECOND)
+        .flinkConfig(SCHEDULE_ON_INTERVAL_SECOND_OPTION)
+        .defaultValue(SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue())
+        .parse();
+  }
+
+  public Long maxSnapshotAgeSeconds() {
+    return confParser
+        .longConf()
+        .option(MAX_SNAPSHOT_AGE_SECONDS)
+        .flinkConfig(MAX_SNAPSHOT_AGE_SECONDS_OPTION)
+        .parseOptional();
+  }
+
+  public Integer retainLast() {
+    return 
confParser.intConf().option(RETAIN_LAST).flinkConfig(RETAIN_LAST_OPTION).parseOptional();
+  }
+
+  public int deleteBatchSize() {
+    return confParser
+        .intConf()
+        .option(DELETE_BATCH_SIZE)
+        .flinkConfig(DELETE_BATCH_SIZE_OPTION)
+        .defaultValue(DELETE_BATCH_SIZE_OPTION.defaultValue())
+        .parse();
+  }
+
+  public Boolean cleanExpiredMetadata() {
+    return confParser
+        .booleanConf()
+        .option(CLEAN_EXPIRED_METADATA)
+        .flinkConfig(CLEAN_EXPIRED_METADATA_OPTION)
+        .defaultValue(CLEAN_EXPIRED_METADATA_OPTION.defaultValue())
+        .parse();
+  }
+
+  public Integer planningWorkerPoolSize() {
+    return confParser
+        .intConf()
+        .option(PLANNING_WORKER_POOL_SIZE)
+        .flinkConfig(PLANNING_WORKER_POOL_SIZE_OPTION)
+        .defaultValue(ThreadPools.WORKER_THREAD_POOL_SIZE)
+        .parse();
+  }
+}
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
index 0c88abf820..e6f536273e 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
@@ -122,6 +122,14 @@ public class FlinkMaintenanceConfig {
     return new RewriteDataFilesConfig(table, writeProperties, readableConfig);
   }
 
+  public ExpireSnapshotsConfig createExpireSnapshotsConfig() {
+    return new ExpireSnapshotsConfig(table, writeProperties, readableConfig);
+  }
+
+  public DeleteOrphanFilesConfig createDeleteOrphanFilesConfig() {
+    return new DeleteOrphanFilesConfig(table, writeProperties, readableConfig);
+  }
+
   public LockConfig createLockConfig() {
     return new LockConfig(table, writeProperties, readableConfig);
   }
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
index ab5159be12..d98a1c9ab4 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink.maintenance.api;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.UUID;
@@ -252,6 +253,17 @@ public class TableMaintenance {
       return this;
     }
 
+    /**
+     * Adds multiple tasks with the given schedules.
+     *
+     * @param tasks to add
+     */
+    public Builder add(Collection<MaintenanceTaskBuilder<?>> tasks) {
+      Preconditions.checkNotNull(tasks, "Tasks collection should not be null");
+      taskBuilders.addAll(tasks);
+      return this;
+    }
+
     /** Builds the task graph for the maintenance tasks. */
     public void append() throws IOException {
       Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least 
one task");
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java
index c05e7d9180..8e45a2db30 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java
@@ -78,7 +78,7 @@ class IcebergCommitter implements Committer<IcebergCommittable> {
   private int maxContinuousEmptyCommits;
   private ExecutorService workerPool;
   private int continuousEmptyCheckpoints = 0;
-  private boolean compactMode = false;
+  private final boolean tableMaintenanceEnabled;
 
   IcebergCommitter(
       TableLoader tableLoader,
@@ -88,7 +88,7 @@ class IcebergCommitter implements Committer<IcebergCommittable> {
       int workerPoolSize,
       String sinkId,
       IcebergFilesCommitterMetrics committerMetrics,
-      boolean compactMode) {
+      boolean tableMaintenanceEnabled) {
     this.branch = branch;
     this.snapshotProperties = snapshotProperties;
     this.replacePartitions = replacePartitions;
@@ -107,7 +107,7 @@ class IcebergCommitter implements Committer<IcebergCommittable> {
         ThreadPools.newFixedThreadPool(
             "iceberg-committer-pool-" + table.name() + "-" + sinkId, 
workerPoolSize);
     this.continuousEmptyCheckpoints = 0;
-    this.compactMode = compactMode;
+    this.tableMaintenanceEnabled = tableMaintenanceEnabled;
   }
 
   @Override
@@ -177,7 +177,7 @@ class IcebergCommitter implements Committer<IcebergCommittable> {
       committerMetrics.updateCommitSummary(summary);
     }
 
-    if (!compactMode) {
+    if (!tableMaintenanceEnabled) {
       FlinkManifestUtil.deleteCommittedManifests(table, manifests, 
newFlinkJobId, checkpointId);
     }
   }
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index dd20853f05..0d3e4a34d9 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -72,8 +72,13 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFiles;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFilesConfig;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshots;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshotsConfig;
 import org.apache.iceberg.flink.maintenance.api.FlinkMaintenanceConfig;
 import org.apache.iceberg.flink.maintenance.api.LockConfig;
+import org.apache.iceberg.flink.maintenance.api.MaintenanceTaskBuilder;
 import org.apache.iceberg.flink.maintenance.api.RewriteDataFiles;
 import org.apache.iceberg.flink.maintenance.api.RewriteDataFilesConfig;
 import org.apache.iceberg.flink.maintenance.api.TableMaintenance;
@@ -87,6 +92,7 @@ import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
@@ -158,14 +164,15 @@ public class IcebergSink
   private final String branch;
   private final boolean overwriteMode;
   private final int workerPoolSize;
-  private final boolean compactMode;
-  private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;
-
+  private final boolean maintenanceEnabled;
   private final Table table;
   // This should only be used for logging/error messages. For any actual logic 
always use
   // equalityFieldIds instead.
   private final Set<String> equalityFieldColumns;
 
+  private final transient List<MaintenanceTaskBuilder<?>> maintenanceTasks;
+  private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;
+
   private IcebergSink(
       TableLoader tableLoader,
       Table table,
@@ -178,6 +185,7 @@ public class IcebergSink
       Set<Integer> equalityFieldIds,
       String branch,
       boolean overwriteMode,
+      List<MaintenanceTaskBuilder<?>> maintenanceTasks,
       FlinkMaintenanceConfig flinkMaintenanceConfig,
       Set<String> equalityFieldColumns) {
     this.tableLoader = tableLoader;
@@ -199,7 +207,8 @@ public class IcebergSink
     // This is used to separate files generated by different sinks writing the 
same table.
     // Also used to generate the aggregator operator name
     this.sinkId = UUID.randomUUID().toString();
-    this.compactMode = flinkWriteConf.compactMode();
+    this.maintenanceEnabled = !maintenanceTasks.isEmpty();
+    this.maintenanceTasks = maintenanceTasks;
     this.flinkMaintenanceConfig = flinkMaintenanceConfig;
     this.equalityFieldColumns = equalityFieldColumns;
   }
@@ -238,7 +247,7 @@ public class IcebergSink
         workerPoolSize,
         sinkId,
         metrics,
-        compactMode);
+        maintenanceEnabled);
   }
 
   @Override
@@ -250,7 +259,7 @@ public class IcebergSink
   public void addPostCommitTopology(
       DataStream<CommittableMessage<IcebergCommittable>> committables) {
 
-    if (!compactMode) {
+    if (maintenanceTasks.isEmpty()) {
       return;
     }
 
@@ -264,26 +273,21 @@ public class IcebergSink
             .uid(postCommitUid)
             .forceNonParallel();
     try {
-      RewriteDataFilesConfig rewriteDataFilesConfig =
-          flinkMaintenanceConfig.createRewriteDataFilesConfig();
-      RewriteDataFiles.Builder rewriteBuilder =
-          RewriteDataFiles.builder().config(rewriteDataFilesConfig);
-
       LockConfig lockConfig = flinkMaintenanceConfig.createLockConfig();
       String tableMaintenanceUid = String.format("TableMaintenance : %s", 
suffix);
-      TableMaintenance.Builder builder =
-          StringUtils.isNotEmpty(lockConfig.lockType())
-              ? TableMaintenance.forChangeStream(
-                      tableChangeStream,
-                      tableLoader,
-                      LockFactoryBuilder.build(lockConfig, table.name()))
-                  .uidSuffix(tableMaintenanceUid)
-                  .add(rewriteBuilder)
-              : TableMaintenance.forChangeStream(tableChangeStream, 
tableLoader)
-                  .uidSuffix(tableMaintenanceUid)
-                  .add(rewriteBuilder);
+
+      TableMaintenance.Builder builder;
+      if (StringUtils.isNotEmpty(lockConfig.lockType())) {
+        builder =
+            TableMaintenance.forChangeStream(
+                tableChangeStream, tableLoader, 
LockFactoryBuilder.build(lockConfig, table.name()));
+      } else {
+        builder = TableMaintenance.forChangeStream(tableChangeStream, 
tableLoader);
+      }
 
       builder
+          .uidSuffix(tableMaintenanceUid)
+          .add(maintenanceTasks)
           .rateLimit(Duration.ofSeconds(flinkMaintenanceConfig.rateLimit()))
           
.lockCheckDelay(Duration.ofSeconds(flinkMaintenanceConfig.lockCheckDelay()))
           .slotSharingGroup(flinkMaintenanceConfig.slotSharingGroup())
@@ -344,6 +348,7 @@ public class IcebergSink
     private final Map<String, String> snapshotSummary = Maps.newHashMap();
     private ReadableConfig readableConfig = new Configuration();
     private List<String> equalityFieldColumns = null;
+    private final List<MaintenanceTaskBuilder<?>> maintenanceTasks = 
Lists.newArrayList();
 
     private Builder() {}
 
@@ -626,6 +631,85 @@ public class IcebergSink
       return this;
     }
 
+    /**
+     * Enables or disables compaction (rewriting data files) as a post-commit 
maintenance task.
+     *
+     * @param enabled whether to enable compaction
+     * @see RewriteDataFilesConfig for the default config.
+     * @deprecated See {@code rewriteDataFiles(..)}
+     */
+    @Deprecated
+    public Builder compaction(boolean enabled) {
+      writeOptions.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), 
Boolean.toString(enabled));
+      return this;
+    }
+
+    /**
+     * Enables rewriting data files (compaction) as a post-commit maintenance 
task.
+     *
+     * @see RewriteDataFilesConfig for the default config.
+     */
+    public Builder rewriteDataFiles() {
+      writeOptions.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");
+      return this;
+    }
+
+    /**
+     * Enables rewriting data files (compaction) as a post-commit maintenance 
task.
+     *
+     * @param config task-specific configuration, see {@link 
RewriteDataFilesConfig} for available
+     *     keys
+     */
+    public Builder rewriteDataFiles(Map<String, String> config) {
+      rewriteDataFiles();
+      writeOptions.putAll(config);
+      return this;
+    }
+
+    /**
+     * Enables expire snapshots as a post-commit maintenance task.
+     *
+     * @see ExpireSnapshotsConfig for the default config.
+     */
+    public Builder expireSnapshots() {
+      writeOptions.put(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.key(), 
"true");
+      return this;
+    }
+
+    /**
+     * Enables expire snapshots as a post-commit maintenance task.
+     *
+     * @param config task-specific configuration, see {@link 
ExpireSnapshotsConfig} for available
+     *     keys
+     */
+    public Builder expireSnapshots(Map<String, String> config) {
+      expireSnapshots();
+      writeOptions.putAll(config);
+      return this;
+    }
+
+    /**
+     * Enables delete orphan files as a post-commit maintenance task.
+     *
+     * @see DeleteOrphanFilesConfig for the default config.
+     */
+    public Builder deleteOrphanFiles() {
+      writeOptions.put(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.key(), 
"true");
+      return this;
+    }
+
+    /**
+     * Enables delete orphan files as a post-commit maintenance task.
+     *
+     * @param config task-specific configuration, see {@link 
DeleteOrphanFilesConfig} for available
+     *     keys.
+     */
+    public Builder deleteOrphanFiles(Map<String, String> config) {
+      deleteOrphanFiles();
+      writeOptions.putAll(config);
+      return this;
+    }
+
     @Override
     public Builder toBranch(String branch) {
       writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
@@ -682,6 +766,24 @@ public class IcebergSink
       FlinkMaintenanceConfig flinkMaintenanceConfig =
           new FlinkMaintenanceConfig(table, writeOptions, readableConfig);
 
+      if (flinkWriteConf.compactMode()) {
+        RewriteDataFilesConfig rewriteDataFilesConfig =
+            flinkMaintenanceConfig.createRewriteDataFilesConfig();
+        
maintenanceTasks.add(RewriteDataFiles.builder().config(rewriteDataFilesConfig));
+      }
+
+      if (flinkWriteConf.expireSnapshotsMode()) {
+        ExpireSnapshotsConfig expireSnapshotsConfig =
+            flinkMaintenanceConfig.createExpireSnapshotsConfig();
+        
maintenanceTasks.add(ExpireSnapshots.builder().config(expireSnapshotsConfig));
+      }
+
+      if (flinkWriteConf.deleteOrphanFilesMode()) {
+        DeleteOrphanFilesConfig deleteOrphanFilesConfig =
+            flinkMaintenanceConfig.createDeleteOrphanFilesConfig();
+        
maintenanceTasks.add(DeleteOrphanFiles.builder().config(deleteOrphanFilesConfig));
+      }
+
       Set<String> equalityFieldColumnsSet =
           equalityFieldColumns != null ? Sets.newHashSet(equalityFieldColumns) 
: null;
 
@@ -699,6 +801,7 @@ public class IcebergSink
           equalityFieldIds,
           flinkWriteConf.branch(),
           overwriteMode,
+          maintenanceTasks,
           flinkMaintenanceConfig,
           equalityFieldColumnsSet);
     }
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestDeleteOrphanFilesConfig.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestDeleteOrphanFilesConfig.java
new file mode 100644
index 0000000000..f26f608ffa
--- /dev/null
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestDeleteOrphanFilesConfig.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import org.apache.flink.configuration.Configuration;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
+import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ThreadPools;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestDeleteOrphanFilesConfig extends OperatorTestBase {
+  private Table table;
+  private Map<String, String> input = Maps.newHashMap();
+
+  @BeforeEach
+  public void before() {
+    this.table = createTable();
+    input.put(DeleteOrphanFilesConfig.SCHEDULE_ON_INTERVAL_SECOND, "60");
+    input.put(DeleteOrphanFilesConfig.MIN_AGE_SECONDS, "86400");
+    input.put(DeleteOrphanFilesConfig.DELETE_BATCH_SIZE, "500");
+    input.put(DeleteOrphanFilesConfig.LOCATION, "/tmp/test-location");
+    input.put(DeleteOrphanFilesConfig.USE_PREFIX_LISTING, "true");
+    input.put(DeleteOrphanFilesConfig.PLANNING_WORKER_POOL_SIZE, "4");
+    input.put(DeleteOrphanFilesConfig.EQUAL_SCHEMES, "s3n=s3,s3a=s3");
+    input.put(DeleteOrphanFilesConfig.EQUAL_AUTHORITIES, "auth1=auth2");
+    input.put(DeleteOrphanFilesConfig.PREFIX_MISMATCH_MODE, "IGNORE");
+    input.put("other.config", "should-be-ignored");
+  }
+
+  @AfterEach
+  public void after() {
+    input.clear();
+  }
+
+  @Test
+  void testConfigParsing() {
+    DeleteOrphanFilesConfig config = new DeleteOrphanFilesConfig(table, input, 
new Configuration());
+
+    assertThat(config.scheduleOnIntervalSecond()).isEqualTo(60);
+    assertThat(config.minAgeSeconds()).isEqualTo(86400L);
+    assertThat(config.deleteBatchSize()).isEqualTo(500);
+    assertThat(config.location()).isEqualTo("/tmp/test-location");
+    assertThat(config.usePrefixListing()).isTrue();
+    assertThat(config.planningWorkerPoolSize()).isEqualTo(4);
+    assertThat(config.equalSchemes()).containsEntry("s3n", 
"s3").containsEntry("s3a", "s3");
+    assertThat(config.equalAuthorities()).containsEntry("auth1", "auth2");
+    
assertThat(config.prefixMismatchMode()).isEqualTo(PrefixMismatchMode.IGNORE);
+  }
+
+  @Test
+  void testConfigDefaults() {
+    DeleteOrphanFilesConfig config =
+        new DeleteOrphanFilesConfig(table, Maps.newHashMap(), new 
Configuration());
+
+    assertThat(config.scheduleOnIntervalSecond())
+        
.isEqualTo(DeleteOrphanFilesConfig.SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue());
+    assertThat(config.minAgeSeconds())
+        
.isEqualTo(DeleteOrphanFilesConfig.MIN_AGE_SECONDS_OPTION.defaultValue());
+    assertThat(config.deleteBatchSize())
+        
.isEqualTo(DeleteOrphanFilesConfig.DELETE_BATCH_SIZE_OPTION.defaultValue());
+    assertThat(config.location()).isNull();
+    assertThat(config.usePrefixListing()).isTrue();
+    
assertThat(config.planningWorkerPoolSize()).isEqualTo(ThreadPools.WORKER_THREAD_POOL_SIZE);
+    assertThat(config.equalSchemes()).containsEntry("s3n", 
"s3").containsEntry("s3a", "s3");
+    assertThat(config.equalAuthorities()).isEqualTo(Map.of());
+    
assertThat(config.prefixMismatchMode()).isEqualTo(PrefixMismatchMode.ERROR);
+  }
+}
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshotsConfig.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshotsConfig.java
new file mode 100644
index 0000000000..3bcec8114b
--- /dev/null
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshotsConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import org.apache.flink.configuration.Configuration;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ThreadPools;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestExpireSnapshotsConfig extends OperatorTestBase {
+  private Table table;
+  private Map<String, String> input = Maps.newHashMap();
+
+  @BeforeEach
+  public void before() {
+    this.table = createTable();
+    input.put(ExpireSnapshotsConfig.SCHEDULE_ON_COMMIT_COUNT, "10");
+    input.put(ExpireSnapshotsConfig.SCHEDULE_ON_INTERVAL_SECOND, "60");
+    input.put(ExpireSnapshotsConfig.MAX_SNAPSHOT_AGE_SECONDS, "7200");
+    input.put(ExpireSnapshotsConfig.RETAIN_LAST, "5");
+    input.put(ExpireSnapshotsConfig.DELETE_BATCH_SIZE, "500");
+    input.put(ExpireSnapshotsConfig.CLEAN_EXPIRED_METADATA, "true");
+    input.put(ExpireSnapshotsConfig.PLANNING_WORKER_POOL_SIZE, "4");
+    input.put("other.config", "should-be-ignored");
+  }
+
+  @AfterEach
+  public void after() {
+    input.clear();
+  }
+
+  @Test
+  void testConfigParsing() {
+    ExpireSnapshotsConfig config = new ExpireSnapshotsConfig(table, input, new 
Configuration());
+
+    assertThat(config.scheduleOnCommitCount()).isEqualTo(10);
+    assertThat(config.scheduleOnIntervalSecond()).isEqualTo(60);
+    assertThat(config.maxSnapshotAgeSeconds()).isEqualTo(7200L);
+    assertThat(config.retainLast()).isEqualTo(5);
+    assertThat(config.deleteBatchSize()).isEqualTo(500);
+    assertThat(config.cleanExpiredMetadata()).isTrue();
+    assertThat(config.planningWorkerPoolSize()).isEqualTo(4);
+  }
+
+  @Test
+  void testConfigDefaults() {
+    ExpireSnapshotsConfig config =
+        new ExpireSnapshotsConfig(table, Maps.newHashMap(), new 
Configuration());
+
+    assertThat(config.scheduleOnCommitCount())
+        
.isEqualTo(ExpireSnapshotsConfig.SCHEDULE_ON_COMMIT_COUNT_OPTION.defaultValue());
+    assertThat(config.scheduleOnIntervalSecond())
+        
.isEqualTo(ExpireSnapshotsConfig.SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue());
+    assertThat(config.maxSnapshotAgeSeconds()).isNull();
+    assertThat(config.retainLast()).isNull();
+    assertThat(config.deleteBatchSize())
+        
.isEqualTo(ExpireSnapshotsConfig.DELETE_BATCH_SIZE_OPTION.defaultValue());
+    assertThat(config.cleanExpiredMetadata()).isTrue();
+    
assertThat(config.planningWorkerPoolSize()).isEqualTo(ThreadPools.WORKER_THREAD_POOL_SIZE);
+  }
+}
diff --git 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkCompact.java
 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkTableMaintenance.java
similarity index 53%
rename from 
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkCompact.java
rename to 
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkTableMaintenance.java
index 8042fe6e0f..5c926d7c25 100644
--- 
a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkCompact.java
+++ 
b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkTableMaintenance.java
@@ -42,6 +42,8 @@ import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFilesConfig;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshotsConfig;
 import org.apache.iceberg.flink.maintenance.api.LockConfig;
 import org.apache.iceberg.flink.maintenance.api.RewriteDataFilesConfig;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
@@ -51,7 +53,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.FieldSource;
 
-class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase {
+class TestIcebergSinkTableMaintenance extends TestFlinkIcebergSinkBase {
   private static final String[] LOCK_TYPES = new String[] 
{LockConfig.JdbcLockConfig.JDBC, ""};
 
   private Map<String, String> flinkConf;
@@ -59,9 +61,6 @@ class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase 
{
   @BeforeEach
   void before() throws IOException {
     this.flinkConf = Maps.newHashMap();
-    flinkConf.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");
-    flinkConf.put(RewriteDataFilesConfig.SCHEDULE_ON_DATA_FILE_SIZE, "1");
-    flinkConf.put(RewriteDataFilesConfig.PREFIX + 
SizeBasedFileRewritePlanner.REWRITE_ALL, "true");
 
     table =
         CATALOG_EXTENSION
@@ -84,6 +83,7 @@ class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase 
{
   @FieldSource("LOCK_TYPES")
   public void testCompactFileE2e(String lockType) throws Exception {
     setupLockConfig(lockType);
+    setupCompactionConfig();
     List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, 
"world"), Row.of(3, "foo"));
     DataStream<RowData> dataStream =
         env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
@@ -124,6 +124,7 @@ class TestIcebergSinkCompact extends 
TestFlinkIcebergSinkBase {
   @FieldSource("LOCK_TYPES")
   public void testTableMaintenanceOperatorAdded(String lockType) {
     setupLockConfig(lockType);
+    setupCompactionConfig();
     List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, 
"world"), Row.of(3, "foo"));
     DataStream<RowData> dataStream =
         env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
@@ -147,6 +148,22 @@ class TestIcebergSinkCompact extends 
TestFlinkIcebergSinkBase {
     assertThat(containRewrite).isTrue();
   }
 
+  private void setupCompactionConfig() {
+    flinkConf.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");
+    flinkConf.put(RewriteDataFilesConfig.SCHEDULE_ON_DATA_FILE_SIZE, "1");
+    flinkConf.put(RewriteDataFilesConfig.PREFIX + 
SizeBasedFileRewritePlanner.REWRITE_ALL, "true");
+  }
+
+  private void setupExpireSnapshotsConfig() {
+    flinkConf.put(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.key(), "true");
+    flinkConf.put(ExpireSnapshotsConfig.RETAIN_LAST, "5");
+  }
+
+  private void setupDeleteOrphanFilesConfig() {
+    flinkConf.put(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.key(), "true");
+    flinkConf.put(DeleteOrphanFilesConfig.MIN_AGE_SECONDS, "86400");
+  }
+
   private void setupLockConfig(String lockType) {
     if (lockType.equals(LockConfig.JdbcLockConfig.JDBC)) {
       flinkConf.put(LockConfig.LOCK_TYPE_OPTION.key(), 
LockConfig.JdbcLockConfig.JDBC);
@@ -159,4 +176,146 @@ class TestIcebergSinkCompact extends 
TestFlinkIcebergSinkBase {
       flinkConf.put(LockConfig.LOCK_TYPE_OPTION.key(), "");
     }
   }
+
+  @ParameterizedTest(name = "lockType = {0}")
+  @FieldSource("LOCK_TYPES")
+  public void testExpireSnapshotsEnabled(String lockType) {
+    setupLockConfig(lockType);
+    setupExpireSnapshotsConfig();
+
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, 
"world"), Row.of(3, "foo"));
+    DataStream<RowData> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, 
FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+    IcebergSink.forRowData(dataStream)
+        .table(table)
+        .tableLoader(tableLoader)
+        .setAll(flinkConf)
+        .append();
+
+    StreamGraph streamGraph = env.getStreamGraph();
+    boolean containExpire = false;
+    for (JobVertex vertex : streamGraph.getJobGraph().getVertices()) {
+      if (vertex.getName().contains("Expire")) {
+        containExpire = true;
+        break;
+      }
+    }
+
+    assertThat(containExpire).isTrue();
+  }
+
+  @ParameterizedTest(name = "lockType = {0}")
+  @FieldSource("LOCK_TYPES")
+  public void testDeleteOrphanFilesEnabled(String lockType) {
+    setupLockConfig(lockType);
+    setupDeleteOrphanFilesConfig();
+
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, 
"world"), Row.of(3, "foo"));
+    DataStream<RowData> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, 
FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+    IcebergSink.forRowData(dataStream)
+        .table(table)
+        .tableLoader(tableLoader)
+        .setAll(flinkConf)
+        .append();
+
+    StreamGraph streamGraph = env.getStreamGraph();
+    boolean containOrphan = false;
+    for (JobVertex vertex : streamGraph.getJobGraph().getVertices()) {
+      if (vertex.getName().contains("Orphan")) {
+        containOrphan = true;
+        break;
+      }
+    }
+
+    assertThat(containOrphan).isTrue();
+  }
+
+  @ParameterizedTest(name = "lockType = {0}")
+  @FieldSource("LOCK_TYPES")
+  public void testAllMaintenanceTasksCombined(String lockType) {
+    setupLockConfig(lockType);
+    setupCompactionConfig();
+    setupExpireSnapshotsConfig();
+    setupDeleteOrphanFilesConfig();
+
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, 
"world"), Row.of(3, "foo"));
+    DataStream<RowData> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, 
FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+    IcebergSink.forRowData(dataStream)
+        .table(table)
+        .tableLoader(tableLoader)
+        .setAll(flinkConf)
+        .append();
+
+    StreamGraph streamGraph = env.getStreamGraph();
+    boolean containRewrite = false;
+    boolean containExpire = false;
+    boolean containOrphan = false;
+    for (JobVertex vertex : streamGraph.getJobGraph().getVertices()) {
+      if (vertex.getName().contains("Rewrite")) {
+        containRewrite = true;
+      }
+
+      if (vertex.getName().contains("Expire")) {
+        containExpire = true;
+      }
+
+      if (vertex.getName().contains("Orphan")) {
+        containOrphan = true;
+      }
+    }
+
+    assertThat(containRewrite).isTrue();
+    assertThat(containExpire).isTrue();
+    assertThat(containOrphan).isTrue();
+  }
+
+  @ParameterizedTest(name = "lockType = {0}")
+  @FieldSource("LOCK_TYPES")
+  public void testAllMaintenanceE2e(String lockType) throws Exception {
+    setupLockConfig(lockType);
+
+    Map<String, String> compactionConfig = Maps.newHashMap();
+    compactionConfig.put(RewriteDataFilesConfig.SCHEDULE_ON_DATA_FILE_SIZE, 
"1");
+    compactionConfig.put(
+        RewriteDataFilesConfig.PREFIX + 
SizeBasedFileRewritePlanner.REWRITE_ALL, "true");
+
+    Map<String, String> expireConfig = Maps.newHashMap();
+    expireConfig.put(ExpireSnapshotsConfig.RETAIN_LAST, "5");
+
+    Map<String, String> orphanConfig = Maps.newHashMap();
+    orphanConfig.put(DeleteOrphanFilesConfig.MIN_AGE_SECONDS, "86400");
+
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, 
"world"), Row.of(3, "foo"));
+    DataStream<RowData> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, 
FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+    IcebergSink.forRowData(dataStream)
+        .setAll(flinkConf)
+        .table(table)
+        .tableLoader(tableLoader)
+        .rewriteDataFiles(compactionConfig)
+        .expireSnapshots(expireConfig)
+        .deleteOrphanFiles(orphanConfig)
+        .append();
+
+    env.execute("Test All Maintenance E2E");
+
+    table.refresh();
+    // Compaction should have merged the 3 data files into 1
+    List<DataFile> afterCompactDataFiles = 
getDataFiles(table.currentSnapshot(), table);
+    assertThat(afterCompactDataFiles).hasSize(1);
+
+    List<DataFile> preCompactDataFiles =
+        getDataFiles(table.snapshot(table.currentSnapshot().parentId()), 
table);
+    assertThat(preCompactDataFiles).hasSize(3);
+  }
 }
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
index 66fd098077..990d23f2aa 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java
@@ -220,6 +220,24 @@ public class FlinkWriteConf {
     return 
confParser.intConf().option(FlinkWriteOptions.WRITE_PARALLELISM.key()).parseOptional();
   }
 
+  public boolean expireSnapshotsMode() {
+    return confParser
+        .booleanConf()
+        .option(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.key())
+        .flinkConfig(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE)
+        .defaultValue(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.defaultValue())
+        .parse();
+  }
+
+  public boolean deleteOrphanFilesMode() {
+    return confParser
+        .booleanConf()
+        .option(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.key())
+        .flinkConfig(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE)
+        
.defaultValue(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.defaultValue())
+        .parse();
+  }
+
   public boolean compactMode() {
     return confParser
         .booleanConf()
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
index e68e64ac57..ee2aeaa450 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteOptions.java
@@ -23,6 +23,9 @@ import org.apache.flink.annotation.Experimental;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFilesConfig;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshotsConfig;
+import org.apache.iceberg.flink.maintenance.api.RewriteDataFilesConfig;
 import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
 
 /** Flink sink write options */
@@ -82,7 +85,18 @@ public class FlinkWriteOptions {
       ConfigOptions.key("write-parallelism").intType().noDefaultValue();
 
   public static final ConfigOption<Boolean> COMPACTION_ENABLE =
-      
ConfigOptions.key("compaction-enabled").booleanType().defaultValue(false);
+      ConfigOptions.key(RewriteDataFilesConfig.PREFIX + "enabled")
+          .booleanType()
+          .defaultValue(false)
+          .withDeprecatedKeys("compaction-enabled");
+
+  public static final ConfigOption<Boolean> EXPIRE_SNAPSHOTS_ENABLE =
+      ConfigOptions.key(ExpireSnapshotsConfig.PREFIX + 
"enabled").booleanType().defaultValue(false);
+
+  public static final ConfigOption<Boolean> DELETE_ORPHAN_FILES_ENABLE =
+      ConfigOptions.key(DeleteOrphanFilesConfig.PREFIX + "enabled")
+          .booleanType()
+          .defaultValue(false);
 
   @Experimental
   public static final ConfigOption<Duration> TABLE_REFRESH_INTERVAL =
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
index 2fce5e0b3e..63a267d16e 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFiles.java
@@ -53,6 +53,11 @@ public class DeleteOrphanFiles {
       ScanContext.builder().streaming(true).project(FILE_PATH_SCHEMA).build();
   private static final Splitter COMMA_SPLITTER = Splitter.on(",");
 
+  static final Map<String, String> DEFAULT_EQUAL_SCHEMES =
+      ImmutableMap.of(
+          "s3n", "s3",
+          "s3a", "s3");
+
   @Internal
   public static final OutputTag<Exception> ERROR_STREAM =
       new OutputTag<>("error-stream", TypeInformation.of(Exception.class));
@@ -79,12 +84,8 @@ public class DeleteOrphanFiles {
     private Duration minAge = Duration.ofDays(3);
     private int planningWorkerPoolSize = ThreadPools.WORKER_THREAD_POOL_SIZE;
     private int deleteBatchSize = 1000;
-    private boolean usePrefixListing = false;
-    private Map<String, String> equalSchemes =
-        Maps.newHashMap(
-            ImmutableMap.of(
-                "s3n", "s3",
-                "s3a", "s3"));
+    private boolean usePrefixListing = true;
+    private Map<String, String> equalSchemes = 
Maps.newHashMap(DEFAULT_EQUAL_SCHEMES);
     private final Map<String, String> equalAuthorities = Maps.newHashMap();
     private PrefixMismatchMode prefixMismatchMode = PrefixMismatchMode.ERROR;
 
@@ -189,6 +190,19 @@ public class DeleteOrphanFiles {
       return this;
     }
 
+    public Builder config(DeleteOrphanFilesConfig deleteOrphanFilesConfig) {
+      return this.scheduleOnInterval(
+              
Duration.ofSeconds(deleteOrphanFilesConfig.scheduleOnIntervalSecond()))
+          .minAge(Duration.ofSeconds(deleteOrphanFilesConfig.minAgeSeconds()))
+          .deleteBatchSize(deleteOrphanFilesConfig.deleteBatchSize())
+          .usePrefixListing(deleteOrphanFilesConfig.usePrefixListing())
+          .prefixMismatchMode(deleteOrphanFilesConfig.prefixMismatchMode())
+          .location(deleteOrphanFilesConfig.location())
+          
.planningWorkerPoolSize(deleteOrphanFilesConfig.planningWorkerPoolSize())
+          .equalSchemes(deleteOrphanFilesConfig.equalSchemes())
+          .equalAuthorities(deleteOrphanFilesConfig.equalAuthorities());
+    }
+
     @Override
     DataStream<TaskResult> append(DataStream<Trigger> trigger) {
       tableLoader().open();
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFilesConfig.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFilesConfig.java
new file mode 100644
index 0000000000..af34735781
--- /dev/null
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/DeleteOrphanFilesConfig.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
+import org.apache.iceberg.flink.FlinkConfParser;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Splitter;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ThreadPools;
+
+public class DeleteOrphanFilesConfig {
+  public static final String PREFIX = FlinkMaintenanceConfig.PREFIX + 
"delete-orphan-files.";
+
+  private static final Splitter COMMA_SPLITTER = Splitter.on(",");
+  private static final Splitter EQUALS_SPLITTER = Splitter.on("=").limit(2);
+
+  public static final String SCHEDULE_ON_INTERVAL_SECOND = PREFIX + 
"schedule.interval-second";
+  public static final ConfigOption<Long> SCHEDULE_ON_INTERVAL_SECOND_OPTION =
+      ConfigOptions.key(SCHEDULE_ON_INTERVAL_SECOND)
+          .longType()
+          .defaultValue(60 * 60L) // Default is 1 hour
+          .withDescription(
+              "The time interval (in seconds) between two consecutive delete 
orphan files operations.");
+
+  public static final String MIN_AGE_SECONDS = PREFIX + "min-age-seconds";
+  public static final ConfigOption<Long> MIN_AGE_SECONDS_OPTION =
+      ConfigOptions.key(MIN_AGE_SECONDS)
+          .longType()
+          .defaultValue(3L * 24 * 60 * 60) // Default is 3 days
+          .withDescription(
+              "The minimum age (in seconds) of files to be considered for 
deletion. "
+                  + "Files newer than this will not be removed.");
+
+  public static final String DELETE_BATCH_SIZE = PREFIX + "delete-batch-size";
+  public static final ConfigOption<Integer> DELETE_BATCH_SIZE_OPTION =
+      ConfigOptions.key(DELETE_BATCH_SIZE)
+          .intType()
+          .defaultValue(1000)
+          .withDescription("The batch size used for deleting orphan files.");
+
+  public static final String LOCATION = PREFIX + "location";
+  public static final ConfigOption<String> LOCATION_OPTION =
+      ConfigOptions.key(LOCATION)
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              "The location to start recursive listing of candidate files for 
removal. "
+                  + "By default, the table location is used.");
+
+  public static final String USE_PREFIX_LISTING = PREFIX + 
"use-prefix-listing";
+  public static final ConfigOption<Boolean> USE_PREFIX_LISTING_OPTION =
+      ConfigOptions.key(USE_PREFIX_LISTING)
+          .booleanType()
+          .defaultValue(true)
+          .withDescription(
+              "Whether to use prefix listing when listing files from the file 
system.");
+
+  public static final String PLANNING_WORKER_POOL_SIZE = PREFIX + 
"planning-worker-pool-size";
+  public static final ConfigOption<Integer> PLANNING_WORKER_POOL_SIZE_OPTION =
+      ConfigOptions.key(PLANNING_WORKER_POOL_SIZE)
+          .intType()
+          .noDefaultValue()
+          .withDescription(
+              "The worker pool size used for planning the scan of the 
ALL_FILES table. "
+                  + "If not set, the shared worker pool is used.");
+
+  public static final String EQUAL_SCHEMES = PREFIX + "equal-schemes";
+  public static final ConfigOption<String> EQUAL_SCHEMES_OPTION =
+      ConfigOptions.key(EQUAL_SCHEMES)
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              "Schemes that should be considered equal, in the format 
'scheme1=scheme2,scheme3=scheme4'.");
+
+  public static final String EQUAL_AUTHORITIES = PREFIX + "equal-authorities";
+  public static final ConfigOption<String> EQUAL_AUTHORITIES_OPTION =
+      ConfigOptions.key(EQUAL_AUTHORITIES)
+          .stringType()
+          .noDefaultValue()
+          .withDescription(
+              "Authorities that should be considered equal, in the format 
'auth1=auth2,auth3=auth4'.");
+
+  public static final String PREFIX_MISMATCH_MODE = PREFIX + 
"prefix-mismatch-mode";
+  public static final ConfigOption<String> PREFIX_MISMATCH_MODE_OPTION =
+      ConfigOptions.key(PREFIX_MISMATCH_MODE)
+          .stringType()
+          .defaultValue(PrefixMismatchMode.ERROR.name())
+          .withDescription(
+              "Action behavior when location prefixes (schemes/authorities) 
mismatch. "
+                  + "Valid values: ERROR, IGNORE, DELETE.");
+
+  private final FlinkConfParser confParser;
+
+  public DeleteOrphanFilesConfig(
+      Table table, Map<String, String> writeOptions, ReadableConfig 
readableConfig) {
+    this.confParser = new FlinkConfParser(table, writeOptions, readableConfig);
+  }
+
+  public long scheduleOnIntervalSecond() {
+    return confParser
+        .longConf()
+        .option(SCHEDULE_ON_INTERVAL_SECOND)
+        .flinkConfig(SCHEDULE_ON_INTERVAL_SECOND_OPTION)
+        .defaultValue(SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue())
+        .parse();
+  }
+
+  public long minAgeSeconds() {
+    return confParser
+        .longConf()
+        .option(MIN_AGE_SECONDS)
+        .flinkConfig(MIN_AGE_SECONDS_OPTION)
+        .defaultValue(MIN_AGE_SECONDS_OPTION.defaultValue())
+        .parse();
+  }
+
+  public int deleteBatchSize() {
+    return confParser
+        .intConf()
+        .option(DELETE_BATCH_SIZE)
+        .flinkConfig(DELETE_BATCH_SIZE_OPTION)
+        .defaultValue(DELETE_BATCH_SIZE_OPTION.defaultValue())
+        .parse();
+  }
+
+  public String location() {
+    return 
confParser.stringConf().option(LOCATION).flinkConfig(LOCATION_OPTION).parseOptional();
+  }
+
+  public boolean usePrefixListing() {
+    return confParser
+        .booleanConf()
+        .option(USE_PREFIX_LISTING)
+        .flinkConfig(USE_PREFIX_LISTING_OPTION)
+        .defaultValue(USE_PREFIX_LISTING_OPTION.defaultValue())
+        .parse();
+  }
+
+  public Integer planningWorkerPoolSize() {
+    return confParser
+        .intConf()
+        .option(PLANNING_WORKER_POOL_SIZE)
+        .flinkConfig(PLANNING_WORKER_POOL_SIZE_OPTION)
+        .defaultValue(ThreadPools.WORKER_THREAD_POOL_SIZE)
+        .parse();
+  }
+
+  public Map<String, String> equalSchemes() {
+    String equalSchemes =
+        confParser
+            .stringConf()
+            .option(EQUAL_SCHEMES)
+            .flinkConfig(EQUAL_SCHEMES_OPTION)
+            .parseOptional();
+
+    return equalSchemes != null
+        ? parseKeyValuePairs(equalSchemes)
+        : Maps.newHashMap(DeleteOrphanFiles.DEFAULT_EQUAL_SCHEMES);
+  }
+
+  public Map<String, String> equalAuthorities() {
+    String equalAuthorities =
+        confParser
+            .stringConf()
+            .option(EQUAL_AUTHORITIES)
+            .flinkConfig(EQUAL_AUTHORITIES_OPTION)
+            .parseOptional();
+
+    return equalAuthorities != null ? parseKeyValuePairs(equalAuthorities) : 
Map.of();
+  }
+
+  public PrefixMismatchMode prefixMismatchMode() {
+    String value =
+        confParser
+            .stringConf()
+            .option(PREFIX_MISMATCH_MODE)
+            .flinkConfig(PREFIX_MISMATCH_MODE_OPTION)
+            .defaultValue(PREFIX_MISMATCH_MODE_OPTION.defaultValue())
+            .parse();
+    return PrefixMismatchMode.valueOf(value);
+  }
+
+  private static Map<String, String> parseKeyValuePairs(String value) {
+    Map<String, String> result = Maps.newHashMap();
+    for (String pair : COMMA_SPLITTER.split(value)) {
+      List<String> parts = EQUALS_SPLITTER.splitToList(pair);
+      Preconditions.checkArgument(parts.size() == 2, "Invalid key-value pair: 
%s", pair);
+      result.put(parts.get(0).trim(), parts.get(1).trim());
+    }
+
+    return result;
+  }
+}
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
index 628a911414..c84932f96f 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.flink.maintenance.api;
 
 import java.time.Duration;
+import java.util.Optional;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -107,6 +108,19 @@ public class ExpireSnapshots {
       return this;
     }
 
+    public Builder config(ExpireSnapshotsConfig expireSnapshotsConfig) {
+      return 
this.scheduleOnCommitCount(expireSnapshotsConfig.scheduleOnCommitCount())
+          
.scheduleOnInterval(Duration.ofSeconds(expireSnapshotsConfig.scheduleOnIntervalSecond()))
+          .deleteBatchSize(expireSnapshotsConfig.deleteBatchSize())
+          .maxSnapshotAge(
+              
Optional.ofNullable(expireSnapshotsConfig.maxSnapshotAgeSeconds())
+                  .map(Duration::ofSeconds)
+                  .orElse(null))
+          .retainLast(expireSnapshotsConfig.retainLast())
+          .cleanExpiredMetadata(expireSnapshotsConfig.cleanExpiredMetadata())
+          
.planningWorkerPoolSize(expireSnapshotsConfig.planningWorkerPoolSize());
+    }
+
     @Override
     DataStream<TaskResult> append(DataStream<Trigger> trigger) {
       Preconditions.checkNotNull(tableLoader(), "TableLoader should not be 
null");
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshotsConfig.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshotsConfig.java
new file mode 100644
index 0000000000..13436975e1
--- /dev/null
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshotsConfig.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import java.util.Map;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkConfParser;
+import org.apache.iceberg.util.ThreadPools;
+
+public class ExpireSnapshotsConfig {
+  public static final String PREFIX = FlinkMaintenanceConfig.PREFIX + 
"expire-snapshots.";
+
+  public static final String SCHEDULE_ON_COMMIT_COUNT = PREFIX + 
"schedule.commit-count";
+  public static final ConfigOption<Integer> SCHEDULE_ON_COMMIT_COUNT_OPTION =
+      ConfigOptions.key(SCHEDULE_ON_COMMIT_COUNT)
+          .intType()
+          .defaultValue(10)
+          .withDescription(
+              "The number of commits after which to trigger a new expire 
snapshots operation.");
+
+  public static final String SCHEDULE_ON_INTERVAL_SECOND = PREFIX + 
"schedule.interval-second";
+  public static final ConfigOption<Long> SCHEDULE_ON_INTERVAL_SECOND_OPTION =
+      ConfigOptions.key(SCHEDULE_ON_INTERVAL_SECOND)
+          .longType()
+          .defaultValue(60 * 60L) // Default is 1 hour
+          .withDescription(
+              "The time interval (in seconds) between two consecutive expire 
snapshots operations.");
+
+  public static final String MAX_SNAPSHOT_AGE_SECONDS = PREFIX + 
"max-snapshot-age-seconds";
+  public static final ConfigOption<Long> MAX_SNAPSHOT_AGE_SECONDS_OPTION =
+      ConfigOptions.key(MAX_SNAPSHOT_AGE_SECONDS)
+          .longType()
+          .noDefaultValue()
+          .withDescription(
+              "The maximum age (in seconds) of snapshots to retain. "
+                  + "Snapshots older than this will be expired.");
+
+  public static final String RETAIN_LAST = PREFIX + "retain-last";
+  public static final ConfigOption<Integer> RETAIN_LAST_OPTION =
+      ConfigOptions.key(RETAIN_LAST)
+          .intType()
+          .noDefaultValue()
+          .withDescription("The minimum number of snapshots to retain.");
+
+  public static final String DELETE_BATCH_SIZE = PREFIX + "delete-batch-size";
+  public static final ConfigOption<Integer> DELETE_BATCH_SIZE_OPTION =
+      ConfigOptions.key(DELETE_BATCH_SIZE)
+          .intType()
+          .defaultValue(1000)
+          .withDescription("The batch size used for deleting expired files.");
+
+  public static final String CLEAN_EXPIRED_METADATA = PREFIX + 
"clean-expired-metadata";
+  public static final ConfigOption<Boolean> CLEAN_EXPIRED_METADATA_OPTION =
+      ConfigOptions.key(CLEAN_EXPIRED_METADATA)
+          .booleanType()
+          .defaultValue(true)
+          .withDescription(
+              "Whether to clean expired metadata such as partition specs and 
schemas.");
+
+  public static final String PLANNING_WORKER_POOL_SIZE = PREFIX + 
"planning-worker-pool-size";
+  public static final ConfigOption<Integer> PLANNING_WORKER_POOL_SIZE_OPTION =
+      ConfigOptions.key(PLANNING_WORKER_POOL_SIZE)
+          .intType()
+          .noDefaultValue()
+          .withDescription(
+              "The worker pool size used to calculate the files to delete. "
+                  + "If not set, the shared worker pool is used.");
+
+  private final FlinkConfParser confParser;
+
+  public ExpireSnapshotsConfig(
+      Table table, Map<String, String> writeOptions, ReadableConfig 
readableConfig) {
+    this.confParser = new FlinkConfParser(table, writeOptions, readableConfig);
+  }
+
+  public int scheduleOnCommitCount() {
+    return confParser
+        .intConf()
+        .option(SCHEDULE_ON_COMMIT_COUNT)
+        .flinkConfig(SCHEDULE_ON_COMMIT_COUNT_OPTION)
+        .defaultValue(SCHEDULE_ON_COMMIT_COUNT_OPTION.defaultValue())
+        .parse();
+  }
+
+  public long scheduleOnIntervalSecond() {
+    return confParser
+        .longConf()
+        .option(SCHEDULE_ON_INTERVAL_SECOND)
+        .flinkConfig(SCHEDULE_ON_INTERVAL_SECOND_OPTION)
+        .defaultValue(SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue())
+        .parse();
+  }
+
+  public Long maxSnapshotAgeSeconds() {
+    return confParser
+        .longConf()
+        .option(MAX_SNAPSHOT_AGE_SECONDS)
+        .flinkConfig(MAX_SNAPSHOT_AGE_SECONDS_OPTION)
+        .parseOptional();
+  }
+
+  public Integer retainLast() {
+    return 
confParser.intConf().option(RETAIN_LAST).flinkConfig(RETAIN_LAST_OPTION).parseOptional();
+  }
+
+  public int deleteBatchSize() {
+    return confParser
+        .intConf()
+        .option(DELETE_BATCH_SIZE)
+        .flinkConfig(DELETE_BATCH_SIZE_OPTION)
+        .defaultValue(DELETE_BATCH_SIZE_OPTION.defaultValue())
+        .parse();
+  }
+
+  public Boolean cleanExpiredMetadata() {
+    return confParser
+        .booleanConf()
+        .option(CLEAN_EXPIRED_METADATA)
+        .flinkConfig(CLEAN_EXPIRED_METADATA_OPTION)
+        .defaultValue(CLEAN_EXPIRED_METADATA_OPTION.defaultValue())
+        .parse();
+  }
+
+  public Integer planningWorkerPoolSize() {
+    return confParser
+        .intConf()
+        .option(PLANNING_WORKER_POOL_SIZE)
+        .flinkConfig(PLANNING_WORKER_POOL_SIZE_OPTION)
+        .defaultValue(ThreadPools.WORKER_THREAD_POOL_SIZE)
+        .parse();
+  }
+}
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
index 0c88abf820..e6f536273e 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/FlinkMaintenanceConfig.java
@@ -122,6 +122,14 @@ public class FlinkMaintenanceConfig {
     return new RewriteDataFilesConfig(table, writeProperties, readableConfig);
   }
 
+  public ExpireSnapshotsConfig createExpireSnapshotsConfig() {
+    return new ExpireSnapshotsConfig(table, writeProperties, readableConfig);
+  }
+
+  public DeleteOrphanFilesConfig createDeleteOrphanFilesConfig() {
+    return new DeleteOrphanFilesConfig(table, writeProperties, readableConfig);
+  }
+
   public LockConfig createLockConfig() {
     return new LockConfig(table, writeProperties, readableConfig);
   }
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
index ab5159be12..d98a1c9ab4 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink.maintenance.api;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Collection;
 import java.util.List;
 import java.util.Locale;
 import java.util.UUID;
@@ -252,6 +253,17 @@ public class TableMaintenance {
       return this;
     }
 
+    /**
+     * Adds multiple tasks with the given schedules.
+     *
+     * @param tasks to add
+     */
+    public Builder add(Collection<MaintenanceTaskBuilder<?>> tasks) {
+      Preconditions.checkNotNull(tasks, "Tasks collection should not be null");
+      taskBuilders.addAll(tasks);
+      return this;
+    }
+
     /** Builds the task graph for the maintenance tasks. */
     public void append() throws IOException {
       Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least 
one task");
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java
 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java
index c05e7d9180..8e45a2db30 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java
@@ -78,7 +78,7 @@ class IcebergCommitter implements 
Committer<IcebergCommittable> {
   private int maxContinuousEmptyCommits;
   private ExecutorService workerPool;
   private int continuousEmptyCheckpoints = 0;
-  private boolean compactMode = false;
+  private final boolean tableMaintenanceEnabled;
 
   IcebergCommitter(
       TableLoader tableLoader,
@@ -88,7 +88,7 @@ class IcebergCommitter implements 
Committer<IcebergCommittable> {
       int workerPoolSize,
       String sinkId,
       IcebergFilesCommitterMetrics committerMetrics,
-      boolean compactMode) {
+      boolean tableMaintenanceEnabled) {
     this.branch = branch;
     this.snapshotProperties = snapshotProperties;
     this.replacePartitions = replacePartitions;
@@ -107,7 +107,7 @@ class IcebergCommitter implements 
Committer<IcebergCommittable> {
         ThreadPools.newFixedThreadPool(
             "iceberg-committer-pool-" + table.name() + "-" + sinkId, 
workerPoolSize);
     this.continuousEmptyCheckpoints = 0;
-    this.compactMode = compactMode;
+    this.tableMaintenanceEnabled = tableMaintenanceEnabled;
   }
 
   @Override
@@ -177,7 +177,7 @@ class IcebergCommitter implements 
Committer<IcebergCommittable> {
       committerMetrics.updateCommitSummary(summary);
     }
 
-    if (!compactMode) {
+    if (!tableMaintenanceEnabled) {
       FlinkManifestUtil.deleteCommittedManifests(table, manifests, 
newFlinkJobId, checkpointId);
     }
   }
diff --git 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
index 96c947a853..9f90d8fd35 100644
--- 
a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
+++ 
b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSink.java
@@ -73,8 +73,13 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.FlinkWriteConf;
 import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFiles;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFilesConfig;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshots;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshotsConfig;
 import org.apache.iceberg.flink.maintenance.api.FlinkMaintenanceConfig;
 import org.apache.iceberg.flink.maintenance.api.LockConfig;
+import org.apache.iceberg.flink.maintenance.api.MaintenanceTaskBuilder;
 import org.apache.iceberg.flink.maintenance.api.RewriteDataFiles;
 import org.apache.iceberg.flink.maintenance.api.RewriteDataFilesConfig;
 import org.apache.iceberg.flink.maintenance.api.TableMaintenance;
@@ -88,6 +93,7 @@ import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
@@ -159,14 +165,15 @@ public class IcebergSink
   private final String branch;
   private final boolean overwriteMode;
   private final int workerPoolSize;
-  private final boolean compactMode;
-  private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;
-
+  private final boolean maintenanceEnabled;
   private final Table table;
   // This should only be used for logging/error messages. For any actual logic 
always use
   // equalityFieldIds instead.
   private final Set<String> equalityFieldColumns;
 
+  private final transient List<MaintenanceTaskBuilder<?>> maintenanceTasks;
+  private final transient FlinkMaintenanceConfig flinkMaintenanceConfig;
+
   private IcebergSink(
       TableLoader tableLoader,
       Table table,
@@ -179,6 +186,7 @@ public class IcebergSink
       Set<Integer> equalityFieldIds,
       String branch,
       boolean overwriteMode,
+      List<MaintenanceTaskBuilder<?>> maintenanceTasks,
       FlinkMaintenanceConfig flinkMaintenanceConfig,
       Set<String> equalityFieldColumns) {
     this.tableLoader = tableLoader;
@@ -200,7 +208,8 @@ public class IcebergSink
     // This is used to separate files generated by different sinks writing the 
same table.
     // Also used to generate the aggregator operator name
     this.sinkId = UUID.randomUUID().toString();
-    this.compactMode = flinkWriteConf.compactMode();
+    this.maintenanceEnabled = !maintenanceTasks.isEmpty();
+    this.maintenanceTasks = maintenanceTasks;
     this.flinkMaintenanceConfig = flinkMaintenanceConfig;
     this.equalityFieldColumns = equalityFieldColumns;
   }
@@ -238,7 +247,7 @@ public class IcebergSink
         workerPoolSize,
         sinkId,
         metrics,
-        compactMode);
+        maintenanceEnabled);
   }
 
   @Override
@@ -250,7 +259,7 @@ public class IcebergSink
   public void addPostCommitTopology(
       DataStream<CommittableMessage<IcebergCommittable>> committables) {
 
-    if (!compactMode) {
+    if (maintenanceTasks.isEmpty()) {
       return;
     }
 
@@ -264,26 +273,21 @@ public class IcebergSink
             .uid(postCommitUid)
             .forceNonParallel();
     try {
-      RewriteDataFilesConfig rewriteDataFilesConfig =
-          flinkMaintenanceConfig.createRewriteDataFilesConfig();
-      RewriteDataFiles.Builder rewriteBuilder =
-          RewriteDataFiles.builder().config(rewriteDataFilesConfig);
-
       LockConfig lockConfig = flinkMaintenanceConfig.createLockConfig();
       String tableMaintenanceUid = String.format("TableMaintenance : %s", 
suffix);
-      TableMaintenance.Builder builder =
-          StringUtils.isNotEmpty(lockConfig.lockType())
-              ? TableMaintenance.forChangeStream(
-                      tableChangeStream,
-                      tableLoader,
-                      LockFactoryBuilder.build(lockConfig, table.name()))
-                  .uidSuffix(tableMaintenanceUid)
-                  .add(rewriteBuilder)
-              : TableMaintenance.forChangeStream(tableChangeStream, 
tableLoader)
-                  .uidSuffix(tableMaintenanceUid)
-                  .add(rewriteBuilder);
+
+      TableMaintenance.Builder builder;
+      if (StringUtils.isNotEmpty(lockConfig.lockType())) {
+        builder =
+            TableMaintenance.forChangeStream(
+                tableChangeStream, tableLoader, 
LockFactoryBuilder.build(lockConfig, table.name()));
+      } else {
+        builder = TableMaintenance.forChangeStream(tableChangeStream, 
tableLoader);
+      }
 
       builder
+          .uidSuffix(tableMaintenanceUid)
+          .add(maintenanceTasks)
           .rateLimit(Duration.ofSeconds(flinkMaintenanceConfig.rateLimit()))
           
.lockCheckDelay(Duration.ofSeconds(flinkMaintenanceConfig.lockCheckDelay()))
           .slotSharingGroup(flinkMaintenanceConfig.slotSharingGroup())
@@ -344,6 +348,7 @@ public class IcebergSink
     private final Map<String, String> snapshotSummary = Maps.newHashMap();
     private ReadableConfig readableConfig = new Configuration();
     private List<String> equalityFieldColumns = null;
+    private final List<MaintenanceTaskBuilder<?>> maintenanceTasks = 
Lists.newArrayList();
 
     private Builder() {}
 
@@ -626,6 +631,85 @@ public class IcebergSink
       return this;
     }
 
+    /**
+     * Enables or disables compaction (rewriting data files) as a post-commit 
maintenance task.
+     *
+     * @param enabled whether to enable compaction
+     * @see RewriteDataFilesConfig for the default config.
+     * @deprecated See {@code rewriteDataFiles(..)}
+     */
+    @Deprecated
+    public Builder compaction(boolean enabled) {
+      writeOptions.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), 
Boolean.toString(enabled));
+      return this;
+    }
+
+    /**
+     * Enables rewriting data files (compaction) as a post-commit maintenance 
task.
+     *
+     * @see RewriteDataFilesConfig for the default config.
+     */
+    public Builder rewriteDataFiles() {
+      writeOptions.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");
+      return this;
+    }
+
+    /**
+     * Enables rewriting data files (compaction) as a post-commit maintenance 
task.
+     *
+     * @param config task-specific configuration, see {@link 
RewriteDataFilesConfig} for available
+     *     keys
+     */
+    public Builder rewriteDataFiles(Map<String, String> config) {
+      rewriteDataFiles();
+      writeOptions.putAll(config);
+      return this;
+    }
+
+    /**
+     * Enables expire snapshots as a post-commit maintenance task.
+     *
+     * @see ExpireSnapshotsConfig for the default config.
+     */
+    public Builder expireSnapshots() {
+      writeOptions.put(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.key(), 
"true");
+      return this;
+    }
+
+    /**
+     * Enables expire snapshots as a post-commit maintenance task.
+     *
+     * @param config task-specific configuration, see {@link 
ExpireSnapshotsConfig} for available
+     *     keys
+     */
+    public Builder expireSnapshots(Map<String, String> config) {
+      expireSnapshots();
+      writeOptions.putAll(config);
+      return this;
+    }
+
+    /**
+     * Enables delete orphan files as a post-commit maintenance task.
+     *
+     * @see DeleteOrphanFilesConfig for the default config.
+     */
+    public Builder deleteOrphanFiles() {
+      writeOptions.put(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.key(), 
"true");
+      return this;
+    }
+
+    /**
+     * Enables delete orphan files as a post-commit maintenance task.
+     *
+     * @param config task-specific configuration, see {@link 
DeleteOrphanFilesConfig} for available
+     *     keys.
+     */
+    public Builder deleteOrphanFiles(Map<String, String> config) {
+      deleteOrphanFiles();
+      writeOptions.putAll(config);
+      return this;
+    }
+
     @Override
     public Builder toBranch(String branch) {
       writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
@@ -682,6 +766,24 @@ public class IcebergSink
       FlinkMaintenanceConfig flinkMaintenanceConfig =
           new FlinkMaintenanceConfig(table, writeOptions, readableConfig);
 
+      if (flinkWriteConf.compactMode()) {
+        RewriteDataFilesConfig rewriteDataFilesConfig =
+            flinkMaintenanceConfig.createRewriteDataFilesConfig();
+        
maintenanceTasks.add(RewriteDataFiles.builder().config(rewriteDataFilesConfig));
+      }
+
+      if (flinkWriteConf.expireSnapshotsMode()) {
+        ExpireSnapshotsConfig expireSnapshotsConfig =
+            flinkMaintenanceConfig.createExpireSnapshotsConfig();
+        
maintenanceTasks.add(ExpireSnapshots.builder().config(expireSnapshotsConfig));
+      }
+
+      if (flinkWriteConf.deleteOrphanFilesMode()) {
+        DeleteOrphanFilesConfig deleteOrphanFilesConfig =
+            flinkMaintenanceConfig.createDeleteOrphanFilesConfig();
+        
maintenanceTasks.add(DeleteOrphanFiles.builder().config(deleteOrphanFilesConfig));
+      }
+
       Set<String> equalityFieldColumnsSet =
           equalityFieldColumns != null ? Sets.newHashSet(equalityFieldColumns) 
: null;
 
@@ -699,6 +801,7 @@ public class IcebergSink
           equalityFieldIds,
           flinkWriteConf.branch(),
           overwriteMode,
+          maintenanceTasks,
           flinkMaintenanceConfig,
           equalityFieldColumnsSet);
     }
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestDeleteOrphanFilesConfig.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestDeleteOrphanFilesConfig.java
new file mode 100644
index 0000000000..f26f608ffa
--- /dev/null
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestDeleteOrphanFilesConfig.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import org.apache.flink.configuration.Configuration;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.DeleteOrphanFiles.PrefixMismatchMode;
+import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ThreadPools;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestDeleteOrphanFilesConfig extends OperatorTestBase { // checks the typed getters of DeleteOrphanFilesConfig for explicit option values and for an empty option map
+  private Table table; // table passed to the config constructor in both tests
+  private Map<String, String> input = Maps.newHashMap(); // raw string options; filled in before() and cleared in after()
+
+  @BeforeEach
+  public void before() {
+    this.table = createTable();
+    input.put(DeleteOrphanFilesConfig.SCHEDULE_ON_INTERVAL_SECOND, "60");
+    input.put(DeleteOrphanFilesConfig.MIN_AGE_SECONDS, "86400");
+    input.put(DeleteOrphanFilesConfig.DELETE_BATCH_SIZE, "500");
+    input.put(DeleteOrphanFilesConfig.LOCATION, "/tmp/test-location");
+    input.put(DeleteOrphanFilesConfig.USE_PREFIX_LISTING, "true"); // NOTE(review): testConfigDefaults also expects true, so this value cannot distinguish parsed from default
+    input.put(DeleteOrphanFilesConfig.PLANNING_WORKER_POOL_SIZE, "4");
+    input.put(DeleteOrphanFilesConfig.EQUAL_SCHEMES, "s3n=s3,s3a=s3"); // NOTE(review): same s3n/s3a entries that testConfigDefaults expects without any input
+    input.put(DeleteOrphanFilesConfig.EQUAL_AUTHORITIES, "auth1=auth2");
+    input.put(DeleteOrphanFilesConfig.PREFIX_MISMATCH_MODE, "IGNORE");
+    input.put("other.config", "should-be-ignored"); // extra key that none of the assertions below read back
+  }
+
+  @AfterEach
+  public void after() {
+    input.clear(); // the map is an instance field, so it is emptied explicitly after each test
+  }
+
+  @Test
+  void testConfigParsing() { // each option set in before() is read back through its getter and compared with the typed value
+    DeleteOrphanFilesConfig config = new DeleteOrphanFilesConfig(table, input, 
new Configuration());
+
+    assertThat(config.scheduleOnIntervalSecond()).isEqualTo(60);
+    assertThat(config.minAgeSeconds()).isEqualTo(86400L);
+    assertThat(config.deleteBatchSize()).isEqualTo(500);
+    assertThat(config.location()).isEqualTo("/tmp/test-location");
+    assertThat(config.usePrefixListing()).isTrue();
+    assertThat(config.planningWorkerPoolSize()).isEqualTo(4);
+    assertThat(config.equalSchemes()).containsEntry("s3n", 
"s3").containsEntry("s3a", "s3"); // the "k=v,k=v" input string is expected back as map entries
+    assertThat(config.equalAuthorities()).containsEntry("auth1", "auth2");
+    
assertThat(config.prefixMismatchMode()).isEqualTo(PrefixMismatchMode.IGNORE); // the string "IGNORE" is expected back as the enum constant
+  }
+
+  @Test
+  void testConfigDefaults() { // with an empty option map and an empty Configuration, getters must return the expected defaults
+    DeleteOrphanFilesConfig config =
+        new DeleteOrphanFilesConfig(table, Maps.newHashMap(), new 
Configuration());
+
+    assertThat(config.scheduleOnIntervalSecond())
+        
.isEqualTo(DeleteOrphanFilesConfig.SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue());
+    assertThat(config.minAgeSeconds())
+        
.isEqualTo(DeleteOrphanFilesConfig.MIN_AGE_SECONDS_OPTION.defaultValue());
+    assertThat(config.deleteBatchSize())
+        
.isEqualTo(DeleteOrphanFilesConfig.DELETE_BATCH_SIZE_OPTION.defaultValue());
+    assertThat(config.location()).isNull(); // no location is expected when none was supplied
+    assertThat(config.usePrefixListing()).isTrue();
+    
assertThat(config.planningWorkerPoolSize()).isEqualTo(ThreadPools.WORKER_THREAD_POOL_SIZE);
+    assertThat(config.equalSchemes()).containsEntry("s3n", 
"s3").containsEntry("s3a", "s3");
+    assertThat(config.equalAuthorities()).isEqualTo(Map.of()); // expected to be an empty map rather than null
+    
assertThat(config.prefixMismatchMode()).isEqualTo(PrefixMismatchMode.ERROR);
+  }
+}
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshotsConfig.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshotsConfig.java
new file mode 100644
index 0000000000..3bcec8114b
--- /dev/null
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshotsConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Map;
+import org.apache.flink.configuration.Configuration;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ThreadPools;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class TestExpireSnapshotsConfig extends OperatorTestBase { // checks the typed getters of ExpireSnapshotsConfig for explicit option values and for an empty option map
+  private Table table; // table passed to the config constructor in both tests
+  private Map<String, String> input = Maps.newHashMap(); // raw string options; filled in before() and cleared in after()
+
+  @BeforeEach
+  public void before() {
+    this.table = createTable();
+    input.put(ExpireSnapshotsConfig.SCHEDULE_ON_COMMIT_COUNT, "10");
+    input.put(ExpireSnapshotsConfig.SCHEDULE_ON_INTERVAL_SECOND, "60");
+    input.put(ExpireSnapshotsConfig.MAX_SNAPSHOT_AGE_SECONDS, "7200");
+    input.put(ExpireSnapshotsConfig.RETAIN_LAST, "5");
+    input.put(ExpireSnapshotsConfig.DELETE_BATCH_SIZE, "500");
+    input.put(ExpireSnapshotsConfig.CLEAN_EXPIRED_METADATA, "true"); // NOTE(review): testConfigDefaults also expects true, so this value cannot distinguish parsed from default
+    input.put(ExpireSnapshotsConfig.PLANNING_WORKER_POOL_SIZE, "4");
+    input.put("other.config", "should-be-ignored"); // extra key that none of the assertions below read back
+  }
+
+  @AfterEach
+  public void after() {
+    input.clear(); // the map is an instance field, so it is emptied explicitly after each test
+  }
+
+  @Test
+  void testConfigParsing() { // each option set in before() is read back through its getter and compared with the typed value
+    ExpireSnapshotsConfig config = new ExpireSnapshotsConfig(table, input, new 
Configuration());
+
+    assertThat(config.scheduleOnCommitCount()).isEqualTo(10);
+    assertThat(config.scheduleOnIntervalSecond()).isEqualTo(60);
+    assertThat(config.maxSnapshotAgeSeconds()).isEqualTo(7200L);
+    assertThat(config.retainLast()).isEqualTo(5);
+    assertThat(config.deleteBatchSize()).isEqualTo(500);
+    assertThat(config.cleanExpiredMetadata()).isTrue();
+    assertThat(config.planningWorkerPoolSize()).isEqualTo(4);
+  }
+
+  @Test
+  void testConfigDefaults() { // with an empty option map and an empty Configuration, getters must return the expected defaults
+    ExpireSnapshotsConfig config =
+        new ExpireSnapshotsConfig(table, Maps.newHashMap(), new 
Configuration());
+
+    assertThat(config.scheduleOnCommitCount())
+        
.isEqualTo(ExpireSnapshotsConfig.SCHEDULE_ON_COMMIT_COUNT_OPTION.defaultValue());
+    assertThat(config.scheduleOnIntervalSecond())
+        
.isEqualTo(ExpireSnapshotsConfig.SCHEDULE_ON_INTERVAL_SECOND_OPTION.defaultValue());
+    assertThat(config.maxSnapshotAgeSeconds()).isNull(); // no value is expected when the option was not supplied
+    assertThat(config.retainLast()).isNull(); // no value is expected when the option was not supplied
+    assertThat(config.deleteBatchSize())
+        
.isEqualTo(ExpireSnapshotsConfig.DELETE_BATCH_SIZE_OPTION.defaultValue());
+    assertThat(config.cleanExpiredMetadata()).isTrue();
+    
assertThat(config.planningWorkerPoolSize()).isEqualTo(ThreadPools.WORKER_THREAD_POOL_SIZE);
+  }
+}
diff --git 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkCompact.java
 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkTableMaintenance.java
similarity index 53%
rename from 
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkCompact.java
rename to 
flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkTableMaintenance.java
index 8042fe6e0f..5c926d7c25 100644
--- 
a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkCompact.java
+++ 
b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergSinkTableMaintenance.java
@@ -42,6 +42,8 @@ import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.MiniFlinkClusterExtension;
 import org.apache.iceberg.flink.SimpleDataUtil;
 import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.maintenance.api.DeleteOrphanFilesConfig;
+import org.apache.iceberg.flink.maintenance.api.ExpireSnapshotsConfig;
 import org.apache.iceberg.flink.maintenance.api.LockConfig;
 import org.apache.iceberg.flink.maintenance.api.RewriteDataFilesConfig;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
@@ -51,7 +53,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.FieldSource;
 
-class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase {
+class TestIcebergSinkTableMaintenance extends TestFlinkIcebergSinkBase {
   private static final String[] LOCK_TYPES = new String[] 
{LockConfig.JdbcLockConfig.JDBC, ""};
 
   private Map<String, String> flinkConf;
@@ -59,9 +61,6 @@ class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase 
{
   @BeforeEach
   void before() throws IOException {
     this.flinkConf = Maps.newHashMap();
-    flinkConf.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");
-    flinkConf.put(RewriteDataFilesConfig.SCHEDULE_ON_DATA_FILE_SIZE, "1");
-    flinkConf.put(RewriteDataFilesConfig.PREFIX + 
SizeBasedFileRewritePlanner.REWRITE_ALL, "true");
 
     table =
         CATALOG_EXTENSION
@@ -84,6 +83,7 @@ class TestIcebergSinkCompact extends TestFlinkIcebergSinkBase 
{
   @FieldSource("LOCK_TYPES")
   public void testCompactFileE2e(String lockType) throws Exception {
     setupLockConfig(lockType);
+    setupCompactionConfig();
     List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, 
"world"), Row.of(3, "foo"));
     DataStream<RowData> dataStream =
         env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
@@ -124,6 +124,7 @@ class TestIcebergSinkCompact extends 
TestFlinkIcebergSinkBase {
   @FieldSource("LOCK_TYPES")
   public void testTableMaintenanceOperatorAdded(String lockType) {
     setupLockConfig(lockType);
+    setupCompactionConfig();
     List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, 
"world"), Row.of(3, "foo"));
     DataStream<RowData> dataStream =
         env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
@@ -147,6 +148,22 @@ class TestIcebergSinkCompact extends 
TestFlinkIcebergSinkBase {
     assertThat(containRewrite).isTrue();
   }
 
+  private void setupCompactionConfig() { // adds the compaction write options to flinkConf; previously set unconditionally in before(), now opt-in per test
+    flinkConf.put(FlinkWriteOptions.COMPACTION_ENABLE.key(), "true");
+    flinkConf.put(RewriteDataFilesConfig.SCHEDULE_ON_DATA_FILE_SIZE, "1");
+    flinkConf.put(RewriteDataFilesConfig.PREFIX + 
SizeBasedFileRewritePlanner.REWRITE_ALL, "true"); // planner option addressed through the RewriteDataFilesConfig prefix
+  }
+
+  private void setupExpireSnapshotsConfig() { // adds the expire-snapshots enable flag plus a retain-last value of "5" to flinkConf
+    flinkConf.put(FlinkWriteOptions.EXPIRE_SNAPSHOTS_ENABLE.key(), "true");
+    flinkConf.put(ExpireSnapshotsConfig.RETAIN_LAST, "5");
+  }
+
+  private void setupDeleteOrphanFilesConfig() { // adds the delete-orphan-files enable flag plus a min-age value of "86400" to flinkConf
+    flinkConf.put(FlinkWriteOptions.DELETE_ORPHAN_FILES_ENABLE.key(), "true");
+    flinkConf.put(DeleteOrphanFilesConfig.MIN_AGE_SECONDS, "86400");
+  }
+
   private void setupLockConfig(String lockType) {
     if (lockType.equals(LockConfig.JdbcLockConfig.JDBC)) {
       flinkConf.put(LockConfig.LOCK_TYPE_OPTION.key(), 
LockConfig.JdbcLockConfig.JDBC);
@@ -159,4 +176,146 @@ class TestIcebergSinkCompact extends 
TestFlinkIcebergSinkBase {
       flinkConf.put(LockConfig.LOCK_TYPE_OPTION.key(), "");
     }
   }
+
+  @ParameterizedTest(name = "lockType = {0}")
+  @FieldSource("LOCK_TYPES")
+  public void testExpireSnapshotsEnabled(String lockType) { // with only expire-snapshots enabled via write options, the job graph must contain a vertex whose name contains "Expire"
+    setupLockConfig(lockType);
+    setupExpireSnapshotsConfig();
+
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, 
"world"), Row.of(3, "foo"));
+    DataStream<RowData> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, 
FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+    IcebergSink.forRowData(dataStream)
+        .table(table)
+        .tableLoader(tableLoader)
+        .setAll(flinkConf)
+        .append();
+
+    StreamGraph streamGraph = env.getStreamGraph(); // only the graph is inspected; this test never calls env.execute()
+    boolean containExpire = false;
+    for (JobVertex vertex : streamGraph.getJobGraph().getVertices()) {
+      if (vertex.getName().contains("Expire")) { // substring match on the vertex name
+        containExpire = true;
+        break;
+      }
+    }
+
+    assertThat(containExpire).isTrue();
+  }
+
+  @ParameterizedTest(name = "lockType = {0}")
+  @FieldSource("LOCK_TYPES")
+  public void testDeleteOrphanFilesEnabled(String lockType) { // with only delete-orphan-files enabled via write options, the job graph must contain a vertex whose name contains "Orphan"
+    setupLockConfig(lockType);
+    setupDeleteOrphanFilesConfig();
+
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, 
"world"), Row.of(3, "foo"));
+    DataStream<RowData> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, 
FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+    IcebergSink.forRowData(dataStream)
+        .table(table)
+        .tableLoader(tableLoader)
+        .setAll(flinkConf)
+        .append();
+
+    StreamGraph streamGraph = env.getStreamGraph(); // only the graph is inspected; this test never calls env.execute()
+    boolean containOrphan = false;
+    for (JobVertex vertex : streamGraph.getJobGraph().getVertices()) {
+      if (vertex.getName().contains("Orphan")) { // substring match on the vertex name
+        containOrphan = true;
+        break;
+      }
+    }
+
+    assertThat(containOrphan).isTrue();
+  }
+
+  @ParameterizedTest(name = "lockType = {0}")
+  @FieldSource("LOCK_TYPES")
+  public void testAllMaintenanceTasksCombined(String lockType) { // enabling all three tasks via write options must yield "Rewrite", "Expire" and "Orphan" vertices in the same job graph
+    setupLockConfig(lockType);
+    setupCompactionConfig();
+    setupExpireSnapshotsConfig();
+    setupDeleteOrphanFilesConfig();
+
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, 
"world"), Row.of(3, "foo"));
+    DataStream<RowData> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, 
FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+    IcebergSink.forRowData(dataStream)
+        .table(table)
+        .tableLoader(tableLoader)
+        .setAll(flinkConf)
+        .append();
+
+    StreamGraph streamGraph = env.getStreamGraph(); // only the graph is inspected; this test never calls env.execute()
+    boolean containRewrite = false;
+    boolean containExpire = false;
+    boolean containOrphan = false;
+    for (JobVertex vertex : streamGraph.getJobGraph().getVertices()) { // no early break: all vertices are scanned so each of the three flags can be set
+      if (vertex.getName().contains("Rewrite")) {
+        containRewrite = true;
+      }
+
+      if (vertex.getName().contains("Expire")) {
+        containExpire = true;
+      }
+
+      if (vertex.getName().contains("Orphan")) {
+        containOrphan = true;
+      }
+    }
+
+    assertThat(containRewrite).isTrue();
+    assertThat(containExpire).isTrue();
+    assertThat(containOrphan).isTrue();
+  }
+
+  @ParameterizedTest(name = "lockType = {0}")
+  @FieldSource("LOCK_TYPES")
+  public void testAllMaintenanceE2e(String lockType) throws Exception { // runs the job with all three tasks registered through the builder methods and checks the resulting table snapshots
+    setupLockConfig(lockType); // flinkConf carries only the lock settings in this test; no *_ENABLE write option is set
+
+    Map<String, String> compactionConfig = Maps.newHashMap();
+    compactionConfig.put(RewriteDataFilesConfig.SCHEDULE_ON_DATA_FILE_SIZE, 
"1");
+    compactionConfig.put(
+        RewriteDataFilesConfig.PREFIX + 
SizeBasedFileRewritePlanner.REWRITE_ALL, "true");
+
+    Map<String, String> expireConfig = Maps.newHashMap();
+    expireConfig.put(ExpireSnapshotsConfig.RETAIN_LAST, "5");
+
+    Map<String, String> orphanConfig = Maps.newHashMap();
+    orphanConfig.put(DeleteOrphanFilesConfig.MIN_AGE_SECONDS, "86400");
+
+    List<Row> rows = Lists.newArrayList(Row.of(1, "hello"), Row.of(2, 
"world"), Row.of(3, "foo"));
+    DataStream<RowData> dataStream =
+        env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
+            .map(CONVERTER::toInternal, 
FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));
+
+    IcebergSink.forRowData(dataStream)
+        .setAll(flinkConf)
+        .table(table)
+        .tableLoader(tableLoader)
+        .rewriteDataFiles(compactionConfig) // per-task option maps are passed to the dedicated builder methods instead of setAll
+        .expireSnapshots(expireConfig)
+        .deleteOrphanFiles(orphanConfig)
+        .append();
+
+    env.execute("Test All Maintenance E2E"); // unlike the graph-only tests in this class, the job is actually executed
+
+    table.refresh();
+    // Compaction should have merged the 3 data files into 1
+    List<DataFile> afterCompactDataFiles = 
getDataFiles(table.currentSnapshot(), table);
+    assertThat(afterCompactDataFiles).hasSize(1);
+
+    List<DataFile> preCompactDataFiles =
+        getDataFiles(table.snapshot(table.currentSnapshot().parentId()), 
table); // parent of the current snapshot, expected to still hold the three uncompacted files
+    assertThat(preCompactDataFiles).hasSize(3); // NOTE(review): only the compaction outcome is asserted; nothing here checks an effect of expireSnapshots or deleteOrphanFiles
+  }
 }

Reply via email to