This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 9534c9b3ad Flink: TableMaintenance Support Coordinator Lock (#15151)
9534c9b3ad is described below

commit 9534c9b3adc29d127ecc541ce131f49fd72f1980
Author: GuoYu <[email protected]>
AuthorDate: Tue Feb 24 23:03:42 2026 +0800

    Flink: TableMaintenance Support Coordinator Lock (#15151)
---
 .../flink/maintenance/api/TableMaintenance.java    | 132 +++-
 .../maintenance/operator/BaseCoordinator.java      | 306 ++++++++++
 .../maintenance/operator/LockRegisterEvent.java    |  47 ++
 .../maintenance/operator/LockReleaseEvent.java     |  56 ++
 .../operator/LockRemoverCoordinator.java           |  60 ++
 .../maintenance/operator/LockRemoverOperator.java  | 112 ++++
 .../operator/LockRemoverOperatorFactory.java       |  86 +++
 .../flink/maintenance/operator/TriggerManager.java |  23 +-
 .../operator/TriggerManagerCoordinator.java        |  59 ++
 ...gerManager.java => TriggerManagerOperator.java} | 301 +++++-----
 .../operator/TriggerManagerOperatorFactory.java    | 110 ++++
 .../flink/maintenance/operator/TriggerUtil.java    |  46 ++
 .../flink/maintenance/api/TestMaintenanceE2E.java  |  39 ++
 .../api/TestTableMaintenanceCoordinationLock.java  | 344 +++++++++++
 .../maintenance/operator/CoordinatorTestBase.java  |  43 ++
 .../operator/TestLockRemoveCoordinator.java        |  69 +++
 .../operator/TestLockRemoverOperation.java         | 207 +++++++
 .../operator/TestTriggerManagerCoordinator.java    | 102 ++++
 .../operator/TestTriggerManagerOperator.java       | 668 +++++++++++++++++++++
 19 files changed, 2607 insertions(+), 203 deletions(-)

diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
index 1a2b0607dd..ab5159be12 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
@@ -23,6 +23,7 @@ import java.time.Duration;
 import java.util.List;
 import java.util.Locale;
 import java.util.UUID;
+import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.eventtime.TimestampAssigner;
@@ -43,10 +44,12 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.maintenance.operator.LockRemover;
+import 
org.apache.iceberg.flink.maintenance.operator.LockRemoverOperatorFactory;
 import org.apache.iceberg.flink.maintenance.operator.MonitorSource;
 import org.apache.iceberg.flink.maintenance.operator.TableChange;
 import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
 import org.apache.iceberg.flink.maintenance.operator.TriggerManager;
+import 
org.apache.iceberg.flink.maintenance.operator.TriggerManagerOperatorFactory;
 import org.apache.iceberg.flink.sink.IcebergSink;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -71,39 +74,77 @@ public class TableMaintenance {
    *
    * @param changeStream the table changes
    * @param tableLoader used for accessing the table
-   * @param lockFactory used for preventing concurrent task runs
+   * @param lockFactory used for preventing concurrent task runs; if null, the coordination lock is used.
    * @return builder for the maintenance stream
+   * @deprecated since 1.12.0, will be removed in 2.0.0. Use {@link 
#forChangeStream(DataStream,
+   *     TableLoader)} instead.
    */
+  @Deprecated
   @Internal
   public static Builder forChangeStream(
       DataStream<TableChange> changeStream,
       TableLoader tableLoader,
-      TriggerLockFactory lockFactory) {
+      @Nullable TriggerLockFactory lockFactory) {
     Preconditions.checkNotNull(changeStream, "The change stream should not be 
null");
     Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
-    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
 
     return new Builder(null, changeStream, tableLoader, lockFactory);
   }
 
+  /**
+   * Use when the change stream is already provided, like in the {@link
+   * IcebergSink#addPostCommitTopology(DataStream)}.
+   *
+   * @param changeStream the table changes
+   * @param tableLoader used for accessing the table
+   * @return builder for the maintenance stream
+   */
+  @Internal
+  public static Builder forChangeStream(
+      DataStream<TableChange> changeStream, TableLoader tableLoader) {
+    Preconditions.checkNotNull(changeStream, "The change stream should not be 
null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+
+    return new Builder(null, changeStream, tableLoader, null);
+  }
+
   /**
    * Use this for standalone maintenance job. It creates a monitor source that 
detect table changes
    * and build the maintenance pipelines afterwards.
    *
    * @param env used to register the monitor source
    * @param tableLoader used for accessing the table
-   * @param lockFactory used for preventing concurrent task runs
+   * @param lockFactory used for preventing concurrent task runs. If null, the coordination lock is used.
    * @return builder for the maintenance stream
+   * @deprecated since 1.12.0, will be removed in 2.0.0. Use {@link
+   *     #forTable(StreamExecutionEnvironment, TableLoader)} instead.
    */
+  @Deprecated
   public static Builder forTable(
-      StreamExecutionEnvironment env, TableLoader tableLoader, 
TriggerLockFactory lockFactory) {
+      StreamExecutionEnvironment env,
+      TableLoader tableLoader,
+      @Nullable TriggerLockFactory lockFactory) {
     Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be 
null");
     Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
-    Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
 
     return new Builder(env, null, tableLoader, lockFactory);
   }
 
+  /**
+   * Use this for a standalone maintenance job. It creates a monitor source that detects table changes
+   * and builds the maintenance pipelines afterwards, using the coordination lock by default.
+   *
+   * @param env used to register the monitor source
+   * @param tableLoader used for accessing the table
+   * @return builder for the maintenance stream
+   */
+  public static Builder forTable(StreamExecutionEnvironment env, TableLoader 
tableLoader) {
+    Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be 
null");
+    Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+
+    return new Builder(env, null, tableLoader, null);
+  }
+
   public static class Builder {
     private final StreamExecutionEnvironment env;
     private final DataStream<TableChange> inputStream;
@@ -226,21 +267,43 @@ public class TableMaintenance {
       try (TableLoader loader = tableLoader.clone()) {
         loader.open();
         String tableName = loader.loadTable().name();
-        DataStream<Trigger> triggers =
-            DataStreamUtils.reinterpretAsKeyedStream(
-                    changeStream(tableName, loader), unused -> true)
-                .process(
-                    new TriggerManager(
-                        loader,
-                        lockFactory,
-                        taskNames,
-                        evaluators,
-                        rateLimit.toMillis(),
-                        lockCheckDelay.toMillis()))
-                .name(TRIGGER_MANAGER_OPERATOR_NAME)
-                .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
-                .slotSharingGroup(slotSharingGroup)
-                .forceNonParallel()
+        DataStream<Trigger> triggers;
+        if (lockFactory == null) {
+          triggers =
+              DataStreamUtils.reinterpretAsKeyedStream(
+                      changeStream(tableName, loader), unused -> true)
+                  .transform(
+                      TRIGGER_MANAGER_OPERATOR_NAME,
+                      TypeInformation.of(Trigger.class),
+                      new TriggerManagerOperatorFactory(
+                          tableName,
+                          taskNames,
+                          evaluators,
+                          rateLimit.toMillis(),
+                          lockCheckDelay.toMillis()))
+                  .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+                  .slotSharingGroup(slotSharingGroup)
+                  .forceNonParallel();
+        } else {
+          triggers =
+              DataStreamUtils.reinterpretAsKeyedStream(
+                      changeStream(tableName, loader), unused -> true)
+                  .process(
+                      new TriggerManager(
+                          loader,
+                          lockFactory,
+                          taskNames,
+                          evaluators,
+                          rateLimit.toMillis(),
+                          lockCheckDelay.toMillis()))
+                  .name(TRIGGER_MANAGER_OPERATOR_NAME)
+                  .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+                  .slotSharingGroup(slotSharingGroup)
+                  .forceNonParallel();
+        }
+
+        triggers =
+            triggers
                 .assignTimestampsAndWatermarks(new 
PunctuatedWatermarkStrategy())
                 .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
                 .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
@@ -277,14 +340,25 @@ public class TableMaintenance {
         }
 
         // Add the LockRemover to the end
-        unioned
-            .transform(
-                LOCK_REMOVER_OPERATOR_NAME,
-                TypeInformation.of(Void.class),
-                new LockRemover(tableName, lockFactory, taskNames))
-            .forceNonParallel()
-            .uid("lock-remover-" + uidSuffix)
-            .slotSharingGroup(slotSharingGroup);
+        if (lockFactory == null) {
+          unioned
+              .transform(
+                  LOCK_REMOVER_OPERATOR_NAME,
+                  TypeInformation.of(Void.class),
+                  new LockRemoverOperatorFactory(tableName, taskNames))
+              .uid("lock-remover-" + uidSuffix)
+              .forceNonParallel()
+              .slotSharingGroup(slotSharingGroup);
+        } else {
+          unioned
+              .transform(
+                  LOCK_REMOVER_OPERATOR_NAME,
+                  TypeInformation.of(Void.class),
+                  new LockRemover(tableName, lockFactory, taskNames))
+              .forceNonParallel()
+              .uid("lock-remover-" + uidSuffix)
+              .slotSharingGroup(slotSharingGroup);
+        }
       }
     }
 
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/BaseCoordinator.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/BaseCoordinator.java
new file mode 100644
index 0000000000..21e9ca43f3
--- /dev/null
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/BaseCoordinator.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base coordinator for table maintenance operators. Provides common 
functionality for thread
+ * management, subtask gateway management, and checkpoint handling.
+ */
+abstract class BaseCoordinator implements OperatorCoordinator {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseCoordinator.class);
+  private static final Map<String, Consumer<LockReleaseEvent>> 
LOCK_RELEASE_CONSUMERS =
+      Maps.newConcurrentMap();
+  private static final List<LockReleaseEvent> PENDING_RELEASE_EVENTS = 
Lists.newArrayList();
+
+  private final String operatorName;
+  private final Context context;
+  private final ExecutorService coordinatorExecutor;
+  private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
+  private final SubtaskGateways subtaskGateways;
+
+  private boolean started;
+
+  BaseCoordinator(String operatorName, Context context) {
+    this.operatorName = operatorName;
+    this.context = context;
+
+    this.coordinatorThreadFactory =
+        new CoordinatorExecutorThreadFactory(
+            "Coordinator-" + operatorName, context.getUserCodeClassloader());
+    this.coordinatorExecutor = 
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+    Preconditions.checkState(
+        context.currentParallelism() == 1, "Coordinator must run with 
parallelism 1");
+    this.subtaskGateways = SubtaskGateways.create(operatorName);
+    LOG.info("Created coordinator: {}", operatorName);
+  }
+
+  @SuppressWarnings("FutureReturnValueIgnored")
+  void registerLock(LockRegisterEvent lockRegisterEvent) {
+    LOCK_RELEASE_CONSUMERS.put(
+        lockRegisterEvent.lockId(),
+        lock -> {
+          LOG.info(
+              "Send release event for lock id {}, timestamp: {} to Operator 
{}",
+              lock.lockId(),
+              lock.timestamp(),
+              operatorName());
+          subtaskGateways.subtaskGateway().sendEvent(lock);
+        });
+
+    synchronized (PENDING_RELEASE_EVENTS) {
+      if (!PENDING_RELEASE_EVENTS.isEmpty()) {
+        PENDING_RELEASE_EVENTS.forEach(this::handleReleaseLock);
+        PENDING_RELEASE_EVENTS.clear();
+      }
+    }
+  }
+
+  void handleReleaseLock(LockReleaseEvent lockReleaseEvent) {
+    synchronized (PENDING_RELEASE_EVENTS) {
+      if (LOCK_RELEASE_CONSUMERS.containsKey(lockReleaseEvent.lockId())) {
+        
LOCK_RELEASE_CONSUMERS.get(lockReleaseEvent.lockId()).accept(lockReleaseEvent);
+        LOG.info(
+            "Send release event for lock id {}, timestamp: {}",
+            lockReleaseEvent.lockId(),
+            lockReleaseEvent.timestamp());
+      } else {
+        PENDING_RELEASE_EVENTS.add(lockReleaseEvent);
+        LOG.info(
+            "No consumer for lock id {}, timestamp: {}",
+            lockReleaseEvent.lockId(),
+            lockReleaseEvent.timestamp());
+      }
+    }
+  }
+
+  @Override
+  public void start() throws Exception {
+    LOG.info("Starting coordinator: {}", operatorName);
+    this.started = true;
+  }
+
+  @Override
+  public void close() throws Exception {
+    coordinatorExecutor.shutdown();
+    this.started = false;
+    synchronized (PENDING_RELEASE_EVENTS) {
+      LOCK_RELEASE_CONSUMERS.clear();
+      PENDING_RELEASE_EVENTS.clear();
+    }
+
+    LOG.info("Closed coordinator: {}", operatorName);
+  }
+
+  @Override
+  public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> resultFuture) {
+    runInCoordinatorThread(
+        () -> resultFuture.complete(new byte[0]),
+        String.format(Locale.ROOT, "taking checkpoint %d", checkpointId));
+  }
+
+  @Override
+  public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
+    Preconditions.checkState(
+        !started, "The coordinator %s can only be reset if it was not yet 
started", operatorName);
+    LOG.info("Reset to checkpoint {}", checkpointId);
+    synchronized (PENDING_RELEASE_EVENTS) {
+      LOCK_RELEASE_CONSUMERS.clear();
+      PENDING_RELEASE_EVENTS.clear();
+    }
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) {}
+
+  @Override
+  public void subtaskReset(int subtask, long checkpointId) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info("Subtask {} is reset to checkpoint {}", subtask, 
checkpointId);
+          
Preconditions.checkState(coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.reset();
+        },
+        String.format(
+            Locale.ROOT, "handling subtask %d recovery to checkpoint %d", 
subtask, checkpointId));
+  }
+
+  @Override
+  public void executionAttemptFailed(int subtask, int attemptNumber, Throwable 
reason) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.info(
+              "Unregistering gateway after failure for subtask {} (#{}) of 
data statistics {}",
+              subtask,
+              attemptNumber,
+              operatorName);
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber);
+        },
+        String.format(Locale.ROOT, "handling subtask %d (#%d) failure", 
subtask, attemptNumber));
+  }
+
+  @Override
+  public void executionAttemptReady(int subtask, int attemptNumber, 
SubtaskGateway gateway) {
+    Preconditions.checkArgument(subtask == gateway.getSubtask());
+    Preconditions.checkArgument(attemptNumber == 
gateway.getExecution().getAttemptNumber());
+    runInCoordinatorThread(
+        () -> {
+          Preconditions.checkState(
+              
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+          subtaskGateways.registerSubtaskGateway(gateway);
+        },
+        String.format(
+            Locale.ROOT,
+            "making event gateway to subtask %d (#%d) available",
+            subtask,
+            attemptNumber));
+  }
+
+  String operatorName() {
+    return operatorName;
+  }
+
+  void runInCoordinatorThread(Runnable runnable, String actionString) {
+    ensureStarted();
+    coordinatorExecutor.execute(
+        () -> {
+          try {
+            runnable.run();
+          } catch (Throwable t) {
+            LOG.error(
+                "Uncaught exception in coordinator while {}: {}", 
actionString, t.getMessage(), t);
+            context.failJob(t);
+          }
+        });
+  }
+
+  private void ensureStarted() {
+    Preconditions.checkState(started, "The coordinator has not started yet.");
+  }
+
+  /** Inner class to manage subtask gateways. */
+  private record SubtaskGateways(String operatorName, Map<Integer, 
SubtaskGateway> gateways) {
+
+    private static SubtaskGateways create(String operatorName) {
+      return new SubtaskGateways(operatorName, Maps.newHashMap());
+    }
+
+    private void registerSubtaskGateway(SubtaskGateway gateway) {
+      int attemptNumber = gateway.getExecution().getAttemptNumber();
+      Preconditions.checkState(
+          !gateways.containsKey(attemptNumber),
+          "Coordinator of %s already has a subtask gateway for (#%d)",
+          operatorName,
+          attemptNumber);
+      LOG.debug("Coordinator of {} registers gateway for attempt {}", 
operatorName, attemptNumber);
+      gateways.put(attemptNumber, gateway);
+      LOG.debug("Registered gateway for  attempt {}", attemptNumber);
+    }
+
+    private void unregisterSubtaskGateway(int subtaskIndex, int attemptNumber) 
{
+      gateways.remove(attemptNumber);
+      LOG.debug("Unregistered gateway for subtask {} attempt {}", 
subtaskIndex, attemptNumber);
+    }
+
+    private SubtaskGateway subtaskGateway() {
+      Preconditions.checkState(
+          !gateways.isEmpty(),
+          "Coordinator of %s is not ready yet to receive events",
+          operatorName);
+      return Iterables.getOnlyElement(gateways.values());
+    }
+
+    private void reset() {
+      gateways.clear();
+    }
+  }
+
+  /** Custom thread factory for the coordinator executor. */
+  private static class CoordinatorExecutorThreadFactory
+      implements ThreadFactory, Thread.UncaughtExceptionHandler {
+
+    private final String coordinatorThreadName;
+    private final ClassLoader classLoader;
+    private final Thread.UncaughtExceptionHandler errorHandler;
+
+    private Thread thread;
+
+    private CoordinatorExecutorThreadFactory(
+        String coordinatorThreadName, ClassLoader contextClassLoader) {
+      this(coordinatorThreadName, contextClassLoader, 
FatalExitExceptionHandler.INSTANCE);
+    }
+
+    private CoordinatorExecutorThreadFactory(
+        String coordinatorThreadName,
+        ClassLoader contextClassLoader,
+        Thread.UncaughtExceptionHandler errorHandler) {
+      this.coordinatorThreadName = coordinatorThreadName;
+      this.classLoader = contextClassLoader;
+      this.errorHandler = errorHandler;
+    }
+
+    @Override
+    public synchronized Thread newThread(@Nonnull Runnable runnable) {
+      thread = new Thread(runnable, coordinatorThreadName);
+      thread.setContextClassLoader(classLoader);
+      thread.setUncaughtExceptionHandler(this);
+      return thread;
+    }
+
+    @Override
+    public synchronized void uncaughtException(Thread t, Throwable e) {
+      errorHandler.uncaughtException(t, e);
+    }
+
+    private boolean isCurrentThreadCoordinatorThread() {
+      return Thread.currentThread() == thread;
+    }
+  }
+
+  @VisibleForTesting
+  List<LockReleaseEvent> pendingReleaseEvents() {
+    return PENDING_RELEASE_EVENTS;
+  }
+
+  @VisibleForTesting
+  ExecutorService coordinatorExecutor() {
+    return coordinatorExecutor;
+  }
+}
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRegisterEvent.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRegisterEvent.java
new file mode 100644
index 0000000000..0dcd15a665
--- /dev/null
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRegisterEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+/**
+ * Event sent from TriggerManagerOperator to TriggerManagerCoordinator to 
register a lock release
+ * handler. This handler will be used to forward lock release events back to 
the operator when
+ * triggered by downstream operators.
+ */
+@Internal
+public class LockRegisterEvent implements OperatorEvent {
+
+  private final String lockId;
+
+  public LockRegisterEvent(String lockId) {
+    this.lockId = lockId;
+  }
+
+  public String lockId() {
+    return lockId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this).add("lockId", lockId).toString();
+  }
+}
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockReleaseEvent.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockReleaseEvent.java
new file mode 100644
index 0000000000..8c6e71ca97
--- /dev/null
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockReleaseEvent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+/**
+ * Event sent from LockRemoverOperator to LockRemoverCoordinator to notify 
that a lock has been
+ * released. The LockRemoverCoordinator then forwards this event to the 
TriggerManagerOperator via
+ * the registered lock release handler.
+ */
+@Internal
+public class LockReleaseEvent implements OperatorEvent {
+
+  private final String lockId;
+  private final long timestamp;
+
+  public LockReleaseEvent(String lockId, long timestamp) {
+    this.lockId = lockId;
+    this.timestamp = timestamp;
+  }
+
+  public long timestamp() {
+    return timestamp;
+  }
+
+  public String lockId() {
+    return lockId;
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("lockId", lockId)
+        .add("timestamp", timestamp)
+        .toString();
+  }
+}
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverCoordinator.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverCoordinator.java
new file mode 100644
index 0000000000..39020cd5d3
--- /dev/null
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverCoordinator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.Locale;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Coordinator for LockRemoverOperator. Handles lock release events from 
downstream operators. */
+class LockRemoverCoordinator extends BaseCoordinator {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LockRemoverCoordinator.class);
+
+  LockRemoverCoordinator(String operatorName, Context context) {
+    super(operatorName, context);
+    LOG.info("Created LockRemoverCoordinator: {}", operatorName);
+  }
+
+  @Override
+  public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName(),
+              event);
+          if (event instanceof LockReleaseEvent) {
+            handleReleaseLock((LockReleaseEvent) event);
+          } else {
+            throw new IllegalArgumentException(
+                "Invalid operator event type: " + 
event.getClass().getCanonicalName());
+          }
+        },
+        String.format(
+            Locale.ROOT,
+            "handling operator event %s from subtask %d (#%d)",
+            event.getClass(),
+            subtask,
+            attemptNumber));
+  }
+}
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperator.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperator.java
new file mode 100644
index 0000000000..51aae657e6
--- /dev/null
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.Serial;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.flink.maintenance.api.TaskResult;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class LockRemoverOperator extends AbstractStreamOperator<Void>
+    implements OneInputStreamOperator<TaskResult, Void>, OperatorEventHandler {
+
+  @Serial private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(LockRemoverOperator.class);
+
+  private final String tableName;
+  private final OperatorEventGateway operatorEventGateway;
+  private final List<String> maintenanceTaskNames;
+  private transient List<Counter> succeededTaskResultCounters;
+  private transient List<Counter> failedTaskResultCounters;
+  private transient List<AtomicLong> taskLastRunDurationMs;
+
+  LockRemoverOperator(
+      StreamOperatorParameters<Void> parameters,
+      OperatorEventGateway operatorEventGateway,
+      String tableName,
+      List<String> maintenanceTaskNames) {
+    super(parameters);
+    this.tableName = tableName;
+    this.operatorEventGateway = operatorEventGateway;
+    this.maintenanceTaskNames = maintenanceTaskNames;
+  }
+
+  @Override
+  public void open() throws Exception {
+    this.succeededTaskResultCounters =
+        Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
+    this.failedTaskResultCounters = 
Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
+    this.taskLastRunDurationMs = 
Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
+    for (int taskIndex = 0; taskIndex < maintenanceTaskNames.size(); 
++taskIndex) {
+      MetricGroup taskMetricGroup =
+          TableMaintenanceMetrics.groupFor(
+              getRuntimeContext(), tableName, 
maintenanceTaskNames.get(taskIndex), taskIndex);
+      succeededTaskResultCounters.add(
+          
taskMetricGroup.counter(TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER));
+      failedTaskResultCounters.add(
+          
taskMetricGroup.counter(TableMaintenanceMetrics.FAILED_TASK_COUNTER));
+      AtomicLong duration = new AtomicLong(0);
+      taskLastRunDurationMs.add(duration);
+      taskMetricGroup.gauge(TableMaintenanceMetrics.LAST_RUN_DURATION_MS, 
duration::get);
+    }
+  }
+
+  @Override
+  public void handleOperatorEvent(OperatorEvent event) {
+    // no incoming events
+  }
+
+  @SuppressWarnings("FutureReturnValueIgnored")
+  @Override
+  public void processElement(StreamRecord<TaskResult> streamRecord) {
+    TaskResult taskResult = streamRecord.getValue();
+    LOG.info(
+        "Processing result {} for task {}",
+        taskResult,
+        maintenanceTaskNames.get(taskResult.taskIndex()));
+    long duration = System.currentTimeMillis() - taskResult.startEpoch();
+    // Update the metrics
+    taskLastRunDurationMs.get(taskResult.taskIndex()).set(duration);
+    if (taskResult.success()) {
+      succeededTaskResultCounters.get(taskResult.taskIndex()).inc();
+    } else {
+      failedTaskResultCounters.get(taskResult.taskIndex()).inc();
+    }
+  }
+
+  @Override
+  public void processWatermark(Watermark mark) throws Exception {
+    operatorEventGateway.sendEventToCoordinator(
+        new LockReleaseEvent(tableName, mark.getTimestamp()));
+    super.processWatermark(mark);
+  }
+}
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperatorFactory.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperatorFactory.java
new file mode 100644
index 0000000000..b43c3f15fc
--- /dev/null
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperatorFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.iceberg.flink.maintenance.api.TaskResult;
+
+@Internal
+public class LockRemoverOperatorFactory extends 
AbstractStreamOperatorFactory<Void>
+    implements CoordinatedOperatorFactory<Void>, 
OneInputStreamOperatorFactory<TaskResult, Void> {
+  private final String tableName;
+  private final List<String> maintenanceTaskNames;
+
+  public LockRemoverOperatorFactory(String tableName, List<String> 
maintenanceTaskNames) {
+    this.tableName = tableName;
+    this.maintenanceTaskNames = maintenanceTaskNames;
+  }
+
+  @Override
+  public OperatorCoordinator.Provider getCoordinatorProvider(
+      String operatorName, OperatorID operatorID) {
+    return new LockRemoverCoordinatorProvider(operatorName, operatorID);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T extends StreamOperator<Void>> T createStreamOperator(
+      StreamOperatorParameters<Void> parameters) {
+    OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+    OperatorEventGateway gateway =
+        
parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
+    LockRemoverOperator lockRemoverOperator =
+        new LockRemoverOperator(parameters, gateway, tableName, 
maintenanceTaskNames);
+    parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, 
lockRemoverOperator);
+
+    return (T) lockRemoverOperator;
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader 
classLoader) {
+    return LockRemoverOperator.class;
+  }
+
+  private static class LockRemoverCoordinatorProvider
+      extends RecreateOnResetOperatorCoordinator.Provider {
+
+    private final String operatorName;
+
+    private LockRemoverCoordinatorProvider(String operatorName, OperatorID 
operatorID) {
+      super(operatorID);
+      this.operatorName = operatorName;
+    }
+
+    @Override
+    public OperatorCoordinator getCoordinator(OperatorCoordinator.Context 
context) {
+      return new LockRemoverCoordinator(operatorName, context);
+    }
+  }
+}
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
index f1f2b51c09..6d8326645d 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
@@ -226,7 +226,8 @@ public class TriggerManager extends 
KeyedProcessFunction<Boolean, TableChange, T
     }
 
     Integer taskToStart =
-        nextTrigger(evaluators, accumulatedChanges, lastTriggerTimes, current, 
startsFrom);
+        TriggerUtil.nextTrigger(
+            evaluators, accumulatedChanges, lastTriggerTimes, current, 
startsFrom);
     if (taskToStart == null) {
       // Nothing to execute
       if (!triggered) {
@@ -269,26 +270,6 @@ public class TriggerManager extends 
KeyedProcessFunction<Boolean, TableChange, T
     timerService.registerProcessingTimeTimer(time);
   }
 
-  private static Integer nextTrigger(
-      List<TriggerEvaluator> evaluators,
-      List<TableChange> changes,
-      List<Long> lastTriggerTimes,
-      long currentTime,
-      int startPos) {
-    int current = startPos;
-    do {
-      if (evaluators
-          .get(current)
-          .check(changes.get(current), lastTriggerTimes.get(current), 
currentTime)) {
-        return current;
-      }
-
-      current = (current + 1) % evaluators.size();
-    } while (current != startPos);
-
-    return null;
-  }
-
   private void init(Collector<Trigger> out, TimerService timerService) throws 
Exception {
     if (!inited) {
       long current = timerService.currentProcessingTime();
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerCoordinator.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerCoordinator.java
new file mode 100644
index 0000000000..b5af41da27
--- /dev/null
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerCoordinator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.Locale;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class TriggerManagerCoordinator extends BaseCoordinator {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TriggerManagerCoordinator.class);
+
+  TriggerManagerCoordinator(String operatorName, Context context) {
+    super(operatorName, context);
+    LOG.info("Created TriggerManagerCoordinator: {}", operatorName);
+  }
+
+  @Override
+  public void handleEventFromOperator(int subtask, int attemptNumber, 
OperatorEvent event) {
+    runInCoordinatorThread(
+        () -> {
+          LOG.debug(
+              "Handling event from subtask {} (#{}) of {}: {}",
+              subtask,
+              attemptNumber,
+              operatorName(),
+              event);
+          if (event instanceof LockRegisterEvent) {
+            registerLock((LockRegisterEvent) event);
+          } else {
+            throw new IllegalArgumentException(
+                "Invalid operator event type: " + 
event.getClass().getCanonicalName());
+          }
+        },
+        String.format(
+            Locale.ROOT,
+            "handling operator event %s from subtask %d (#%d)",
+            event.getClass(),
+            subtask,
+            attemptNumber));
+  }
+}
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperator.java
similarity index 51%
copy from 
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
copy to 
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperator.java
index f1f2b51c09..f29ac9670b 100644
--- 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperator.java
@@ -18,70 +18,63 @@
  */
 package org.apache.iceberg.flink.maintenance.operator;
 
-import java.io.IOException;
+import java.io.Serial;
 import java.util.List;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.util.Collector;
-import org.apache.iceberg.flink.TableLoader;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
-import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger} 
messages which are
- * calculated based on the incoming {@link TableChange} messages. The 
TriggerManager keeps track of
- * the changes since the last run of the Maintenance Tasks and triggers a new 
run based on the
- * result of the {@link TriggerEvaluator}.
- *
- * <p>The TriggerManager prevents overlapping Maintenance Task runs using 
{@link
- * TriggerLockFactory.Lock}. The current implementation only handles conflicts 
within a single job.
- * Users should avoid scheduling maintenance for the same table in different 
Flink jobs.
- *
- * <p>The TriggerManager should run as a global operator. {@link 
KeyedProcessFunction} is used, so
- * the timer functions are available, but the key is not used.
+ * The TriggerManagerOperator itself holds the lock and registers a callback 
method with the
+ * coordinator. When a task finishes, it sends a signal from downstream to the 
coordinator to
+ * trigger this callback, allowing the TriggerManagerOperator to release the 
lock.
  */
-@Internal
-public class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, 
Trigger>
-    implements CheckpointedFunction {
-  private static final Logger LOG = 
LoggerFactory.getLogger(TriggerManager.class);
+class TriggerManagerOperator extends AbstractStreamOperator<Trigger>
+    implements OneInputStreamOperator<TableChange, Trigger>,
+        OperatorEventHandler,
+        ProcessingTimeCallback {
 
-  private final String tableName;
-  private final TriggerLockFactory lockFactory;
+  @Serial private static final long serialVersionUID = 1L;
+  private static final Logger LOG = 
LoggerFactory.getLogger(TriggerManagerOperator.class);
+
+  private final OperatorEventGateway operatorEventGateway;
   private final List<String> maintenanceTaskNames;
   private final List<TriggerEvaluator> evaluators;
   private final long minFireDelayMs;
   private final long lockCheckDelayMs;
+  private final String tableName;
+
   private transient Counter rateLimiterTriggeredCounter;
   private transient Counter concurrentRunThrottledCounter;
   private transient Counter nothingToTriggerCounter;
   private transient List<Counter> triggerCounters;
-  private transient ValueState<Long> nextEvaluationTimeState;
+  private transient ListState<Long> nextEvaluationTimeState;
   private transient ListState<TableChange> accumulatedChangesState;
   private transient ListState<Long> lastTriggerTimesState;
   private transient Long nextEvaluationTime;
   private transient List<TableChange> accumulatedChanges;
   private transient List<Long> lastTriggerTimes;
-  private transient TriggerLockFactory.Lock lock;
-  private transient TriggerLockFactory.Lock recoveryLock;
-  private transient boolean shouldRestoreTasks = false;
-  private transient boolean inited = false;
   // To keep the task scheduling fair we keep the last triggered task position 
in memory.
   // If we find a task to trigger, then we run it, but after it is finished, 
we start from the given
   // position to prevent "starvation" of the tasks.
@@ -89,16 +82,18 @@ public class TriggerManager extends 
KeyedProcessFunction<Boolean, TableChange, T
   // be important (RewriteDataFiles first, and then RewriteManifestFiles later)
   private transient int startsFrom = 0;
   private transient boolean triggered = false;
+  private transient Long lockTime;
+  private transient boolean shouldRestoreTasks = false;
 
-  public TriggerManager(
-      TableLoader tableLoader,
-      TriggerLockFactory lockFactory,
+  TriggerManagerOperator(
+      StreamOperatorParameters<Trigger> parameters,
+      OperatorEventGateway operatorEventGateway,
       List<String> maintenanceTaskNames,
       List<TriggerEvaluator> evaluators,
       long minFireDelayMs,
-      long lockCheckDelayMs) {
-    Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
-    Preconditions.checkNotNull(lockFactory, "Lock factory should no be null");
+      long lockCheckDelayMs,
+      String tableName) {
+    super(parameters);
     Preconditions.checkArgument(
         maintenanceTaskNames != null && !maintenanceTaskNames.isEmpty(),
         "Invalid maintenance task names: null or empty");
@@ -111,17 +106,17 @@ public class TriggerManager extends 
KeyedProcessFunction<Boolean, TableChange, T
     Preconditions.checkArgument(
         lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1 
ms.");
 
-    tableLoader.open();
-    this.tableName = tableLoader.loadTable().name();
-    this.lockFactory = lockFactory;
     this.maintenanceTaskNames = maintenanceTaskNames;
     this.evaluators = evaluators;
     this.minFireDelayMs = minFireDelayMs;
     this.lockCheckDelayMs = lockCheckDelayMs;
+    this.tableName = tableName;
+    this.operatorEventGateway = operatorEventGateway;
   }
 
   @Override
-  public void open(OpenContext parameters) throws Exception {
+  public void open() throws Exception {
+    super.open();
     MetricGroup mainGroup = 
TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName);
     this.rateLimiterTriggeredCounter =
         mainGroup.counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
@@ -135,98 +130,150 @@ public class TriggerManager extends 
KeyedProcessFunction<Boolean, TableChange, T
                   mainGroup, maintenanceTaskNames.get(taskIndex), taskIndex)
               .counter(TableMaintenanceMetrics.TRIGGERED));
     }
+  }
 
+  @Override
+  public void initializeState(StateInitializationContext context) throws 
Exception {
+    super.initializeState(context);
     this.nextEvaluationTimeState =
-        getRuntimeContext()
-            .getState(new 
ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
+        context
+            .getOperatorStateStore()
+            .getListState(new 
ListStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
+
     this.accumulatedChangesState =
-        getRuntimeContext()
+        context
+            .getOperatorStateStore()
             .getListState(
                 new ListStateDescriptor<>(
                     "triggerManagerAccumulatedChange", 
TypeInformation.of(TableChange.class)));
+
     this.lastTriggerTimesState =
-        getRuntimeContext()
+        context
+            .getOperatorStateStore()
             .getListState(new 
ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG));
+
+    long current = getProcessingTimeService().getCurrentProcessingTime();
+
+    // Initialize from state
+    if (!Iterables.isEmpty(nextEvaluationTimeState.get())) {
+      nextEvaluationTime = 
Iterables.getOnlyElement(nextEvaluationTimeState.get());
+    }
+
+    this.accumulatedChanges = 
Lists.newArrayList(accumulatedChangesState.get());
+    this.lastTriggerTimes = Lists.newArrayList(lastTriggerTimesState.get());
+
+    // Initialize if the state was empty
+    if (accumulatedChanges.isEmpty()) {
+      for (int i = 0; i < evaluators.size(); ++i) {
+        accumulatedChanges.add(TableChange.empty());
+        lastTriggerTimes.add(current);
+      }
+    }
+
+    // Register this operator's lock with the coordinator
+    operatorEventGateway.sendEventToCoordinator(new 
LockRegisterEvent(tableName));
+
+    if (context.isRestored()) {
+      // When the job state is restored, there could be ongoing tasks.
+      // To prevent collisions with new triggers, a recovery lock is taken by
+      // setting the lock time here. This ensures that the tasks of the previous
+      // trigger are executed and the lock is released before new triggers fire;
+      // an already existing lock prevents collisions as well.
+      // Take the recovery lock
+      this.lockTime = current;
+      this.shouldRestoreTasks = true;
+      output.collect(new StreamRecord<>(Trigger.recovery(current), current));
+      if (nextEvaluationTime == null) {
+        schedule(getProcessingTimeService(), current + minFireDelayMs);
+      } else {
+        schedule(getProcessingTimeService(), nextEvaluationTime);
+      }
+    } else {
+      this.lockTime = null;
+    }
   }
 
   @Override
-  public void snapshotState(FunctionSnapshotContext context) throws Exception {
-    if (inited) {
-      // Only store state if initialized
-      nextEvaluationTimeState.update(nextEvaluationTime);
-      accumulatedChangesState.update(accumulatedChanges);
-      lastTriggerTimesState.update(lastTriggerTimes);
-      LOG.info(
-          "Storing state: nextEvaluationTime {}, accumulatedChanges {}, 
lastTriggerTimes {}",
-          nextEvaluationTime,
-          accumulatedChanges,
-          lastTriggerTimes);
-    } else {
-      LOG.info("Not initialized, state is not stored");
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    nextEvaluationTimeState.clear();
+    if (nextEvaluationTime != null) {
+      nextEvaluationTimeState.add(nextEvaluationTime);
     }
+
+    accumulatedChangesState.update(accumulatedChanges);
+    lastTriggerTimesState.update(lastTriggerTimes);
+    LOG.info(
+        "Storing state: nextEvaluationTime {}, accumulatedChanges {}, 
lastTriggerTimes {}",
+        nextEvaluationTime,
+        accumulatedChanges,
+        lastTriggerTimes);
   }
 
   @Override
-  public void initializeState(FunctionInitializationContext context) throws 
Exception {
-    LOG.info("Initializing state restored: {}", context.isRestored());
-    lockFactory.open();
-    this.lock = lockFactory.createLock();
-    this.recoveryLock = lockFactory.createRecoveryLock();
-    if (context.isRestored()) {
-      shouldRestoreTasks = true;
+  public void handleOperatorEvent(OperatorEvent event) {
+    if (event instanceof LockReleaseEvent) {
+      LOG.info("Received lock released event: {}", event);
+      handleLockRelease((LockReleaseEvent) event);
     } else {
-      lock.unlock();
-      recoveryLock.unlock();
+      throw new IllegalArgumentException(
+          "Invalid operator event type: " + 
event.getClass().getCanonicalName());
     }
   }
 
   @Override
-  public void processElement(TableChange change, Context ctx, 
Collector<Trigger> out)
-      throws Exception {
-    init(out, ctx.timerService());
-
+  public void processElement(StreamRecord<TableChange> streamRecord) throws 
Exception {
+    TableChange change = streamRecord.getValue();
     accumulatedChanges.forEach(tableChange -> tableChange.merge(change));
-
-    long current = ctx.timerService().currentProcessingTime();
     if (nextEvaluationTime == null) {
-      checkAndFire(current, ctx.timerService(), out);
+      checkAndFire(getProcessingTimeService());
     } else {
       LOG.info(
-          "Trigger manager rate limiter triggered current: {}, next: {}, 
accumulated changes: {}",
-          current,
+          "Trigger manager rate limiter triggered current: {}, next: {}, 
accumulated changes: {},{}",
+          getProcessingTimeService().getCurrentProcessingTime(),
           nextEvaluationTime,
-          accumulatedChanges);
+          accumulatedChanges,
+          maintenanceTaskNames);
       rateLimiterTriggeredCounter.inc();
     }
   }
 
   @Override
-  public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> 
out) throws Exception {
-    init(out, ctx.timerService());
+  public void onProcessingTime(long l) {
     this.nextEvaluationTime = null;
-    checkAndFire(ctx.timerService().currentProcessingTime(), 
ctx.timerService(), out);
+    checkAndFire(getProcessingTimeService());
   }
 
   @Override
-  public void close() throws IOException {
-    lockFactory.close();
+  public void close() throws Exception {
+    super.close();
+    this.lockTime = null;
+  }
+
+  @VisibleForTesting
+  void handleLockRelease(LockReleaseEvent event) {
+    Preconditions.checkArgument(lockTime != null, "Lock time is null, can't release lock");
+
+    if (event.timestamp() >= lockTime) {
+      this.lockTime = null;
+      this.shouldRestoreTasks = false;
+    }
   }
 
-  private void checkAndFire(long current, TimerService timerService, 
Collector<Trigger> out) {
+  private void checkAndFire(ProcessingTimeService timerService) {
+    long current = timerService.getCurrentProcessingTime();
     if (shouldRestoreTasks) {
-      if (recoveryLock.isHeld()) {
+      if (lockTime != null) {
         // Recovered tasks in progress. Skip trigger check
-        LOG.debug("The recovery lock is still held at {}", current);
+        LOG.info("The recovery lock is still held at {}", current);
         schedule(timerService, current + lockCheckDelayMs);
         return;
-      } else {
-        LOG.info("The recovery is finished at {}", current);
-        shouldRestoreTasks = false;
       }
     }
 
     Integer taskToStart =
-        nextTrigger(evaluators, accumulatedChanges, lastTriggerTimes, current, 
startsFrom);
+        TriggerUtil.nextTrigger(
+            evaluators, accumulatedChanges, lastTriggerTimes, current, 
startsFrom);
     if (taskToStart == null) {
       // Nothing to execute
       if (!triggered) {
@@ -242,9 +289,10 @@ public class TriggerManager extends 
KeyedProcessFunction<Boolean, TableChange, T
       return;
     }
 
-    if (lock.tryLock()) {
+    if (lockTime == null) {
+      this.lockTime = current;
       TableChange change = accumulatedChanges.get(taskToStart);
-      out.collect(Trigger.create(current, taskToStart));
+      output.collect(new StreamRecord<>(Trigger.create(current, taskToStart), 
current));
       LOG.debug("Fired event with time: {}, collected: {} for {}", current, 
change, tableName);
       triggerCounters.get(taskToStart).inc();
       accumulatedChanges.set(taskToStart, TableChange.empty());
@@ -260,68 +308,15 @@ public class TriggerManager extends 
KeyedProcessFunction<Boolean, TableChange, T
       concurrentRunThrottledCounter.inc();
       schedule(timerService, current + lockCheckDelayMs);
     }
-
-    timerService.registerProcessingTimeTimer(nextEvaluationTime);
   }
 
-  private void schedule(TimerService timerService, long time) {
+  private void schedule(ProcessingTimeService timerService, long time) {
     this.nextEvaluationTime = time;
-    timerService.registerProcessingTimeTimer(time);
+    timerService.registerTimer(time, this);
   }
 
-  private static Integer nextTrigger(
-      List<TriggerEvaluator> evaluators,
-      List<TableChange> changes,
-      List<Long> lastTriggerTimes,
-      long currentTime,
-      int startPos) {
-    int current = startPos;
-    do {
-      if (evaluators
-          .get(current)
-          .check(changes.get(current), lastTriggerTimes.get(current), 
currentTime)) {
-        return current;
-      }
-
-      current = (current + 1) % evaluators.size();
-    } while (current != startPos);
-
-    return null;
-  }
-
-  private void init(Collector<Trigger> out, TimerService timerService) throws 
Exception {
-    if (!inited) {
-      long current = timerService.currentProcessingTime();
-
-      // Initialize from state
-      this.nextEvaluationTime = nextEvaluationTimeState.value();
-      this.accumulatedChanges = 
Lists.newArrayList(accumulatedChangesState.get());
-      this.lastTriggerTimes = Lists.newArrayList(lastTriggerTimesState.get());
-
-      // Initialize if the state was empty
-      if (accumulatedChanges.isEmpty()) {
-        for (int i = 0; i < evaluators.size(); ++i) {
-          accumulatedChanges.add(TableChange.empty());
-          lastTriggerTimes.add(current);
-        }
-      }
-
-      if (shouldRestoreTasks) {
-        // When the job state is restored, there could be ongoing tasks.
-        // To prevent collision with the new triggers the following is done:
-        //  - add a recovery lock
-        //  - fire a recovery trigger
-        // This ensures that the tasks of the previous trigger are executed, 
and the lock is removed
-        // in the end. The result of the 'tryLock' is ignored as an already 
existing lock prevents
-        // collisions as well.
-        recoveryLock.tryLock();
-        out.collect(Trigger.recovery(current));
-        if (nextEvaluationTime == null) {
-          schedule(timerService, current + minFireDelayMs);
-        }
-      }
-
-      inited = true;
-    }
+  @VisibleForTesting
+  Long lockTime() {
+    return lockTime;
   }
 }
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperatorFactory.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperatorFactory.java
new file mode 100644
index 0000000000..bace5e3afe
--- /dev/null
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperatorFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+
+/**
+ * Factory that wires a {@link TriggerManagerOperator} together with its {@link
+ * TriggerManagerCoordinator} so the trigger scheduling can use an operator coordinator running on
+ * the JobManager.
+ */
+@Internal
+public class TriggerManagerOperatorFactory extends 
AbstractStreamOperatorFactory<Trigger>
+    implements CoordinatedOperatorFactory<Trigger>,
+        OneInputStreamOperatorFactory<TableChange, Trigger> {
+
+  private final String lockId;
+  private final List<String> maintenanceTaskNames;
+  private final List<TriggerEvaluator> evaluators;
+  private final long minFireDelayMs;
+  private final long lockCheckDelayMs;
+
+  /**
+   * @param lockId identifier of the lock used by the maintenance flow
+   * @param maintenanceTaskNames names of the maintenance tasks, parallel to {@code evaluators}
+   * @param evaluators per-task trigger evaluators, parallel to {@code maintenanceTaskNames}
+   * @param minFireDelayMs minimum delay between two fired triggers (rate limit)
+   * @param lockCheckDelayMs delay between lock availability checks
+   */
+  public TriggerManagerOperatorFactory(
+      String lockId,
+      List<String> maintenanceTaskNames,
+      List<TriggerEvaluator> evaluators,
+      long minFireDelayMs,
+      long lockCheckDelayMs) {
+    this.lockId = lockId;
+    this.maintenanceTaskNames = maintenanceTaskNames;
+    this.evaluators = evaluators;
+    this.minFireDelayMs = minFireDelayMs;
+    this.lockCheckDelayMs = lockCheckDelayMs;
+  }
+
+  // Supplies the JobManager-side coordinator paired with this operator.
+  @Override
+  public OperatorCoordinator.Provider getCoordinatorProvider(
+      String operatorName, OperatorID operatorID) {
+    return new TriggerManagerCoordinatorProvider(operatorName, operatorID);
+  }
+
+  /**
+   * Creates the {@link TriggerManagerOperator} and registers it as the handler for events sent by
+   * the coordinator through the operator event dispatcher.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T extends StreamOperator<Trigger>> T createStreamOperator(
+      StreamOperatorParameters<Trigger> parameters) {
+    OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+    // Gateway used by the operator to send events back to its coordinator.
+    OperatorEventGateway gateway =
+        
parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
+
+    TriggerManagerOperator triggerManagerOperator =
+        new TriggerManagerOperator(
+            parameters,
+            gateway,
+            maintenanceTaskNames,
+            evaluators,
+            minFireDelayMs,
+            lockCheckDelayMs,
+            lockId);
+
+    // Route coordinator -> operator events to the newly created operator instance.
+    parameters
+        .getOperatorEventDispatcher()
+        .registerEventHandler(operatorId, triggerManagerOperator);
+
+    return (T) triggerManagerOperator;
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader 
classLoader) {
+    return TriggerManagerOperator.class;
+  }
+
+  // Provider based on RecreateOnResetOperatorCoordinator, so the coordinator instance is
+  // recreated when the coordinator is reset.
+  private static class TriggerManagerCoordinatorProvider
+      extends RecreateOnResetOperatorCoordinator.Provider {
+
+    private final String operatorName;
+
+    private TriggerManagerCoordinatorProvider(String operatorName, OperatorID 
operatorID) {
+      super(operatorID);
+      this.operatorName = operatorName;
+    }
+
+    @Override
+    public OperatorCoordinator getCoordinator(OperatorCoordinator.Context 
context) {
+      return new TriggerManagerCoordinator(operatorName, context);
+    }
+  }
+}
diff --git 
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerUtil.java
 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerUtil.java
new file mode 100644
index 0000000000..634e9a0d03
--- /dev/null
+++ 
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+
+/** Shared trigger-selection logic used by the trigger manager implementations. */
+class TriggerUtil {
+
+  private TriggerUtil() {}
+
+  /**
+   * Finds the next task whose evaluator is ready to fire, scanning round-robin starting from
+   * {@code startPos} so that no single task can starve the others.
+   *
+   * @param evaluators per-task trigger evaluators (assumed non-empty — TODO confirm callers)
+   * @param changes accumulated table changes, parallel to {@code evaluators}
+   * @param lastTriggerTimes last trigger timestamp per task, parallel to {@code evaluators}
+   * @param currentTime current processing time
+   * @param startPos index to start the scan from
+   * @return index of the first task ready to fire, or {@code null} if no task is ready
+   */
+  static Integer nextTrigger(
+      List<TriggerEvaluator> evaluators,
+      List<TableChange> changes,
+      List<Long> lastTriggerTimes,
+      long currentTime,
+      int startPos) {
+    int current = startPos;
+    // Full circular scan: stop after wrapping back to the starting position.
+    do {
+      if (evaluators
+          .get(current)
+          .check(changes.get(current), lastTriggerTimes.get(current), 
currentTime)) {
+        return current;
+      }
+
+      current = (current + 1) % evaluators.size();
+    } while (current != startPos);
+
+    return null;
+  }
+}
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java
index 0a860fec47..fe8457167a 100644
--- 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java
@@ -77,4 +77,43 @@ class TestMaintenanceE2E extends OperatorTestBase {
       closeJobClient(jobClient);
     }
   }
+
+  // End-to-end smoke test for the coordinator-lock based maintenance flow: builds the full
+  // pipeline and only verifies that the job can be instantiated and started.
+  // NOTE(review): nothing in this builder chain explicitly selects the coordinator lock —
+  // presumably the coordinator lock is used when no other lock is configured; confirm that the
+  // test name matches the actual configuration.
+  @Test
+  void testE2eUseCoordinator() throws Exception {
+    TableMaintenance.forTable(env, tableLoader())
+        .uidSuffix("E2eTestUID")
+        .rateLimit(Duration.ofMinutes(10))
+        .lockCheckDelay(Duration.ofSeconds(10))
+        .add(
+            ExpireSnapshots.builder()
+                .scheduleOnCommitCount(10)
+                .maxSnapshotAge(Duration.ofMinutes(10))
+                .retainLast(5)
+                .deleteBatchSize(5)
+                .parallelism(8))
+        .add(
+            RewriteDataFiles.builder()
+                .scheduleOnDataFileCount(10)
+                .partialProgressEnabled(true)
+                .partialProgressMaxCommits(10)
+                .maxRewriteBytes(1000L)
+                .targetFileSizeBytes(1000L)
+                .minFileSizeBytes(1000L)
+                .maxFileSizeBytes(1000L)
+                .minInputFiles(10)
+                .deleteFileThreshold(10)
+                .rewriteAll(false)
+                .maxFileGroupSizeBytes(1000L))
+        .append();
+
+    JobClient jobClient = null;
+    try {
+      jobClient = env.executeAsync();
+
+      // Just make sure that we are able to instantiate the flow
+      assertThat(jobClient).isNotNull();
+    } finally {
+      closeJobClient(jobClient);
+    }
+  }
 }
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
new file mode 100644
index 0000000000..eb5479045b
--- /dev/null
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import static org.apache.iceberg.flink.SimpleDataUtil.createRowData;
+import static 
org.apache.iceberg.flink.maintenance.api.TableMaintenance.SOURCE_OPERATOR_NAME_PREFIX;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.operator.ManualSource;
+import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase;
+import org.apache.iceberg.flink.maintenance.operator.TableChange;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Tests for {@link TableMaintenance} when it is built with the coordination-based lock: job
+ * wiring, change-stream and table-monitor scheduling, and uid/slot-sharing-group propagation.
+ */
+class TestTableMaintenanceCoordinationLock extends OperatorTestBase {
+  private static final String MAINTENANCE_TASK_NAME = "TestTableMaintenance";
+  private static final String[] TASKS =
+      new String[] {MAINTENANCE_TASK_NAME + " [0]", MAINTENANCE_TASK_NAME + " 
[1]"};
+  private static final TableChange DUMMY_CHANGE = 
TableChange.builder().commitCount(1).build();
+  // Collects the triggers seen by the dummy task; shared across the Flink job and the test thread.
+  private static final List<Trigger> PROCESSED =
+      Collections.synchronizedList(Lists.newArrayListWithCapacity(1));
+
+  private StreamExecutionEnvironment env;
+  private Table table;
+
+  @TempDir private File checkpointDir;
+
+  @BeforeEach
+  void beforeEach() throws IOException {
+    Configuration config = new Configuration();
+    config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+    config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + 
checkpointDir.getPath());
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+    this.table = createTable();
+    insert(table, 1, "a");
+
+    // Reset static test state so tests in this class are independent of each other.
+    PROCESSED.clear();
+    MaintenanceTaskBuilderForTest.counter = 0;
+  }
+
+  @Test
+  void testForChangeStream() throws Exception {
+    ManualSource<TableChange> schedulerSource =
+        new ManualSource<>(env, TypeInformation.of(TableChange.class));
+
+    TableMaintenance.Builder streamBuilder =
+        TableMaintenance.forChangeStream(schedulerSource.dataStream(), 
tableLoader())
+            .rateLimit(Duration.ofMillis(2))
+            .lockCheckDelay(Duration.ofSeconds(3))
+            .add(
+                new MaintenanceTaskBuilderForTest(true)
+                    .scheduleOnCommitCount(1)
+                    .scheduleOnDataFileCount(2)
+                    .scheduleOnDataFileSize(3L)
+                    .scheduleOnEqDeleteFileCount(4)
+                    .scheduleOnEqDeleteRecordCount(5L)
+                    .scheduleOnPosDeleteFileCount(6)
+                    .scheduleOnPosDeleteRecordCount(7L)
+                    .scheduleOnInterval(Duration.ofHours(1)));
+
+    sendEvents(schedulerSource, streamBuilder, 
ImmutableList.of(Tuple2.of(DUMMY_CHANGE, 1)));
+  }
+
+  @Test
+  void testForTable() throws Exception {
+    TableLoader tableLoader = tableLoader();
+
+    env.enableCheckpointing(10);
+
+    TableMaintenance.forTable(env, tableLoader)
+        .rateLimit(Duration.ofMillis(2))
+        .maxReadBack(2)
+        .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(2))
+        .append();
+
+    // Creating a stream for inserting data into the table concurrently
+    ManualSource<RowData> insertSource =
+        new ManualSource<>(env, 
InternalTypeInfo.of(FlinkSchemaUtil.convert(table.schema())));
+    FlinkSink.forRowData(insertSource.dataStream())
+        .tableLoader(tableLoader)
+        .uidPrefix(UID_SUFFIX + "-iceberg-sink")
+        .append();
+
+    JobClient jobClient = null;
+    try {
+      jobClient = env.executeAsync();
+
+      insertSource.sendRecord(createRowData(2, "b"));
+
+      // The commit produced by the insert should eventually fire exactly one trigger.
+      Awaitility.await().until(() -> PROCESSED.size() == 1);
+    } finally {
+      closeJobClient(jobClient);
+    }
+  }
+
+  @Test
+  void testUidAndSlotSharingGroup() throws IOException {
+    TableMaintenance.forChangeStream(
+            new ManualSource<>(env, 
TypeInformation.of(TableChange.class)).dataStream(),
+            tableLoader())
+        .uidSuffix(UID_SUFFIX)
+        .slotSharingGroup(SLOT_SHARING_GROUP)
+        .add(
+            new MaintenanceTaskBuilderForTest(true)
+                .scheduleOnCommitCount(1)
+                .uidSuffix(UID_SUFFIX)
+                .slotSharingGroup(SLOT_SHARING_GROUP))
+        .append();
+
+    checkUidsAreSet(env, UID_SUFFIX);
+    checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP);
+  }
+
+  @Test
+  void testUidAndSlotSharingGroupUnset() throws IOException {
+    TableMaintenance.forChangeStream(
+            new ManualSource<>(env, 
TypeInformation.of(TableChange.class)).dataStream(),
+            tableLoader())
+        .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1))
+        .append();
+
+    checkUidsAreSet(env, null);
+    checkSlotSharingGroupsAreSet(env, null);
+  }
+
+  @Test
+  void testUidAndSlotSharingGroupInherit() throws IOException {
+    TableMaintenance.forChangeStream(
+            new ManualSource<>(env, 
TypeInformation.of(TableChange.class)).dataStream(),
+            tableLoader())
+        .uidSuffix(UID_SUFFIX)
+        .slotSharingGroup(SLOT_SHARING_GROUP)
+        .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1))
+        .append();
+
+    checkUidsAreSet(env, UID_SUFFIX);
+    checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP);
+  }
+
+  @Test
+  void testUidAndSlotSharingGroupOverWrite() throws IOException {
+    String anotherUid = "Another-UID";
+    String anotherSlotSharingGroup = "Another-SlotSharingGroup";
+    TableMaintenance.forChangeStream(
+            new ManualSource<>(env, 
TypeInformation.of(TableChange.class)).dataStream(),
+            tableLoader())
+        .uidSuffix(UID_SUFFIX)
+        .slotSharingGroup(SLOT_SHARING_GROUP)
+        .add(
+            new MaintenanceTaskBuilderForTest(true)
+                .scheduleOnCommitCount(1)
+                .uidSuffix(anotherUid)
+                .slotSharingGroup(anotherSlotSharingGroup))
+        .append();
+
+    // Choose an operator from the scheduler part of the graph
+    Transformation<?> schedulerTransformation =
+        env.getTransformations().stream()
+            .filter(t -> t.getName().equals("Trigger manager"))
+            .findFirst()
+            .orElseThrow();
+    assertThat(schedulerTransformation.getUid()).contains(UID_SUFFIX);
+    assertThat(schedulerTransformation.getSlotSharingGroup()).isPresent();
+    assertThat(schedulerTransformation.getSlotSharingGroup().get().getName())
+        .isEqualTo(SLOT_SHARING_GROUP);
+
+    // Choose an operator from the maintenance task part of the graph
+    Transformation<?> scheduledTransformation =
+        env.getTransformations().stream()
+            .filter(t -> t.getName().startsWith(MAINTENANCE_TASK_NAME))
+            .findFirst()
+            .orElseThrow();
+    assertThat(scheduledTransformation.getUid()).contains(anotherUid);
+    assertThat(scheduledTransformation.getSlotSharingGroup()).isPresent();
+    assertThat(scheduledTransformation.getSlotSharingGroup().get().getName())
+        .isEqualTo(anotherSlotSharingGroup);
+  }
+
+  @Test
+  void testUidAndSlotSharingGroupForMonitorSource() throws IOException {
+    TableMaintenance.forTable(env, tableLoader())
+        .uidSuffix(UID_SUFFIX)
+        .slotSharingGroup(SLOT_SHARING_GROUP)
+        .add(
+            new MaintenanceTaskBuilderForTest(true)
+                .scheduleOnCommitCount(1)
+                .uidSuffix(UID_SUFFIX)
+                .slotSharingGroup(SLOT_SHARING_GROUP))
+        .append();
+
+    Transformation<?> source = monitorSource();
+    assertThat(source).isNotNull();
+    assertThat(source.getUid()).contains(UID_SUFFIX);
+    assertThat(source.getSlotSharingGroup()).isPresent();
+    
assertThat(source.getSlotSharingGroup().get().getName()).isEqualTo(SLOT_SHARING_GROUP);
+
+    checkUidsAreSet(env, UID_SUFFIX);
+    checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP);
+  }
+
+  /**
+   * Sends the events through the {@link ManualSource} provided, and waits 
until the given number of
+   * records are processed.
+   *
+   * @param schedulerSource used for sending the events
+   * @param streamBuilder used for generating the job
+   * @param eventsAndResultNumbers the pair of the event and the expected 
processed records
+   * @throws Exception if any
+   */
+  private void sendEvents(
+      ManualSource<TableChange> schedulerSource,
+      TableMaintenance.Builder streamBuilder,
+      List<Tuple2<TableChange, Integer>> eventsAndResultNumbers)
+      throws Exception {
+    streamBuilder.append();
+
+    JobClient jobClient = null;
+    try {
+      jobClient = env.executeAsync();
+
+      eventsAndResultNumbers.forEach(
+          eventsAndResultNumber -> {
+            int expectedSize = PROCESSED.size() + eventsAndResultNumber.f1;
+            schedulerSource.sendRecord(eventsAndResultNumber.f0);
+            Awaitility.await().until(() -> PROCESSED.size() == expectedSize);
+          });
+    } finally {
+      closeJobClient(jobClient);
+    }
+  }
+
+  /**
+   * Finds the {@link 
org.apache.iceberg.flink.maintenance.operator.MonitorSource} for testing
+   * purposes by parsing the transformation tree.
+   *
+   * @return The monitor source if we found it
+   */
+  private Transformation<?> monitorSource() {
+    assertThat(env.getTransformations()).isNotEmpty();
+    assertThat(env.getTransformations().get(0).getInputs()).isNotEmpty();
+    
assertThat(env.getTransformations().get(0).getInputs().get(0).getInputs()).isNotEmpty();
+
+    Transformation<?> result =
+        env.getTransformations().get(0).getInputs().get(0).getInputs().get(0);
+
+    // Some checks to make sure this is the transformation we are looking for
+    assertThat(result).isInstanceOf(SourceTransformation.class);
+    assertThat(result.getName()).startsWith(SOURCE_OPERATOR_NAME_PREFIX);
+
+    return result;
+  }
+
+  // Test builder that appends a dummy map-based maintenance task; each instance gets a sequential
+  // id used to pick a unique task name from TASKS.
+  private static class MaintenanceTaskBuilderForTest
+      extends MaintenanceTaskBuilder<MaintenanceTaskBuilderForTest> {
+    private final boolean success;
+    private final int id;
+    private static int counter = 0;
+
+    MaintenanceTaskBuilderForTest(boolean success) {
+      this.success = success;
+      this.id = counter;
+      ++counter;
+    }
+
+    @Override
+    String maintenanceTaskName() {
+      return MAINTENANCE_TASK_NAME;
+    }
+
+    @Override
+    DataStream<TaskResult> append(DataStream<Trigger> trigger) {
+      String name = TASKS[id];
+      return trigger
+          .map(new DummyMaintenanceTask(success))
+          .name(name)
+          .uid(uidSuffix() + "-test-mapper-" + name + "-" + id)
+          .slotSharingGroup(slotSharingGroup())
+          .forceNonParallel();
+    }
+  }
+
+  // Records every trigger into PROCESSED and reports success/failure based on the flag.
+  private record DummyMaintenanceTask(boolean success)
+      implements MapFunction<Trigger, TaskResult>, 
ResultTypeQueryable<TaskResult>, Serializable {
+
+    @Override
+    public TaskResult map(Trigger trigger) {
+      PROCESSED.add(trigger);
+
+      return new TaskResult(
+          trigger.taskId(),
+          trigger.timestamp(),
+          success,
+          success ? Collections.emptyList() : Lists.newArrayList(new 
Exception("Testing error")));
+    }
+
+    @Override
+    public TypeInformation<TaskResult> getProducedType() {
+      return TypeInformation.of(TaskResult.class);
+    }
+  }
+}
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CoordinatorTestBase.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CoordinatorTestBase.java
new file mode 100644
index 0000000000..5bfc889f27
--- /dev/null
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CoordinatorTestBase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
+import org.junit.jupiter.api.Timeout;
+
+/** Shared fixtures and helpers for the maintenance coordinator tests. */
+@Timeout(value = 10)
+class CoordinatorTestBase extends OperatorTestBase {
+  protected static final String OPERATOR_NAME = "TestCoordinator";
+  protected static final String OPERATOR_NAME_1 = "TestCoordinator_1";
+  protected static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L, 
5678L);
+  protected static final OperatorID TEST_OPERATOR_ID_1 = new OperatorID(1235L, 
5679L);
+  protected static final int NUM_SUBTASKS = 1;
+  protected static final LockRegisterEvent LOCK_REGISTER_EVENT =
+      new LockRegisterEvent(DUMMY_TABLE_NAME);
+  protected static final LockReleaseEvent LOCK_RELEASE_EVENT =
+      new LockReleaseEvent(DUMMY_TABLE_NAME, 1L);
+
+  // Marks every subtask (attempt 0) as ready on the coordinator, wiring a test gateway for each.
+  protected static void setAllTasksReady(
+      BaseCoordinator baseCoordinator, EventReceivingTasks receivingTasks) {
+    for (int i = 0; i < NUM_SUBTASKS; i++) {
+      baseCoordinator.executionAttemptReady(i, 0, 
receivingTasks.createGatewayForSubtask(i, 0));
+    }
+  }
+}
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoveCoordinator.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoveCoordinator.java
new file mode 100644
index 0000000000..3427d2abe0
--- /dev/null
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoveCoordinator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+// NOTE(review): @Timeout(10) is already declared on CoordinatorTestBase; this repetition is
+// redundant (same value) and could be dropped.
+@Timeout(value = 10)
+@Execution(ExecutionMode.SAME_THREAD)
+class TestLockRemoveCoordinator extends CoordinatorTestBase {
+
+  private EventReceivingTasks receivingTasks;
+
+  @BeforeEach
+  void before() {
+    this.receivingTasks = EventReceivingTasks.createForRunningTasks();
+  }
+
+  // A release event handled while tasks are ready should be queued as pending.
+  @Test
+  void testEventHandling() throws Exception {
+    try (LockRemoverCoordinator lockRemoverCoordinator = createCoordinator()) {
+
+      lockRemoverCoordinator.start();
+
+      setAllTasksReady(lockRemoverCoordinator, receivingTasks);
+
+      lockRemoverCoordinator.handleReleaseLock(LOCK_RELEASE_EVENT);
+      assertThat(lockRemoverCoordinator.pendingReleaseEvents()).hasSize(1);
+    }
+  }
+
+  // Builds a coordinator whose async actions run synchronously, so the test can assert state
+  // immediately after handling an event.
+  private LockRemoverCoordinator createCoordinator() {
+    return new LockRemoverCoordinator(
+        OPERATOR_NAME, new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, 
1)) {
+      @Override
+      void runInCoordinatorThread(Runnable runnable, String actionString) {
+        try {
+          coordinatorExecutor().submit(runnable).get();
+        } catch (InterruptedException | ExecutionException e) {
+          // NOTE(review): InterruptedException is wrapped without restoring the interrupt flag;
+          // consider Thread.currentThread().interrupt() before rethrowing.
+          throw new RuntimeException(actionString, e);
+        }
+      }
+    };
+  }
+}
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoverOperation.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoverOperation.java
new file mode 100644
index 0000000000..9ba95cab30
--- /dev/null
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoverOperation.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.FAILED_TASK_COUNTER;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.LAST_RUN_DURATION_MS;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.iceberg.flink.maintenance.api.TaskResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+@Timeout(value = 10)
+class TestLockRemoverOperation extends OperatorTestBase {
+  private static final String[] TASKS = new String[] {"task0", "task1", 
"task2"};
+  private static final String OPERATOR_NAME = "TestCoordinator";
+  private static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L, 
5678L);
+
+  private LockRemoverCoordinator lockRemoverCoordinator;
+
+  @BeforeEach
+  void before() {
+    MetricsReporterFactoryForTests.reset();
+    this.lockRemoverCoordinator = createCoordinator();
+    try {
+      lockRemoverCoordinator.start();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @AfterEach
+  void after() throws IOException {
+    super.after();
+    try {
+      lockRemoverCoordinator.close();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  void testProcess() throws Exception {
+    MockOperatorEventGateway mockGateway = new MockOperatorEventGateway();
+    LockRemoverOperator operator =
+        new LockRemoverOperator(null, mockGateway, DUMMY_TASK_NAME, 
Lists.newArrayList(TASKS[0]));
+    try (OneInputStreamOperatorTestHarness<TaskResult, Void> testHarness =
+        createHarness(operator)) {
+
+      testHarness.processElement(
+          new StreamRecord<>(new TaskResult(0, 0L, true, 
Lists.newArrayList())));
+      assertThat(mockGateway.getEventsSent()).hasSize(0);
+
+      testHarness.processWatermark(WATERMARK);
+      assertThat(mockGateway.getEventsSent()).hasSize(1);
+    }
+  }
+
+  @Test
+  void testMetrics() throws Exception {
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    ManualSource<TaskResult> source = new ManualSource<>(env, 
TypeInformation.of(TaskResult.class));
+    source
+        .dataStream()
+        .transform(
+            DUMMY_TASK_NAME,
+            TypeInformation.of(Void.class),
+            new LockRemoverOperatorFactory(DUMMY_TABLE_NAME, 
Lists.newArrayList(TASKS)))
+        .forceNonParallel();
+
+    JobClient jobClient = null;
+    long time = System.currentTimeMillis();
+    try {
+      jobClient = env.executeAsync();
+      // Start the 2 successful and one failed result trigger for task1, and 3 
successful for task2
+      processAndCheck(source, new TaskResult(0, time, true, 
Lists.newArrayList()));
+      processAndCheck(source, new TaskResult(1, 0L, true, 
Lists.newArrayList()));
+      processAndCheck(source, new TaskResult(1, 0L, true, 
Lists.newArrayList()));
+      processAndCheck(source, new TaskResult(0, time, false, 
Lists.newArrayList()));
+      processAndCheck(source, new TaskResult(0, time, true, 
Lists.newArrayList()));
+      processAndCheck(source, new TaskResult(1, 0L, true, 
Lists.newArrayList()));
+
+      Awaitility.await()
+          .until(
+              () ->
+                  MetricsReporterFactoryForTests.counter(
+                          ImmutableList.of(
+                              DUMMY_TASK_NAME,
+                              DUMMY_TABLE_NAME,
+                              TASKS[1],
+                              "1",
+                              SUCCEEDED_TASK_COUNTER))
+                      .equals(3L));
+
+      // Final check all the counters
+      MetricsReporterFactoryForTests.assertCounters(
+          new ImmutableMap.Builder<List<String>, Long>()
+              .put(
+                  ImmutableList.of(
+                      DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0", 
SUCCEEDED_TASK_COUNTER),
+                  2L)
+              .put(
+                  ImmutableList.of(
+                      DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0", 
FAILED_TASK_COUNTER),
+                  1L)
+              .put(
+                  ImmutableList.of(
+                      DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1", 
SUCCEEDED_TASK_COUNTER),
+                  3L)
+              .put(
+                  ImmutableList.of(
+                      DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1", 
FAILED_TASK_COUNTER),
+                  0L)
+              .put(
+                  ImmutableList.of(
+                      DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2", 
SUCCEEDED_TASK_COUNTER),
+                  0L)
+              .put(
+                  ImmutableList.of(
+                      DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2", 
FAILED_TASK_COUNTER),
+                  0L)
+              .build());
+
+      assertThat(
+              MetricsReporterFactoryForTests.gauge(
+                  ImmutableList.of(
+                      DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0", 
LAST_RUN_DURATION_MS)))
+          .isPositive();
+      assertThat(
+              MetricsReporterFactoryForTests.gauge(
+                  ImmutableList.of(
+                      DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1", 
LAST_RUN_DURATION_MS)))
+          .isGreaterThan(time);
+      assertThat(
+              MetricsReporterFactoryForTests.gauge(
+                  ImmutableList.of(
+                      DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2", 
LAST_RUN_DURATION_MS)))
+          .isZero();
+    } finally {
+      closeJobClient(jobClient);
+    }
+  }
+
+  private void processAndCheck(ManualSource<TaskResult> source, TaskResult 
input) {
+    List<String> counterKey =
+        ImmutableList.of(
+            DUMMY_TASK_NAME,
+            DUMMY_TABLE_NAME,
+            TASKS[input.taskIndex()],
+            String.valueOf(input.taskIndex()),
+            input.success() ? SUCCEEDED_TASK_COUNTER : FAILED_TASK_COUNTER);
+    Long counterValue = MetricsReporterFactoryForTests.counter(counterKey);
+    Long expected = counterValue != null ? counterValue + 1 : 1L;
+
+    source.sendRecord(input);
+    source.sendWatermark(input.startEpoch());
+
+    Awaitility.await()
+        .until(() -> 
expected.equals(MetricsReporterFactoryForTests.counter(counterKey)));
+  }
+
+  private OneInputStreamOperatorTestHarness<TaskResult, Void> createHarness(
+      LockRemoverOperator lockRemoverOperator) throws Exception {
+    OneInputStreamOperatorTestHarness<TaskResult, Void> harness =
+        new OneInputStreamOperatorTestHarness<>(lockRemoverOperator);
+    harness.open();
+    return harness;
+  }
+
+  private static LockRemoverCoordinator createCoordinator() {
+    return new LockRemoverCoordinator(
+        OPERATOR_NAME, new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, 
1));
+  }
+}
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerCoordinator.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerCoordinator.java
new file mode 100644
index 0000000000..5fc6695f3e
--- /dev/null
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerCoordinator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+@Timeout(value = 10)
+@Execution(ExecutionMode.SAME_THREAD)
+class TestTriggerManagerCoordinator extends CoordinatorTestBase {
+
+  private EventReceivingTasks receivingTasks;
+  private EventReceivingTasks receivingTasks1;
+
+  @BeforeEach
+  void before() {
+    this.receivingTasks = EventReceivingTasks.createForRunningTasks();
+    this.receivingTasks1 = EventReceivingTasks.createForRunningTasks();
+  }
+
+  @Test
+  void testEventHandling() throws Exception {
+    try (TriggerManagerCoordinator triggerManagerCoordinator =
+            createCoordinator(OPERATOR_NAME, TEST_OPERATOR_ID);
+        TriggerManagerCoordinator triggerManagerCoordinator1 =
+            createCoordinator(OPERATOR_NAME_1, TEST_OPERATOR_ID_1)) {
+
+      triggerManagerCoordinator.start();
+      triggerManagerCoordinator1.start();
+
+      setAllTasksReady(triggerManagerCoordinator, receivingTasks);
+      setAllTasksReady(triggerManagerCoordinator1, receivingTasks1);
+
+      triggerManagerCoordinator.handleEventFromOperator(0, 0, 
LOCK_REGISTER_EVENT);
+      
assertThat(receivingTasks.getSentEventsForSubtask(0).size()).isEqualTo(0);
+
+      // release lock from coordinator1 and get one event from coordinator
+      triggerManagerCoordinator1.handleReleaseLock(LOCK_RELEASE_EVENT);
+      
assertThat(receivingTasks.getSentEventsForSubtask(0).size()).isEqualTo(1);
+      
assertThat(receivingTasks1.getSentEventsForSubtask(0).size()).isEqualTo(0);
+    }
+  }
+
+  @Test
+  void testEventArriveBeforeRegister() throws Exception {
+    try (TriggerManagerCoordinator triggerManagerCoordinator =
+        createCoordinator(OPERATOR_NAME, TEST_OPERATOR_ID)) {
+
+      triggerManagerCoordinator.start();
+      setAllTasksReady(triggerManagerCoordinator, receivingTasks);
+
+      // release event arrive before register
+      triggerManagerCoordinator.handleReleaseLock(LOCK_RELEASE_EVENT);
+      assertThat(triggerManagerCoordinator.pendingReleaseEvents()).hasSize(1);
+
+      triggerManagerCoordinator.handleEventFromOperator(0, 0, 
LOCK_REGISTER_EVENT);
+      
assertThat(receivingTasks.getSentEventsForSubtask(0).size()).isEqualTo(1);
+
+      assertThat(triggerManagerCoordinator.pendingReleaseEvents()).hasSize(0);
+    }
+  }
+
+  private static TriggerManagerCoordinator createCoordinator(
+      String operatorName, OperatorID operatorID) {
+    return new TriggerManagerCoordinator(
+        operatorName, new MockOperatorCoordinatorContext(operatorID, 1)) {
+      @Override
+      void runInCoordinatorThread(Runnable runnable, String actionString) {
+        try {
+          coordinatorExecutor().submit(runnable).get();
+        } catch (InterruptedException | ExecutionException e) {
+          throw new RuntimeException(actionString, e);
+        }
+      }
+    };
+  }
+}
diff --git 
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerOperator.java
 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerOperator.java
new file mode 100644
index 0000000000..ea7d8b9625
--- /dev/null
+++ 
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerOperator.java
@@ -0,0 +1,668 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.NOTHING_TO_TRIGGER;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED;
+import static 
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.TRIGGERED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import 
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class TestTriggerManagerOperator extends OperatorTestBase {
+  private static final long DELAY = 10L;
+  private static final String OPERATOR_NAME = "TestCoordinator";
+  private static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L, 
5678L);
+  private static final String[] TASKS = new String[] {"task0", "task1"};
+  private long processingTime = 0L;
+  private String tableName;
+  private TriggerManagerCoordinator triggerManagerCoordinator;
+  private LockReleaseEvent lockReleaseEvent;
+
  // Creates the test table and a started coordinator shared by all test methods.
  // NOTE(review): keeps the catch/wrap because before() likely overrides a non-throwing
  // OperatorTestBase.before() — confirm against the base class before changing the signature.
  @BeforeEach
  void before() {
    super.before();
    Table table = createTable();
    this.tableName = table.name();
    lockReleaseEvent = new LockReleaseEvent(tableName, 1L);
    this.triggerManagerCoordinator = createCoordinator();
    try {
      triggerManagerCoordinator.start();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
+
  // Tears down base resources, then closes the coordinator started in before().
  // close() may throw checked exceptions broader than IOException, hence the wrap.
  @AfterEach
  void after() throws IOException {
    super.after();
    try {
      triggerManagerCoordinator.close();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
+
  // Triggering on accumulated commit count (threshold 3). The last argument of
  // addEventAndCheckResult is the expected cumulative number of emitted triggers
  // (helper defined later in this class, outside this view — semantics assumed).
  @Test
  void testCommitCount() throws Exception {
    MockOperatorEventGateway mockGateway = new MockOperatorEventGateway();
    TriggerManagerOperator operator =
        createOperator(new TriggerEvaluator.Builder().commitCount(3).build(), mockGateway);
    try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
        createHarness(operator)) {

      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().commitCount(1).build(), 0);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().commitCount(2).build(), 1);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().commitCount(3).build(), 2);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().commitCount(10).build(), 3);

      // No trigger in this case
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().commitCount(1).build(), 3);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().commitCount(1).build(), 3);

      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().commitCount(1).build(), 4);
    }
  }
+
  // Triggering on accumulated data file count (threshold 3); expected values are
  // cumulative emitted-trigger counts.
  @Test
  void testDataFileCount() throws Exception {
    TriggerManagerOperator operator =
        createOperator(
            new TriggerEvaluator.Builder().dataFileCount(3).build(),
            new MockOperatorEventGateway());
    try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
        createHarness(operator)) {
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().dataFileCount(1).build(), 0);

      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().dataFileCount(2).build(), 1);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().dataFileCount(3).build(), 2);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().dataFileCount(5).build(), 3);

      // No trigger in this case
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().dataFileCount(1).build(), 3);

      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().dataFileCount(2).build(), 4);
    }
  }
+
  // Triggering on accumulated data file size in bytes (threshold 3).
  @Test
  void testDataFileSizeInBytes() throws Exception {
    TriggerManagerOperator operator =
        createOperator(
            new TriggerEvaluator.Builder().dataFileSizeInBytes(3).build(),
            new MockOperatorEventGateway());
    try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
        createHarness(operator)) {

      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().dataFileSizeInBytes(1L).build(), 0);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().dataFileSizeInBytes(2L).build(), 1);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().dataFileSizeInBytes(5L).build(), 2);

      // No trigger in this case
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().dataFileSizeInBytes(1L).build(), 2);

      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().dataFileSizeInBytes(2L).build(), 3);
    }
  }
+
  // Triggering on accumulated positional delete file count (threshold 3).
  @Test
  void testPosDeleteFileCount() throws Exception {
    TriggerManagerOperator operator =
        createOperator(
            new TriggerEvaluator.Builder().posDeleteFileCount(3).build(),
            new MockOperatorEventGateway());
    try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
        createHarness(operator)) {
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().posDeleteFileCount(1).build(), 0);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().posDeleteFileCount(2).build(), 1);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().posDeleteFileCount(3).build(), 2);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().posDeleteFileCount(10).build(), 3);

      // No trigger in this case
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().posDeleteFileCount(1).build(), 3);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().posDeleteFileCount(1).build(), 3);

      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().posDeleteFileCount(1).build(), 4);
    }
  }
+
  // Triggering on accumulated positional delete record count (threshold 3).
  @Test
  void testPosDeleteRecordCount() throws Exception {

    TriggerManagerOperator operator =
        createOperator(
            new TriggerEvaluator.Builder().posDeleteRecordCount(3).build(),
            new MockOperatorEventGateway());
    try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
        createHarness(operator)) {
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().posDeleteRecordCount(1L).build(), 0);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().posDeleteRecordCount(2L).build(), 1);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().posDeleteRecordCount(5L).build(), 2);

      // No trigger in this case
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().posDeleteRecordCount(1L).build(), 2);

      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().posDeleteRecordCount(2L).build(), 3);
    }
  }
+
  // Triggering on accumulated equality delete file count (threshold 3).
  @Test
  void testEqDeleteFileCount() throws Exception {
    TriggerManagerOperator operator =
        createOperator(
            new TriggerEvaluator.Builder().eqDeleteFileCount(3).build(),
            new MockOperatorEventGateway());
    try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
        createHarness(operator)) {

      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().eqDeleteFileCount(1).build(), 0);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().eqDeleteFileCount(2).build(), 1);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().eqDeleteFileCount(3).build(), 2);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().eqDeleteFileCount(10).build(), 3);

      // No trigger in this case
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().eqDeleteFileCount(1).build(), 3);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().eqDeleteFileCount(1).build(), 3);

      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().eqDeleteFileCount(1).build(), 4);
    }
  }
+
  // Triggering on accumulated equality delete record count (threshold 3).
  @Test
  void testEqDeleteRecordCount() throws Exception {
    TriggerManagerOperator operator =
        createOperator(
            new TriggerEvaluator.Builder().eqDeleteRecordCount(3).build(),
            new MockOperatorEventGateway());
    try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
        createHarness(operator)) {
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().eqDeleteRecordCount(1L).build(), 0);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().eqDeleteRecordCount(2L).build(), 1);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().eqDeleteRecordCount(5L).build(), 2);

      // No trigger in this case
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().eqDeleteRecordCount(1L).build(), 2);

      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().eqDeleteRecordCount(2L).build(), 3);
    }
  }
+
  // Timeout-based triggering: a pending change fires once the processing time passes the
  // 1 second timeout, and a new trigger needs both a lock release and another timeout window.
  @Test
  void testTimeout() throws Exception {
    TriggerManagerOperator operator =
        createOperator(
            new TriggerEvaluator.Builder().timeout(Duration.ofSeconds(1)).build(),
            new MockOperatorEventGateway());
    try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
        createHarness(operator)) {
      TableChange event = TableChange.builder().dataFileCount(1).commitCount(1).build();

      // Wait for some time
      testHarness.processElement(event, EVENT_TIME);
      assertThat(testHarness.extractOutputValues()).isEmpty();

      // Wait for the timeout to expire
      long newTime = EVENT_TIME + Duration.ofSeconds(1).toMillis();
      testHarness.setProcessingTime(newTime);
      testHarness.processElement(event, newTime);
      assertThat(testHarness.extractOutputValues()).hasSize(1);

      // Remove the lock to allow the next trigger
      operator.handleLockRelease(new LockReleaseEvent(tableName, newTime));

      // Send a new event
      testHarness.setProcessingTime(newTime + 1);
      testHarness.processElement(event, newTime);

      // No trigger yet
      assertThat(testHarness.extractOutputValues()).hasSize(1);

      // Send a new event
      newTime += Duration.ofSeconds(1).toMillis();
      testHarness.setProcessingTime(newTime);
      testHarness.processElement(event, newTime);

      // New trigger should arrive
      assertThat(testHarness.extractOutputValues()).hasSize(2);
    }
  }
+
  // Snapshot/restore: accumulated change state survives a restart; the restored operator
  // first emits a recovery trigger, then a real trigger after the lock is released.
  @Test
  void testStateRestore() throws Exception {
    OperatorSubtaskState state;
    TriggerManagerOperator operator =
        createOperator(
            new TriggerEvaluator.Builder().commitCount(2).build(), new MockOperatorEventGateway());
    try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
        createHarness(operator)) {
      testHarness.processElement(
          TableChange.builder().dataFileCount(1).commitCount(1).build(), EVENT_TIME);

      assertThat(testHarness.extractOutputValues()).isEmpty();

      state = testHarness.snapshot(1, EVENT_TIME);
    }

    // Restore the state, write some more data, create a checkpoint, check the data which is written
    TriggerManagerOperator newOperator =
        createOperator(
            new TriggerEvaluator.Builder().commitCount(2).build(), new MockOperatorEventGateway());
    try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
        new OneInputStreamOperatorTestHarness<>(newOperator)) {
      testHarness.initializeState(state);
      testHarness.open();

      // Mock a recovery trigger lock
      assertTriggers(
          testHarness.extractOutputValues(),
          Lists.newArrayList(Trigger.recovery(testHarness.getProcessingTime())));

      testHarness.processElement(TableChange.builder().commitCount(1).build(), EVENT_TIME_2);

      // Remove the lock to allow the next trigger
      newOperator.handleOperatorEvent(lockReleaseEvent);
      testHarness.setProcessingTime(EVENT_TIME_2);

      // At this point the output contains the recovery trigger and the real trigger
      assertThat(testHarness.extractOutputValues()).hasSize(2);
    }
  }
+
  // Rate limiting: after a fire, the next trigger is delayed until the minimum
  // fire delay (DELAY ms) has elapsed in processing time.
  @Test
  void testMinFireDelay() throws Exception {
    TriggerManagerOperator operator =
        createOperator(
            tableName,
            new TriggerEvaluator.Builder().commitCount(2).build(),
            new MockOperatorEventGateway(),
            DELAY,
            1);
    try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
        createHarness(operator)) {
      testHarness.open();

      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().commitCount(2).build(), 1);
      long currentTime = testHarness.getProcessingTime();

      // No new fire yet
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().commitCount(2).build(), 1);

      // Check that the trigger fired after the delay
      testHarness.setProcessingTime(currentTime + DELAY);
      assertThat(testHarness.extractOutputValues()).hasSize(2);
    }
  }
+
  // Lock-check delay: while the lock is held no trigger fires; after release the next
  // trigger still waits for the lock-check delay (DELAY ms) before firing.
  @Test
  void testLockCheckDelay() throws Exception {
    TriggerManagerOperator operator =
        createOperator(
            tableName,
            new TriggerEvaluator.Builder().commitCount(2).build(),
            new MockOperatorEventGateway(),
            1,
            DELAY);
    try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
        createHarness(operator)) {
      testHarness.open();

      // Create a lock to prevent execution, and check that there is no result
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().commitCount(2).build(), 1, false);
      addEventAndCheckResult(
          operator, testHarness, TableChange.builder().commitCount(2).build(), 1, false);
      long currentTime = testHarness.getProcessingTime();

      // Remove the lock, and still no trigger
      operator.handleOperatorEvent(lockReleaseEvent);
      assertThat(testHarness.extractOutputValues()).hasSize(1);

      // Check that the trigger fired after the delay
      testHarness.setProcessingTime(currentTime + DELAY);
      assertThat(testHarness.extractOutputValues()).hasSize(2);
    }
  }
+
+  // Runs a mini-cluster job with two evaluators (commitCount 2 and 4) and verifies
+  // the NOTHING_TO_TRIGGER and per-task TRIGGERED counters as commits accumulate.
+  // The coordinator lock is released manually between triggers via
+  // triggerManagerCoordinator.handleReleaseLock(); Long.MAX_VALUE as the release
+  // timestamp presumably means "release regardless of lock time" — TODO confirm.
+  @Test
+  void testTriggerMetrics() throws Exception {
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    ManualSource<TableChange> source =
+        new ManualSource<>(env, TypeInformation.of(TableChange.class));
+    CollectingSink<Trigger> sink = new CollectingSink<>();
+
+    TriggerManagerOperatorFactory triggerManagerOperatorFactory =
+        new TriggerManagerOperatorFactory(
+            tableName,
+            Lists.newArrayList(TASKS),
+            Lists.newArrayList(
+                new TriggerEvaluator.Builder().commitCount(2).build(),
+                new TriggerEvaluator.Builder().commitCount(4).build()),
+            1L,
+            1L);
+    source
+        .dataStream()
+        .keyBy(unused -> true)
+        .transform(
+            DUMMY_TASK_NAME, TypeInformation.of(Trigger.class), 
triggerManagerOperatorFactory)
+        .forceNonParallel()
+        .sinkTo(sink);
+
+    JobClient jobClient = null;
+    try {
+      jobClient = env.executeAsync();
+
+      // This one doesn't trigger - tests NOTHING_TO_TRIGGER
+      source.sendRecord(TableChange.builder().commitCount(1).build());
+
+      Awaitility.await()
+          .until(
+              () -> {
+                Long notingCounter =
+                    MetricsReporterFactoryForTests.counter(
+                        ImmutableList.of(DUMMY_TASK_NAME, tableName, 
NOTHING_TO_TRIGGER));
+                return notingCounter != null && notingCounter.equals(1L);
+              });
+
+      // Trigger one of the tasks - tests TRIGGERED
+      source.sendRecord(TableChange.builder().commitCount(1).build());
+      // Wait until we receive the trigger
+      assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+      assertThat(
+              MetricsReporterFactoryForTests.counter(
+                  ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", 
TRIGGERED)))
+          .isEqualTo(1L);
+
+      // manual unlock
+      triggerManagerCoordinator.handleReleaseLock(new 
LockReleaseEvent(tableName, Long.MAX_VALUE));
+      // Trigger both of the tasks - tests TRIGGERED
+      source.sendRecord(TableChange.builder().commitCount(2).build());
+      // Wait until we receive the trigger
+      assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+
+      // manual unlock
+      triggerManagerCoordinator.handleReleaseLock(new 
LockReleaseEvent(tableName, Long.MAX_VALUE));
+      assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+
+      assertThat(
+              MetricsReporterFactoryForTests.counter(
+                  ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", 
TRIGGERED)))
+          .isEqualTo(2L);
+      assertThat(
+              MetricsReporterFactoryForTests.counter(
+                  ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[1], "1", 
TRIGGERED)))
+          .isEqualTo(1L);
+
+      // Final check all the counters
+      // NOTE(review): -1L appears to mean "value not asserted" — confirm against
+      // MetricsReporterFactoryForTests.assertCounters().
+      MetricsReporterFactoryForTests.assertCounters(
+          new ImmutableMap.Builder<List<String>, Long>()
+              .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, 
RATE_LIMITER_TRIGGERED), -1L)
+              .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, 
CONCURRENT_RUN_THROTTLED), -1L)
+              .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", 
TRIGGERED), 2L)
+              .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[1], "1", 
TRIGGERED), 1L)
+              .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, 
NOTHING_TO_TRIGGER), 1L)
+              .build());
+    } finally {
+      closeJobClient(jobClient);
+    }
+  }
+
+  // Uses a very large min-fire delay (1_000_000 ms) so the rate limiter blocks the
+  // second trigger even after the lock is released, then asserts that the
+  // RATE_LIMITER_TRIGGERED counter is incremented exactly once.
+  @Test
+  void testRateLimiterMetrics() throws Exception {
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    ManualSource<TableChange> source =
+        new ManualSource<>(env, TypeInformation.of(TableChange.class));
+    CollectingSink<Trigger> sink = new CollectingSink<>();
+
+    // High delay, so only triggered once
+    TriggerManagerOperatorFactory manager = manager(1_000_000L, 1L);
+
+    source
+        .dataStream()
+        .keyBy(unused -> true)
+        .transform(DUMMY_TASK_NAME, TypeInformation.of(Trigger.class), manager)
+        .forceNonParallel()
+        .sinkTo(sink);
+
+    JobClient jobClient = null;
+    try {
+      jobClient = env.executeAsync();
+
+      // Start the first trigger
+      source.sendRecord(TableChange.builder().commitCount(2).build());
+      assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+
+      // Remove the lock to allow the next trigger
+      triggerManagerCoordinator.handleReleaseLock(lockReleaseEvent);
+
+      // The second trigger will be blocked
+      source.sendRecord(TableChange.builder().commitCount(2).build());
+      Awaitility.await()
+          .until(
+              () ->
+                  MetricsReporterFactoryForTests.counter(
+                          ImmutableList.of(DUMMY_TASK_NAME, tableName, 
RATE_LIMITER_TRIGGERED))
+                      .equals(1L));
+
+      // Final check all the counters
+      assertCounters(1L, 0L);
+    } finally {
+      closeJobClient(jobClient);
+    }
+  }
+
+  // Mirror of testRateLimiterMetrics: the lock from the first trigger is never
+  // released, so the second trigger is throttled by the still-held lock and the
+  // CONCURRENT_RUN_THROTTLED counter is incremented exactly once.
+  @Test
+  void testConcurrentRunMetrics() throws Exception {
+    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    ManualSource<TableChange> source =
+        new ManualSource<>(env, TypeInformation.of(TableChange.class));
+    CollectingSink<Trigger> sink = new CollectingSink<>();
+
+    // High delay, so only triggered once
+    TriggerManagerOperatorFactory manager = manager(1L, 1_000_000L);
+
+    source
+        .dataStream()
+        .keyBy(unused -> true)
+        .transform(DUMMY_TASK_NAME, TypeInformation.of(Trigger.class), manager)
+        .forceNonParallel()
+        .sinkTo(sink);
+
+    JobClient jobClient = null;
+    try {
+      jobClient = env.executeAsync();
+
+      // Start the first trigger - notice that we do not remove the lock after 
the trigger
+      source.sendRecord(TableChange.builder().commitCount(2).build());
+      assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+
+      // The second trigger will be blocked by the lock
+      source.sendRecord(TableChange.builder().commitCount(2).build());
+      Awaitility.await()
+          .until(
+              () ->
+                  MetricsReporterFactoryForTests.counter(
+                          ImmutableList.of(DUMMY_TASK_NAME, tableName, 
CONCURRENT_RUN_THROTTLED))
+                      .equals(1L));
+
+      // Final check all the counters
+      assertCounters(0L, 1L);
+    } finally {
+      closeJobClient(jobClient);
+    }
+  }
+
+  // Asserts the final counter values shared by the two throttling tests: the given
+  // rate-limiter and concurrent-run counts, plus the fixed expectations of exactly
+  // one TRIGGERED for TASKS[0] and zero NOTHING_TO_TRIGGER.
+  private void assertCounters(long rateLimiterTrigger, long 
concurrentRunTrigger) {
+    MetricsReporterFactoryForTests.assertCounters(
+        new ImmutableMap.Builder<List<String>, Long>()
+            .put(
+                ImmutableList.of(DUMMY_TASK_NAME, tableName, 
RATE_LIMITER_TRIGGERED),
+                rateLimiterTrigger)
+            .put(
+                ImmutableList.of(DUMMY_TASK_NAME, tableName, 
CONCURRENT_RUN_THROTTLED),
+                concurrentRunTrigger)
+            .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0", 
TRIGGERED), 1L)
+            .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, 
NOTHING_TO_TRIGGER), 0L)
+            .build());
+  }
+
+  // Convenience overload: processes the event and releases the operator's lock
+  // afterwards (removeLock = true).
+  private void addEventAndCheckResult(
+      TriggerManagerOperator operator,
+      OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness,
+      TableChange event,
+      int expectedSize)
+      throws Exception {
+    addEventAndCheckResult(operator, testHarness, event, expectedSize, true);
+  }
+
+  // Advances the harness processing time by one tick, feeds the TableChange in,
+  // asserts the total number of triggers emitted so far, and — when removeLock is
+  // set and the operator currently holds a lock — releases it so the next event
+  // can fire.
+  private void addEventAndCheckResult(
+      TriggerManagerOperator operator,
+      OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness,
+      TableChange event,
+      int expectedSize,
+      boolean removeLock)
+      throws Exception {
+    ++processingTime;
+    testHarness.setProcessingTime(processingTime);
+    testHarness.processElement(event, processingTime);
+    assertThat(testHarness.extractOutputValues()).hasSize(expectedSize);
+    if (removeLock && operator.lockTime() != null) {
+      // Remove the lock to allow the next trigger
+      operator.handleLockRelease(new LockReleaseEvent(tableName, 
processingTime));
+    }
+  }
+
+  // Builds a single-task factory with a commitCount(2) evaluator and the given
+  // minimum-fire and lock-check delays (both in milliseconds).
+  private TriggerManagerOperatorFactory manager(long minFireDelayMs, long 
lockCheckDelayMs) {
+    return new TriggerManagerOperatorFactory(
+        tableName,
+        Lists.newArrayList(TASKS[0]),
+        Lists.newArrayList(new 
TriggerEvaluator.Builder().commitCount(2).build()),
+        minFireDelayMs,
+        lockCheckDelayMs);
+  }
+
+  // Compares two trigger lists element-wise on timestamp, taskId and isRecovery
+  // (Trigger apparently has no suitable equals for a direct list comparison).
+  private static void assertTriggers(List<Trigger> expected, List<Trigger> 
actual) {
+    assertThat(actual).hasSize(expected.size());
+    for (int i = 0; i < expected.size(); ++i) {
+      Trigger expectedTrigger = expected.get(i);
+      Trigger actualTrigger = actual.get(i);
+      
assertThat(actualTrigger.timestamp()).isEqualTo(expectedTrigger.timestamp());
+      assertThat(actualTrigger.taskId()).isEqualTo(expectedTrigger.taskId());
+      
assertThat(actualTrigger.isRecovery()).isEqualTo(expectedTrigger.isRecovery());
+    }
+  }
+
+  // Creates a coordinator backed by a mock coordinator context with parallelism 1.
+  private static TriggerManagerCoordinator createCoordinator() {
+    return new TriggerManagerCoordinator(
+        OPERATOR_NAME, new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, 
1));
+  }
+
+  // Overload using 1 ms fire/lock-check delays and the table name as the lock id.
+  private TriggerManagerOperator createOperator(
+      TriggerEvaluator evaluator, OperatorEventGateway mockGateway) {
+    return createOperator(tableName, evaluator, mockGateway, 1, 1);
+  }
+
+  // Builds a single-task TriggerManagerOperator wired to the mock event gateway.
+  // NOTE(review): the first constructor argument is null here — presumably the
+  // stream-operator parameters/factory that the test harness supplies later;
+  // confirm against the TriggerManagerOperator constructor.
+  private TriggerManagerOperator createOperator(
+      String lockId,
+      TriggerEvaluator evaluator,
+      OperatorEventGateway mockGateway,
+      long minFireDelayMs,
+      long lockCheckDelayMs) {
+    return new TriggerManagerOperator(
+        null,
+        mockGateway,
+        Lists.newArrayList(TASKS[0]),
+        Lists.newArrayList(evaluator),
+        minFireDelayMs,
+        lockCheckDelayMs,
+        lockId);
+  }
+
+  // Wraps the operator in a one-input test harness and opens it before returning.
+  // NOTE(review): some callers invoke testHarness.open() again after this —
+  // confirm the double open() is intentional/harmless.
+  private OneInputStreamOperatorTestHarness<TableChange, Trigger> 
createHarness(
+      TriggerManagerOperator triggerManagerOperator) throws Exception {
+    OneInputStreamOperatorTestHarness<TableChange, Trigger> harness =
+        new OneInputStreamOperatorTestHarness<>(triggerManagerOperator);
+    harness.open();
+    return harness;
+  }
+}

Reply via email to