This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 9534c9b3ad Flink: TableMaintenance Support Coordinator Lock (#15151)
9534c9b3ad is described below
commit 9534c9b3adc29d127ecc541ce131f49fd72f1980
Author: GuoYu <[email protected]>
AuthorDate: Tue Feb 24 23:03:42 2026 +0800
Flink: TableMaintenance Support Coordinator Lock (#15151)
---
.../flink/maintenance/api/TableMaintenance.java | 132 +++-
.../maintenance/operator/BaseCoordinator.java | 306 ++++++++++
.../maintenance/operator/LockRegisterEvent.java | 47 ++
.../maintenance/operator/LockReleaseEvent.java | 56 ++
.../operator/LockRemoverCoordinator.java | 60 ++
.../maintenance/operator/LockRemoverOperator.java | 112 ++++
.../operator/LockRemoverOperatorFactory.java | 86 +++
.../flink/maintenance/operator/TriggerManager.java | 23 +-
.../operator/TriggerManagerCoordinator.java | 59 ++
...gerManager.java => TriggerManagerOperator.java} | 301 +++++-----
.../operator/TriggerManagerOperatorFactory.java | 110 ++++
.../flink/maintenance/operator/TriggerUtil.java | 46 ++
.../flink/maintenance/api/TestMaintenanceE2E.java | 39 ++
.../api/TestTableMaintenanceCoordinationLock.java | 344 +++++++++++
.../maintenance/operator/CoordinatorTestBase.java | 43 ++
.../operator/TestLockRemoveCoordinator.java | 69 +++
.../operator/TestLockRemoverOperation.java | 207 +++++++
.../operator/TestTriggerManagerCoordinator.java | 102 ++++
.../operator/TestTriggerManagerOperator.java | 668 +++++++++++++++++++++
19 files changed, 2607 insertions(+), 203 deletions(-)
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
index 1a2b0607dd..ab5159be12 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java
@@ -23,6 +23,7 @@ import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
+import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
@@ -43,10 +44,12 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.operator.LockRemover;
+import
org.apache.iceberg.flink.maintenance.operator.LockRemoverOperatorFactory;
import org.apache.iceberg.flink.maintenance.operator.MonitorSource;
import org.apache.iceberg.flink.maintenance.operator.TableChange;
import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
import org.apache.iceberg.flink.maintenance.operator.TriggerManager;
+import
org.apache.iceberg.flink.maintenance.operator.TriggerManagerOperatorFactory;
import org.apache.iceberg.flink.sink.IcebergSink;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -71,39 +74,77 @@ public class TableMaintenance {
*
* @param changeStream the table changes
* @param tableLoader used for accessing the table
- * @param lockFactory used for preventing concurrent task runs
+   * @param lockFactory used for preventing concurrent task runs; if null, the
coordination lock is used.
* @return builder for the maintenance stream
+ * @deprecated since 1.12.0, will be removed in 2.0.0. Use {@link
#forChangeStream(DataStream,
+ * TableLoader)} instead.
*/
+ @Deprecated
@Internal
public static Builder forChangeStream(
DataStream<TableChange> changeStream,
TableLoader tableLoader,
- TriggerLockFactory lockFactory) {
+ @Nullable TriggerLockFactory lockFactory) {
Preconditions.checkNotNull(changeStream, "The change stream should not be
null");
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
- Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
return new Builder(null, changeStream, tableLoader, lockFactory);
}
+ /**
+   * Use when the change stream is already provided, like in the {@link
+   * IcebergSink#addPostCommitTopology(DataStream)}. Uses the coordination lock.
+ *
+ * @param changeStream the table changes
+ * @param tableLoader used for accessing the table
+ * @return builder for the maintenance stream
+ */
+ @Internal
+ public static Builder forChangeStream(
+ DataStream<TableChange> changeStream, TableLoader tableLoader) {
+ Preconditions.checkNotNull(changeStream, "The change stream should not be
null");
+ Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+
+ return new Builder(null, changeStream, tableLoader, null);
+ }
+
/**
* Use this for standalone maintenance job. It creates a monitor source that
detect table changes
* and build the maintenance pipelines afterwards.
*
* @param env used to register the monitor source
* @param tableLoader used for accessing the table
- * @param lockFactory used for preventing concurrent task runs
+   * @param lockFactory used for preventing concurrent task runs. If null, the
coordination lock is used.
* @return builder for the maintenance stream
+ * @deprecated since 1.12.0, will be removed in 2.0.0. Use {@link
+ * #forTable(StreamExecutionEnvironment, TableLoader)} instead.
*/
+ @Deprecated
public static Builder forTable(
- StreamExecutionEnvironment env, TableLoader tableLoader,
TriggerLockFactory lockFactory) {
+ StreamExecutionEnvironment env,
+ TableLoader tableLoader,
+ @Nullable TriggerLockFactory lockFactory) {
Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be
null");
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
- Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");
return new Builder(env, null, tableLoader, lockFactory);
}
+ /**
+   * Use this for a standalone maintenance job. It creates a monitor source that
detects table changes
+   * and builds the maintenance pipelines afterwards. Uses the coordination lock
by default.
+ *
+ * @param env used to register the monitor source
+ * @param tableLoader used for accessing the table
+ * @return builder for the maintenance stream
+ */
+ public static Builder forTable(StreamExecutionEnvironment env, TableLoader
tableLoader) {
+ Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be
null");
+ Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
+
+ return new Builder(env, null, tableLoader, null);
+ }
+
public static class Builder {
private final StreamExecutionEnvironment env;
private final DataStream<TableChange> inputStream;
@@ -226,21 +267,43 @@ public class TableMaintenance {
try (TableLoader loader = tableLoader.clone()) {
loader.open();
String tableName = loader.loadTable().name();
- DataStream<Trigger> triggers =
- DataStreamUtils.reinterpretAsKeyedStream(
- changeStream(tableName, loader), unused -> true)
- .process(
- new TriggerManager(
- loader,
- lockFactory,
- taskNames,
- evaluators,
- rateLimit.toMillis(),
- lockCheckDelay.toMillis()))
- .name(TRIGGER_MANAGER_OPERATOR_NAME)
- .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
- .slotSharingGroup(slotSharingGroup)
- .forceNonParallel()
+ DataStream<Trigger> triggers;
+ if (lockFactory == null) {
+ triggers =
+ DataStreamUtils.reinterpretAsKeyedStream(
+ changeStream(tableName, loader), unused -> true)
+ .transform(
+ TRIGGER_MANAGER_OPERATOR_NAME,
+ TypeInformation.of(Trigger.class),
+ new TriggerManagerOperatorFactory(
+ tableName,
+ taskNames,
+ evaluators,
+ rateLimit.toMillis(),
+ lockCheckDelay.toMillis()))
+ .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+ .slotSharingGroup(slotSharingGroup)
+ .forceNonParallel();
+ } else {
+ triggers =
+ DataStreamUtils.reinterpretAsKeyedStream(
+ changeStream(tableName, loader), unused -> true)
+ .process(
+ new TriggerManager(
+ loader,
+ lockFactory,
+ taskNames,
+ evaluators,
+ rateLimit.toMillis(),
+ lockCheckDelay.toMillis()))
+ .name(TRIGGER_MANAGER_OPERATOR_NAME)
+ .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+ .slotSharingGroup(slotSharingGroup)
+ .forceNonParallel();
+ }
+
+ triggers =
+ triggers
.assignTimestampsAndWatermarks(new
PunctuatedWatermarkStrategy())
.name(WATERMARK_ASSIGNER_OPERATOR_NAME)
.uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
@@ -277,14 +340,25 @@ public class TableMaintenance {
}
// Add the LockRemover to the end
- unioned
- .transform(
- LOCK_REMOVER_OPERATOR_NAME,
- TypeInformation.of(Void.class),
- new LockRemover(tableName, lockFactory, taskNames))
- .forceNonParallel()
- .uid("lock-remover-" + uidSuffix)
- .slotSharingGroup(slotSharingGroup);
+ if (lockFactory == null) {
+ unioned
+ .transform(
+ LOCK_REMOVER_OPERATOR_NAME,
+ TypeInformation.of(Void.class),
+ new LockRemoverOperatorFactory(tableName, taskNames))
+ .uid("lock-remover-" + uidSuffix)
+ .forceNonParallel()
+ .slotSharingGroup(slotSharingGroup);
+ } else {
+ unioned
+ .transform(
+ LOCK_REMOVER_OPERATOR_NAME,
+ TypeInformation.of(Void.class),
+ new LockRemover(tableName, lockFactory, taskNames))
+ .forceNonParallel()
+ .uid("lock-remover-" + uidSuffix)
+ .slotSharingGroup(slotSharingGroup);
+ }
}
}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/BaseCoordinator.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/BaseCoordinator.java
new file mode 100644
index 0000000000..21e9ca43f3
--- /dev/null
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/BaseCoordinator.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.util.FatalExitExceptionHandler;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base coordinator for table maintenance operators. Provides common
functionality for thread
+ * management, subtask gateway management, and checkpoint handling.
+ */
+abstract class BaseCoordinator implements OperatorCoordinator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseCoordinator.class);
+ private static final Map<String, Consumer<LockReleaseEvent>>
LOCK_RELEASE_CONSUMERS =
+ Maps.newConcurrentMap();
+ private static final List<LockReleaseEvent> PENDING_RELEASE_EVENTS =
Lists.newArrayList();
+
+ private final String operatorName;
+ private final Context context;
+ private final ExecutorService coordinatorExecutor;
+ private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
+ private final SubtaskGateways subtaskGateways;
+
+ private boolean started;
+
+ BaseCoordinator(String operatorName, Context context) {
+ this.operatorName = operatorName;
+ this.context = context;
+
+ this.coordinatorThreadFactory =
+ new CoordinatorExecutorThreadFactory(
+ "Coordinator-" + operatorName, context.getUserCodeClassloader());
+ this.coordinatorExecutor =
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+ Preconditions.checkState(
+ context.currentParallelism() == 1, "Coordinator must run with
parallelism 1");
+ this.subtaskGateways = SubtaskGateways.create(operatorName);
+ LOG.info("Created coordinator: {}", operatorName);
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ void registerLock(LockRegisterEvent lockRegisterEvent) {
+ LOCK_RELEASE_CONSUMERS.put(
+ lockRegisterEvent.lockId(),
+ lock -> {
+ LOG.info(
+ "Send release event for lock id {}, timestamp: {} to Operator
{}",
+ lock.lockId(),
+ lock.timestamp(),
+ operatorName());
+ subtaskGateways.subtaskGateway().sendEvent(lock);
+ });
+
+ synchronized (PENDING_RELEASE_EVENTS) {
+ if (!PENDING_RELEASE_EVENTS.isEmpty()) {
+ PENDING_RELEASE_EVENTS.forEach(this::handleReleaseLock);
+ PENDING_RELEASE_EVENTS.clear();
+ }
+ }
+ }
+
+ void handleReleaseLock(LockReleaseEvent lockReleaseEvent) {
+ synchronized (PENDING_RELEASE_EVENTS) {
+ if (LOCK_RELEASE_CONSUMERS.containsKey(lockReleaseEvent.lockId())) {
+
LOCK_RELEASE_CONSUMERS.get(lockReleaseEvent.lockId()).accept(lockReleaseEvent);
+ LOG.info(
+ "Send release event for lock id {}, timestamp: {}",
+ lockReleaseEvent.lockId(),
+ lockReleaseEvent.timestamp());
+ } else {
+ PENDING_RELEASE_EVENTS.add(lockReleaseEvent);
+ LOG.info(
+ "No consumer for lock id {}, timestamp: {}",
+ lockReleaseEvent.lockId(),
+ lockReleaseEvent.timestamp());
+ }
+ }
+ }
+
+ @Override
+ public void start() throws Exception {
+ LOG.info("Starting coordinator: {}", operatorName);
+ this.started = true;
+ }
+
+ @Override
+ public void close() throws Exception {
+ coordinatorExecutor.shutdown();
+ this.started = false;
+ synchronized (PENDING_RELEASE_EVENTS) {
+ LOCK_RELEASE_CONSUMERS.clear();
+ PENDING_RELEASE_EVENTS.clear();
+ }
+
+ LOG.info("Closed coordinator: {}", operatorName);
+ }
+
+ @Override
+ public void checkpointCoordinator(long checkpointId,
CompletableFuture<byte[]> resultFuture) {
+ runInCoordinatorThread(
+ () -> resultFuture.complete(new byte[0]),
+ String.format(Locale.ROOT, "taking checkpoint %d", checkpointId));
+ }
+
+ @Override
+ public void resetToCheckpoint(long checkpointId, byte[] checkpointData) {
+ Preconditions.checkState(
+ !started, "The coordinator %s can only be reset if it was not yet
started", operatorName);
+ LOG.info("Reset to checkpoint {}", checkpointId);
+ synchronized (PENDING_RELEASE_EVENTS) {
+ LOCK_RELEASE_CONSUMERS.clear();
+ PENDING_RELEASE_EVENTS.clear();
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {}
+
+ @Override
+ public void subtaskReset(int subtask, long checkpointId) {
+ runInCoordinatorThread(
+ () -> {
+ LOG.info("Subtask {} is reset to checkpoint {}", subtask,
checkpointId);
+
Preconditions.checkState(coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+ subtaskGateways.reset();
+ },
+ String.format(
+ Locale.ROOT, "handling subtask %d recovery to checkpoint %d",
subtask, checkpointId));
+ }
+
+ @Override
+ public void executionAttemptFailed(int subtask, int attemptNumber, Throwable
reason) {
+ runInCoordinatorThread(
+ () -> {
+ LOG.info(
+ "Unregistering gateway after failure for subtask {} (#{}) of
data statistics {}",
+ subtask,
+ attemptNumber,
+ operatorName);
+ Preconditions.checkState(
+
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+ subtaskGateways.unregisterSubtaskGateway(subtask, attemptNumber);
+ },
+ String.format(Locale.ROOT, "handling subtask %d (#%d) failure",
subtask, attemptNumber));
+ }
+
+ @Override
+ public void executionAttemptReady(int subtask, int attemptNumber,
SubtaskGateway gateway) {
+ Preconditions.checkArgument(subtask == gateway.getSubtask());
+ Preconditions.checkArgument(attemptNumber ==
gateway.getExecution().getAttemptNumber());
+ runInCoordinatorThread(
+ () -> {
+ Preconditions.checkState(
+
this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
+ subtaskGateways.registerSubtaskGateway(gateway);
+ },
+ String.format(
+ Locale.ROOT,
+ "making event gateway to subtask %d (#%d) available",
+ subtask,
+ attemptNumber));
+ }
+
+ String operatorName() {
+ return operatorName;
+ }
+
+ void runInCoordinatorThread(Runnable runnable, String actionString) {
+ ensureStarted();
+ coordinatorExecutor.execute(
+ () -> {
+ try {
+ runnable.run();
+ } catch (Throwable t) {
+ LOG.error(
+ "Uncaught exception in coordinator while {}: {}",
actionString, t.getMessage(), t);
+ context.failJob(t);
+ }
+ });
+ }
+
+ private void ensureStarted() {
+ Preconditions.checkState(started, "The coordinator has not started yet.");
+ }
+
+ /** Inner class to manage subtask gateways. */
+ private record SubtaskGateways(String operatorName, Map<Integer,
SubtaskGateway> gateways) {
+
+ private static SubtaskGateways create(String operatorName) {
+ return new SubtaskGateways(operatorName, Maps.newHashMap());
+ }
+
+ private void registerSubtaskGateway(SubtaskGateway gateway) {
+ int attemptNumber = gateway.getExecution().getAttemptNumber();
+ Preconditions.checkState(
+ !gateways.containsKey(attemptNumber),
+ "Coordinator of %s already has a subtask gateway for (#%d)",
+ operatorName,
+ attemptNumber);
+ LOG.debug("Coordinator of {} registers gateway for attempt {}",
operatorName, attemptNumber);
+ gateways.put(attemptNumber, gateway);
+ LOG.debug("Registered gateway for attempt {}", attemptNumber);
+ }
+
+ private void unregisterSubtaskGateway(int subtaskIndex, int attemptNumber)
{
+ gateways.remove(attemptNumber);
+ LOG.debug("Unregistered gateway for subtask {} attempt {}",
subtaskIndex, attemptNumber);
+ }
+
+ private SubtaskGateway subtaskGateway() {
+ Preconditions.checkState(
+ !gateways.isEmpty(),
+ "Coordinator of %s is not ready yet to receive events",
+ operatorName);
+ return Iterables.getOnlyElement(gateways.values());
+ }
+
+ private void reset() {
+ gateways.clear();
+ }
+ }
+
+ /** Custom thread factory for the coordinator executor. */
+ private static class CoordinatorExecutorThreadFactory
+ implements ThreadFactory, Thread.UncaughtExceptionHandler {
+
+ private final String coordinatorThreadName;
+ private final ClassLoader classLoader;
+ private final Thread.UncaughtExceptionHandler errorHandler;
+
+ private Thread thread;
+
+ private CoordinatorExecutorThreadFactory(
+ String coordinatorThreadName, ClassLoader contextClassLoader) {
+ this(coordinatorThreadName, contextClassLoader,
FatalExitExceptionHandler.INSTANCE);
+ }
+
+ private CoordinatorExecutorThreadFactory(
+ String coordinatorThreadName,
+ ClassLoader contextClassLoader,
+ Thread.UncaughtExceptionHandler errorHandler) {
+ this.coordinatorThreadName = coordinatorThreadName;
+ this.classLoader = contextClassLoader;
+ this.errorHandler = errorHandler;
+ }
+
+ @Override
+ public synchronized Thread newThread(@Nonnull Runnable runnable) {
+ thread = new Thread(runnable, coordinatorThreadName);
+ thread.setContextClassLoader(classLoader);
+ thread.setUncaughtExceptionHandler(this);
+ return thread;
+ }
+
+ @Override
+ public synchronized void uncaughtException(Thread t, Throwable e) {
+ errorHandler.uncaughtException(t, e);
+ }
+
+ private boolean isCurrentThreadCoordinatorThread() {
+ return Thread.currentThread() == thread;
+ }
+ }
+
+ @VisibleForTesting
+ List<LockReleaseEvent> pendingReleaseEvents() {
+ return PENDING_RELEASE_EVENTS;
+ }
+
+ @VisibleForTesting
+ ExecutorService coordinatorExecutor() {
+ return coordinatorExecutor;
+ }
+}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRegisterEvent.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRegisterEvent.java
new file mode 100644
index 0000000000..0dcd15a665
--- /dev/null
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRegisterEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+/**
+ * Event sent from TriggerManagerOperator to TriggerManagerCoordinator to
register a lock release
+ * handler. This handler will be used to forward lock release events back to
the operator when
+ * triggered by downstream operators.
+ */
+@Internal
+public class LockRegisterEvent implements OperatorEvent {
+
+ private final String lockId;
+
+ public LockRegisterEvent(String lockId) {
+ this.lockId = lockId;
+ }
+
+ public String lockId() {
+ return lockId;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("lockId", lockId).toString();
+ }
+}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockReleaseEvent.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockReleaseEvent.java
new file mode 100644
index 0000000000..8c6e71ca97
--- /dev/null
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockReleaseEvent.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+
+/**
+ * Event sent from LockRemoverOperator to LockRemoverCoordinator to notify
that a lock has been
+ * released. The LockRemoverCoordinator then forwards this event to the
TriggerManagerOperator via
+ * the registered lock release handler.
+ */
+@Internal
+public class LockReleaseEvent implements OperatorEvent {
+
+ private final String lockId;
+ private final long timestamp;
+
+ public LockReleaseEvent(String lockId, long timestamp) {
+ this.lockId = lockId;
+ this.timestamp = timestamp;
+ }
+
+ public long timestamp() {
+ return timestamp;
+ }
+
+ public String lockId() {
+ return lockId;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("lockId", lockId)
+ .add("timestamp", timestamp)
+ .toString();
+ }
+}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverCoordinator.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverCoordinator.java
new file mode 100644
index 0000000000..39020cd5d3
--- /dev/null
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverCoordinator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.Locale;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Coordinator for LockRemoverOperator. Handles lock release events from
downstream operators. */
+class LockRemoverCoordinator extends BaseCoordinator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LockRemoverCoordinator.class);
+
+ LockRemoverCoordinator(String operatorName, Context context) {
+ super(operatorName, context);
+ LOG.info("Created LockRemoverCoordinator: {}", operatorName);
+ }
+
+ @Override
+ public void handleEventFromOperator(int subtask, int attemptNumber,
OperatorEvent event) {
+ runInCoordinatorThread(
+ () -> {
+ LOG.debug(
+ "Handling event from subtask {} (#{}) of {}: {}",
+ subtask,
+ attemptNumber,
+ operatorName(),
+ event);
+ if (event instanceof LockReleaseEvent) {
+ handleReleaseLock((LockReleaseEvent) event);
+ } else {
+ throw new IllegalArgumentException(
+ "Invalid operator event type: " +
event.getClass().getCanonicalName());
+ }
+ },
+ String.format(
+ Locale.ROOT,
+ "handling operator event %s from subtask %d (#%d)",
+ event.getClass(),
+ subtask,
+ attemptNumber));
+ }
+}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperator.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperator.java
new file mode 100644
index 0000000000..51aae657e6
--- /dev/null
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.io.Serial;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.iceberg.flink.maintenance.api.TaskResult;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class LockRemoverOperator extends AbstractStreamOperator<Void>
+ implements OneInputStreamOperator<TaskResult, Void>, OperatorEventHandler {
+
+ @Serial private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(LockRemoverOperator.class);
+
+ private final String tableName;
+ private final OperatorEventGateway operatorEventGateway;
+ private final List<String> maintenanceTaskNames;
+ private transient List<Counter> succeededTaskResultCounters;
+ private transient List<Counter> failedTaskResultCounters;
+ private transient List<AtomicLong> taskLastRunDurationMs;
+
+ LockRemoverOperator(
+ StreamOperatorParameters<Void> parameters,
+ OperatorEventGateway operatorEventGateway,
+ String tableName,
+ List<String> maintenanceTaskNames) {
+ super(parameters);
+ this.tableName = tableName;
+ this.operatorEventGateway = operatorEventGateway;
+ this.maintenanceTaskNames = maintenanceTaskNames;
+ }
+
+ @Override
+ public void open() throws Exception {
+ this.succeededTaskResultCounters =
+ Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
+ this.failedTaskResultCounters =
Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
+ this.taskLastRunDurationMs =
Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
+ for (int taskIndex = 0; taskIndex < maintenanceTaskNames.size();
++taskIndex) {
+ MetricGroup taskMetricGroup =
+ TableMaintenanceMetrics.groupFor(
+ getRuntimeContext(), tableName,
maintenanceTaskNames.get(taskIndex), taskIndex);
+ succeededTaskResultCounters.add(
+
taskMetricGroup.counter(TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER));
+ failedTaskResultCounters.add(
+
taskMetricGroup.counter(TableMaintenanceMetrics.FAILED_TASK_COUNTER));
+ AtomicLong duration = new AtomicLong(0);
+ taskLastRunDurationMs.add(duration);
+ taskMetricGroup.gauge(TableMaintenanceMetrics.LAST_RUN_DURATION_MS,
duration::get);
+ }
+ }
+
+ @Override
+ public void handleOperatorEvent(OperatorEvent event) {
+ // no incoming events
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ @Override
+ public void processElement(StreamRecord<TaskResult> streamRecord) {
+ TaskResult taskResult = streamRecord.getValue();
+ LOG.info(
+ "Processing result {} for task {}",
+ taskResult,
+ maintenanceTaskNames.get(taskResult.taskIndex()));
+ long duration = System.currentTimeMillis() - taskResult.startEpoch();
+ // Update the metrics
+ taskLastRunDurationMs.get(taskResult.taskIndex()).set(duration);
+ if (taskResult.success()) {
+ succeededTaskResultCounters.get(taskResult.taskIndex()).inc();
+ } else {
+ failedTaskResultCounters.get(taskResult.taskIndex()).inc();
+ }
+ }
+
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ operatorEventGateway.sendEventToCoordinator(
+ new LockReleaseEvent(tableName, mark.getTimestamp()));
+ super.processWatermark(mark);
+ }
+}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperatorFactory.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperatorFactory.java
new file mode 100644
index 0000000000..b43c3f15fc
--- /dev/null
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/LockRemoverOperatorFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.iceberg.flink.maintenance.api.TaskResult;
+
+@Internal
+public class LockRemoverOperatorFactory extends
AbstractStreamOperatorFactory<Void>
+ implements CoordinatedOperatorFactory<Void>,
OneInputStreamOperatorFactory<TaskResult, Void> {
+ private final String tableName;
+ private final List<String> maintenanceTaskNames;
+
+ public LockRemoverOperatorFactory(String tableName, List<String>
maintenanceTaskNames) {
+ this.tableName = tableName;
+ this.maintenanceTaskNames = maintenanceTaskNames;
+ }
+
+ @Override
+ public OperatorCoordinator.Provider getCoordinatorProvider(
+ String operatorName, OperatorID operatorID) {
+ return new LockRemoverCoordinatorProvider(operatorName, operatorID);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends StreamOperator<Void>> T createStreamOperator(
+ StreamOperatorParameters<Void> parameters) {
+ OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+ OperatorEventGateway gateway =
+
parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
+ LockRemoverOperator lockRemoverOperator =
+ new LockRemoverOperator(parameters, gateway, tableName,
maintenanceTaskNames);
+ parameters.getOperatorEventDispatcher().registerEventHandler(operatorId,
lockRemoverOperator);
+
+ return (T) lockRemoverOperator;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader
classLoader) {
+ return LockRemoverOperator.class;
+ }
+
+ private static class LockRemoverCoordinatorProvider
+ extends RecreateOnResetOperatorCoordinator.Provider {
+
+ private final String operatorName;
+
+ private LockRemoverCoordinatorProvider(String operatorName, OperatorID
operatorID) {
+ super(operatorID);
+ this.operatorName = operatorName;
+ }
+
+ @Override
+ public OperatorCoordinator getCoordinator(OperatorCoordinator.Context
context) {
+ return new LockRemoverCoordinator(operatorName, context);
+ }
+ }
+}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
index f1f2b51c09..6d8326645d 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
@@ -226,7 +226,8 @@ public class TriggerManager extends
KeyedProcessFunction<Boolean, TableChange, T
}
Integer taskToStart =
- nextTrigger(evaluators, accumulatedChanges, lastTriggerTimes, current,
startsFrom);
+ TriggerUtil.nextTrigger(
+ evaluators, accumulatedChanges, lastTriggerTimes, current,
startsFrom);
if (taskToStart == null) {
// Nothing to execute
if (!triggered) {
@@ -269,26 +270,6 @@ public class TriggerManager extends
KeyedProcessFunction<Boolean, TableChange, T
timerService.registerProcessingTimeTimer(time);
}
- private static Integer nextTrigger(
- List<TriggerEvaluator> evaluators,
- List<TableChange> changes,
- List<Long> lastTriggerTimes,
- long currentTime,
- int startPos) {
- int current = startPos;
- do {
- if (evaluators
- .get(current)
- .check(changes.get(current), lastTriggerTimes.get(current),
currentTime)) {
- return current;
- }
-
- current = (current + 1) % evaluators.size();
- } while (current != startPos);
-
- return null;
- }
-
private void init(Collector<Trigger> out, TimerService timerService) throws
Exception {
if (!inited) {
long current = timerService.currentProcessingTime();
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerCoordinator.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerCoordinator.java
new file mode 100644
index 0000000000..b5af41da27
--- /dev/null
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerCoordinator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.Locale;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class TriggerManagerCoordinator extends BaseCoordinator {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TriggerManagerCoordinator.class);
+
+ TriggerManagerCoordinator(String operatorName, Context context) {
+ super(operatorName, context);
+ LOG.info("Created TriggerManagerCoordinator: {}", operatorName);
+ }
+
+ @Override
+ public void handleEventFromOperator(int subtask, int attemptNumber,
OperatorEvent event) {
+ runInCoordinatorThread(
+ () -> {
+ LOG.debug(
+ "Handling event from subtask {} (#{}) of {}: {}",
+ subtask,
+ attemptNumber,
+ operatorName(),
+ event);
+ if (event instanceof LockRegisterEvent) {
+ registerLock((LockRegisterEvent) event);
+ } else {
+ throw new IllegalArgumentException(
+ "Invalid operator event type: " +
event.getClass().getCanonicalName());
+ }
+ },
+ String.format(
+ Locale.ROOT,
+ "handling operator event %s from subtask %d (#%d)",
+ event.getClass(),
+ subtask,
+ attemptNumber));
+ }
+}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperator.java
similarity index 51%
copy from
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
copy to
flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperator.java
index f1f2b51c09..f29ac9670b 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManager.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperator.java
@@ -18,70 +18,63 @@
*/
package org.apache.iceberg.flink.maintenance.operator;
-import java.io.IOException;
+import java.io.Serial;
import java.util.List;
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.annotation.VisibleForTesting;
+import
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.util.Collector;
-import org.apache.iceberg.flink.TableLoader;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.iceberg.flink.maintenance.api.Trigger;
-import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * TriggerManager starts the Maintenance Tasks by emitting {@link Trigger}
messages which are
- * calculated based on the incoming {@link TableChange} messages. The
TriggerManager keeps track of
- * the changes since the last run of the Maintenance Tasks and triggers a new
run based on the
- * result of the {@link TriggerEvaluator}.
- *
- * <p>The TriggerManager prevents overlapping Maintenance Task runs using
{@link
- * TriggerLockFactory.Lock}. The current implementation only handles conflicts
within a single job.
- * Users should avoid scheduling maintenance for the same table in different
Flink jobs.
- *
- * <p>The TriggerManager should run as a global operator. {@link
KeyedProcessFunction} is used, so
- * the timer functions are available, but the key is not used.
+ * The TriggerManagerOperator itself holds the lock and registers a callback
method with the
+ * coordinator. When a task finishes, it sends a signal from downstream to the
coordinator to
+ * trigger this callback, allowing the TriggerManagerOperator to release the
lock.
*/
-@Internal
-public class TriggerManager extends KeyedProcessFunction<Boolean, TableChange,
Trigger>
- implements CheckpointedFunction {
- private static final Logger LOG =
LoggerFactory.getLogger(TriggerManager.class);
+class TriggerManagerOperator extends AbstractStreamOperator<Trigger>
+ implements OneInputStreamOperator<TableChange, Trigger>,
+ OperatorEventHandler,
+ ProcessingTimeCallback {
- private final String tableName;
- private final TriggerLockFactory lockFactory;
+ @Serial private static final long serialVersionUID = 1L;
+ private static final Logger LOG =
LoggerFactory.getLogger(TriggerManagerOperator.class);
+
+ private final OperatorEventGateway operatorEventGateway;
private final List<String> maintenanceTaskNames;
private final List<TriggerEvaluator> evaluators;
private final long minFireDelayMs;
private final long lockCheckDelayMs;
+ private final String tableName;
+
private transient Counter rateLimiterTriggeredCounter;
private transient Counter concurrentRunThrottledCounter;
private transient Counter nothingToTriggerCounter;
private transient List<Counter> triggerCounters;
- private transient ValueState<Long> nextEvaluationTimeState;
+ private transient ListState<Long> nextEvaluationTimeState;
private transient ListState<TableChange> accumulatedChangesState;
private transient ListState<Long> lastTriggerTimesState;
private transient Long nextEvaluationTime;
private transient List<TableChange> accumulatedChanges;
private transient List<Long> lastTriggerTimes;
- private transient TriggerLockFactory.Lock lock;
- private transient TriggerLockFactory.Lock recoveryLock;
- private transient boolean shouldRestoreTasks = false;
- private transient boolean inited = false;
// To keep the task scheduling fair we keep the last triggered task position
in memory.
// If we find a task to trigger, then we run it, but after it is finished,
we start from the given
// position to prevent "starvation" of the tasks.
@@ -89,16 +82,18 @@ public class TriggerManager extends
KeyedProcessFunction<Boolean, TableChange, T
// be important (RewriteDataFiles first, and then RewriteManifestFiles later)
private transient int startsFrom = 0;
private transient boolean triggered = false;
+ private transient Long lockTime;
+ private transient boolean shouldRestoreTasks = false;
- public TriggerManager(
- TableLoader tableLoader,
- TriggerLockFactory lockFactory,
+ TriggerManagerOperator(
+ StreamOperatorParameters<Trigger> parameters,
+ OperatorEventGateway operatorEventGateway,
List<String> maintenanceTaskNames,
List<TriggerEvaluator> evaluators,
long minFireDelayMs,
- long lockCheckDelayMs) {
- Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
- Preconditions.checkNotNull(lockFactory, "Lock factory should no be null");
+ long lockCheckDelayMs,
+ String tableName) {
+ super(parameters);
Preconditions.checkArgument(
maintenanceTaskNames != null && !maintenanceTaskNames.isEmpty(),
"Invalid maintenance task names: null or empty");
@@ -111,17 +106,17 @@ public class TriggerManager extends
KeyedProcessFunction<Boolean, TableChange, T
Preconditions.checkArgument(
lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1
ms.");
- tableLoader.open();
- this.tableName = tableLoader.loadTable().name();
- this.lockFactory = lockFactory;
this.maintenanceTaskNames = maintenanceTaskNames;
this.evaluators = evaluators;
this.minFireDelayMs = minFireDelayMs;
this.lockCheckDelayMs = lockCheckDelayMs;
+ this.tableName = tableName;
+ this.operatorEventGateway = operatorEventGateway;
}
@Override
- public void open(OpenContext parameters) throws Exception {
+ public void open() throws Exception {
+ super.open();
MetricGroup mainGroup =
TableMaintenanceMetrics.groupFor(getRuntimeContext(), tableName);
this.rateLimiterTriggeredCounter =
mainGroup.counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
@@ -135,98 +130,150 @@ public class TriggerManager extends
KeyedProcessFunction<Boolean, TableChange, T
mainGroup, maintenanceTaskNames.get(taskIndex), taskIndex)
.counter(TableMaintenanceMetrics.TRIGGERED));
}
+ }
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ super.initializeState(context);
this.nextEvaluationTimeState =
- getRuntimeContext()
- .getState(new
ValueStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
+ context
+ .getOperatorStateStore()
+ .getListState(new
ListStateDescriptor<>("triggerManagerNextTriggerTime", Types.LONG));
+
this.accumulatedChangesState =
- getRuntimeContext()
+ context
+ .getOperatorStateStore()
.getListState(
new ListStateDescriptor<>(
"triggerManagerAccumulatedChange",
TypeInformation.of(TableChange.class)));
+
this.lastTriggerTimesState =
- getRuntimeContext()
+ context
+ .getOperatorStateStore()
.getListState(new
ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG));
+
+ long current = getProcessingTimeService().getCurrentProcessingTime();
+
+ // Initialize from state
+ if (!Iterables.isEmpty(nextEvaluationTimeState.get())) {
+ nextEvaluationTime =
Iterables.getOnlyElement(nextEvaluationTimeState.get());
+ }
+
+ this.accumulatedChanges =
Lists.newArrayList(accumulatedChangesState.get());
+ this.lastTriggerTimes = Lists.newArrayList(lastTriggerTimesState.get());
+
+ // Initialize if the state was empty
+ if (accumulatedChanges.isEmpty()) {
+ for (int i = 0; i < evaluators.size(); ++i) {
+ accumulatedChanges.add(TableChange.empty());
+ lastTriggerTimes.add(current);
+ }
+ }
+
+ // register the lock register event
+ operatorEventGateway.sendEventToCoordinator(new
LockRegisterEvent(tableName));
+
+ if (context.isRestored()) {
+ // When the job state is restored, there could be ongoing tasks.
+ // To prevent collision with the new triggers the following is done:
+ // - add a recovery lock
+ // This ensures that the tasks of the previous trigger are executed, and
the lock is removed
+ // in the end. The result of the 'tryLock' is ignored as an already
existing lock prevents
+ // collisions as well.
+ // register the recover lock
+ this.lockTime = current;
+ this.shouldRestoreTasks = true;
+ output.collect(new StreamRecord<>(Trigger.recovery(current), current));
+ if (nextEvaluationTime == null) {
+ schedule(getProcessingTimeService(), current + minFireDelayMs);
+ } else {
+ schedule(getProcessingTimeService(), nextEvaluationTime);
+ }
+ } else {
+ this.lockTime = null;
+ }
}
@Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- if (inited) {
- // Only store state if initialized
- nextEvaluationTimeState.update(nextEvaluationTime);
- accumulatedChangesState.update(accumulatedChanges);
- lastTriggerTimesState.update(lastTriggerTimes);
- LOG.info(
- "Storing state: nextEvaluationTime {}, accumulatedChanges {},
lastTriggerTimes {}",
- nextEvaluationTime,
- accumulatedChanges,
- lastTriggerTimes);
- } else {
- LOG.info("Not initialized, state is not stored");
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ nextEvaluationTimeState.clear();
+ if (nextEvaluationTime != null) {
+ nextEvaluationTimeState.add(nextEvaluationTime);
}
+
+ accumulatedChangesState.update(accumulatedChanges);
+ lastTriggerTimesState.update(lastTriggerTimes);
+ LOG.info(
+ "Storing state: nextEvaluationTime {}, accumulatedChanges {},
lastTriggerTimes {}",
+ nextEvaluationTime,
+ accumulatedChanges,
+ lastTriggerTimes);
}
@Override
- public void initializeState(FunctionInitializationContext context) throws
Exception {
- LOG.info("Initializing state restored: {}", context.isRestored());
- lockFactory.open();
- this.lock = lockFactory.createLock();
- this.recoveryLock = lockFactory.createRecoveryLock();
- if (context.isRestored()) {
- shouldRestoreTasks = true;
+ public void handleOperatorEvent(OperatorEvent event) {
+ if (event instanceof LockReleaseEvent) {
+ LOG.info("Received lock released event: {}", event);
+ handleLockRelease((LockReleaseEvent) event);
} else {
- lock.unlock();
- recoveryLock.unlock();
+ throw new IllegalArgumentException(
+ "Invalid operator event type: " +
event.getClass().getCanonicalName());
}
}
@Override
- public void processElement(TableChange change, Context ctx,
Collector<Trigger> out)
- throws Exception {
- init(out, ctx.timerService());
-
+ public void processElement(StreamRecord<TableChange> streamRecord) throws
Exception {
+ TableChange change = streamRecord.getValue();
accumulatedChanges.forEach(tableChange -> tableChange.merge(change));
-
- long current = ctx.timerService().currentProcessingTime();
if (nextEvaluationTime == null) {
- checkAndFire(current, ctx.timerService(), out);
+ checkAndFire(getProcessingTimeService());
} else {
LOG.info(
- "Trigger manager rate limiter triggered current: {}, next: {},
accumulated changes: {}",
- current,
+ "Trigger manager rate limiter triggered current: {}, next: {},
accumulated changes: {},{}",
+ getProcessingTimeService().getCurrentProcessingTime(),
nextEvaluationTime,
- accumulatedChanges);
+ accumulatedChanges,
+ maintenanceTaskNames);
rateLimiterTriggeredCounter.inc();
}
}
@Override
- public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger>
out) throws Exception {
- init(out, ctx.timerService());
+ public void onProcessingTime(long l) {
this.nextEvaluationTime = null;
- checkAndFire(ctx.timerService().currentProcessingTime(),
ctx.timerService(), out);
+ checkAndFire(getProcessingTimeService());
}
@Override
- public void close() throws IOException {
- lockFactory.close();
+ public void close() throws Exception {
+ super.close();
+ this.lockTime = null;
+ }
+
+ @VisibleForTesting
+ void handleLockRelease(LockReleaseEvent event) {
+ Preconditions.checkArgument(lockTime != null, "Lock time is null, Can't
release lock");
+
+ if (event.timestamp() >= lockTime) {
+ this.lockTime = null;
+ this.shouldRestoreTasks = false;
+ }
}
- private void checkAndFire(long current, TimerService timerService,
Collector<Trigger> out) {
+ private void checkAndFire(ProcessingTimeService timerService) {
+ long current = timerService.getCurrentProcessingTime();
if (shouldRestoreTasks) {
- if (recoveryLock.isHeld()) {
+ if (lockTime != null) {
// Recovered tasks in progress. Skip trigger check
- LOG.debug("The recovery lock is still held at {}", current);
+ LOG.info("The recovery lock is still held at {}", current);
schedule(timerService, current + lockCheckDelayMs);
return;
- } else {
- LOG.info("The recovery is finished at {}", current);
- shouldRestoreTasks = false;
}
}
Integer taskToStart =
- nextTrigger(evaluators, accumulatedChanges, lastTriggerTimes, current,
startsFrom);
+ TriggerUtil.nextTrigger(
+ evaluators, accumulatedChanges, lastTriggerTimes, current,
startsFrom);
if (taskToStart == null) {
// Nothing to execute
if (!triggered) {
@@ -242,9 +289,10 @@ public class TriggerManager extends
KeyedProcessFunction<Boolean, TableChange, T
return;
}
- if (lock.tryLock()) {
+ if (lockTime == null) {
+ this.lockTime = current;
TableChange change = accumulatedChanges.get(taskToStart);
- out.collect(Trigger.create(current, taskToStart));
+ output.collect(new StreamRecord<>(Trigger.create(current, taskToStart),
current));
LOG.debug("Fired event with time: {}, collected: {} for {}", current,
change, tableName);
triggerCounters.get(taskToStart).inc();
accumulatedChanges.set(taskToStart, TableChange.empty());
@@ -260,68 +308,15 @@ public class TriggerManager extends
KeyedProcessFunction<Boolean, TableChange, T
concurrentRunThrottledCounter.inc();
schedule(timerService, current + lockCheckDelayMs);
}
-
- timerService.registerProcessingTimeTimer(nextEvaluationTime);
}
- private void schedule(TimerService timerService, long time) {
+ private void schedule(ProcessingTimeService timerService, long time) {
this.nextEvaluationTime = time;
- timerService.registerProcessingTimeTimer(time);
+ timerService.registerTimer(time, this);
}
- private static Integer nextTrigger(
- List<TriggerEvaluator> evaluators,
- List<TableChange> changes,
- List<Long> lastTriggerTimes,
- long currentTime,
- int startPos) {
- int current = startPos;
- do {
- if (evaluators
- .get(current)
- .check(changes.get(current), lastTriggerTimes.get(current),
currentTime)) {
- return current;
- }
-
- current = (current + 1) % evaluators.size();
- } while (current != startPos);
-
- return null;
- }
-
- private void init(Collector<Trigger> out, TimerService timerService) throws
Exception {
- if (!inited) {
- long current = timerService.currentProcessingTime();
-
- // Initialize from state
- this.nextEvaluationTime = nextEvaluationTimeState.value();
- this.accumulatedChanges =
Lists.newArrayList(accumulatedChangesState.get());
- this.lastTriggerTimes = Lists.newArrayList(lastTriggerTimesState.get());
-
- // Initialize if the state was empty
- if (accumulatedChanges.isEmpty()) {
- for (int i = 0; i < evaluators.size(); ++i) {
- accumulatedChanges.add(TableChange.empty());
- lastTriggerTimes.add(current);
- }
- }
-
- if (shouldRestoreTasks) {
- // When the job state is restored, there could be ongoing tasks.
- // To prevent collision with the new triggers the following is done:
- // - add a recovery lock
- // - fire a recovery trigger
- // This ensures that the tasks of the previous trigger are executed,
and the lock is removed
- // in the end. The result of the 'tryLock' is ignored as an already
existing lock prevents
- // collisions as well.
- recoveryLock.tryLock();
- out.collect(Trigger.recovery(current));
- if (nextEvaluationTime == null) {
- schedule(timerService, current + minFireDelayMs);
- }
- }
-
- inited = true;
- }
+ @VisibleForTesting
+ Long lockTime() {
+ return lockTime;
}
}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperatorFactory.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperatorFactory.java
new file mode 100644
index 0000000000..bace5e3afe
--- /dev/null
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerManagerOperatorFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+
+@Internal
+public class TriggerManagerOperatorFactory extends
AbstractStreamOperatorFactory<Trigger>
+ implements CoordinatedOperatorFactory<Trigger>,
+ OneInputStreamOperatorFactory<TableChange, Trigger> {
+
+ private final String lockId;
+ private final List<String> maintenanceTaskNames;
+ private final List<TriggerEvaluator> evaluators;
+ private final long minFireDelayMs;
+ private final long lockCheckDelayMs;
+
+ public TriggerManagerOperatorFactory(
+ String lockId,
+ List<String> maintenanceTaskNames,
+ List<TriggerEvaluator> evaluators,
+ long minFireDelayMs,
+ long lockCheckDelayMs) {
+ this.lockId = lockId;
+ this.maintenanceTaskNames = maintenanceTaskNames;
+ this.evaluators = evaluators;
+ this.minFireDelayMs = minFireDelayMs;
+ this.lockCheckDelayMs = lockCheckDelayMs;
+ }
+
+ @Override
+ public OperatorCoordinator.Provider getCoordinatorProvider(
+ String operatorName, OperatorID operatorID) {
+ return new TriggerManagerCoordinatorProvider(operatorName, operatorID);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T extends StreamOperator<Trigger>> T createStreamOperator(
+ StreamOperatorParameters<Trigger> parameters) {
+ OperatorID operatorId = parameters.getStreamConfig().getOperatorID();
+ OperatorEventGateway gateway =
+
parameters.getOperatorEventDispatcher().getOperatorEventGateway(operatorId);
+
+ TriggerManagerOperator triggerManagerOperator =
+ new TriggerManagerOperator(
+ parameters,
+ gateway,
+ maintenanceTaskNames,
+ evaluators,
+ minFireDelayMs,
+ lockCheckDelayMs,
+ lockId);
+
+ parameters
+ .getOperatorEventDispatcher()
+ .registerEventHandler(operatorId, triggerManagerOperator);
+
+ return (T) triggerManagerOperator;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader
classLoader) {
+ return TriggerManagerOperator.class;
+ }
+
+ private static class TriggerManagerCoordinatorProvider
+ extends RecreateOnResetOperatorCoordinator.Provider {
+
+ private final String operatorName;
+
+ private TriggerManagerCoordinatorProvider(String operatorName, OperatorID
operatorID) {
+ super(operatorID);
+ this.operatorName = operatorName;
+ }
+
+ @Override
+ public OperatorCoordinator getCoordinator(OperatorCoordinator.Context
context) {
+ return new TriggerManagerCoordinator(operatorName, context);
+ }
+ }
+}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerUtil.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerUtil.java
new file mode 100644
index 0000000000..634e9a0d03
--- /dev/null
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TriggerUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import java.util.List;
+
+class TriggerUtil {
+
+ private TriggerUtil() {}
+
+ static Integer nextTrigger(
+ List<TriggerEvaluator> evaluators,
+ List<TableChange> changes,
+ List<Long> lastTriggerTimes,
+ long currentTime,
+ int startPos) {
+ int current = startPos;
+ do {
+ if (evaluators
+ .get(current)
+ .check(changes.get(current), lastTriggerTimes.get(current),
currentTime)) {
+ return current;
+ }
+
+ current = (current + 1) % evaluators.size();
+ } while (current != startPos);
+
+ return null;
+ }
+}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java
index 0a860fec47..fe8457167a 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestMaintenanceE2E.java
@@ -77,4 +77,43 @@ class TestMaintenanceE2E extends OperatorTestBase {
closeJobClient(jobClient);
}
}
+
+ @Test
+ void testE2eUseCoordinator() throws Exception {
+ TableMaintenance.forTable(env, tableLoader())
+ .uidSuffix("E2eTestUID")
+ .rateLimit(Duration.ofMinutes(10))
+ .lockCheckDelay(Duration.ofSeconds(10))
+ .add(
+ ExpireSnapshots.builder()
+ .scheduleOnCommitCount(10)
+ .maxSnapshotAge(Duration.ofMinutes(10))
+ .retainLast(5)
+ .deleteBatchSize(5)
+ .parallelism(8))
+ .add(
+ RewriteDataFiles.builder()
+ .scheduleOnDataFileCount(10)
+ .partialProgressEnabled(true)
+ .partialProgressMaxCommits(10)
+ .maxRewriteBytes(1000L)
+ .targetFileSizeBytes(1000L)
+ .minFileSizeBytes(1000L)
+ .maxFileSizeBytes(1000L)
+ .minInputFiles(10)
+ .deleteFileThreshold(10)
+ .rewriteAll(false)
+ .maxFileGroupSizeBytes(1000L))
+ .append();
+
+ JobClient jobClient = null;
+ try {
+ jobClient = env.executeAsync();
+
+ // Just make sure that we are able to instantiate the flow
+ assertThat(jobClient).isNotNull();
+ } finally {
+ closeJobClient(jobClient);
+ }
+ }
}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
new file mode 100644
index 0000000000..eb5479045b
--- /dev/null
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenanceCoordinationLock.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.api;
+
+import static org.apache.iceberg.flink.SimpleDataUtil.createRowData;
+import static
org.apache.iceberg.flink.maintenance.api.TableMaintenance.SOURCE_OPERATOR_NAME_PREFIX;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.flink.maintenance.operator.ManualSource;
+import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase;
+import org.apache.iceberg.flink.maintenance.operator.TableChange;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class TestTableMaintenanceCoordinationLock extends OperatorTestBase {
+ private static final String MAINTENANCE_TASK_NAME = "TestTableMaintenance";
+ private static final String[] TASKS =
+ new String[] {MAINTENANCE_TASK_NAME + " [0]", MAINTENANCE_TASK_NAME + "
[1]"};
+ private static final TableChange DUMMY_CHANGE =
TableChange.builder().commitCount(1).build();
+ private static final List<Trigger> PROCESSED =
+ Collections.synchronizedList(Lists.newArrayListWithCapacity(1));
+
+ private StreamExecutionEnvironment env;
+ private Table table;
+
+ @TempDir private File checkpointDir;
+
+ @BeforeEach
+ void beforeEach() throws IOException {
+ Configuration config = new Configuration();
+ config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+ config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" +
checkpointDir.getPath());
+ this.env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ this.table = createTable();
+ insert(table, 1, "a");
+
+ PROCESSED.clear();
+ MaintenanceTaskBuilderForTest.counter = 0;
+ }
+
+ @Test
+ void testForChangeStream() throws Exception {
+ ManualSource<TableChange> schedulerSource =
+ new ManualSource<>(env, TypeInformation.of(TableChange.class));
+
+ TableMaintenance.Builder streamBuilder =
+ TableMaintenance.forChangeStream(schedulerSource.dataStream(),
tableLoader())
+ .rateLimit(Duration.ofMillis(2))
+ .lockCheckDelay(Duration.ofSeconds(3))
+ .add(
+ new MaintenanceTaskBuilderForTest(true)
+ .scheduleOnCommitCount(1)
+ .scheduleOnDataFileCount(2)
+ .scheduleOnDataFileSize(3L)
+ .scheduleOnEqDeleteFileCount(4)
+ .scheduleOnEqDeleteRecordCount(5L)
+ .scheduleOnPosDeleteFileCount(6)
+ .scheduleOnPosDeleteRecordCount(7L)
+ .scheduleOnInterval(Duration.ofHours(1)));
+
+ sendEvents(schedulerSource, streamBuilder,
ImmutableList.of(Tuple2.of(DUMMY_CHANGE, 1)));
+ }
+
+ @Test
+ void testForTable() throws Exception {
+ TableLoader tableLoader = tableLoader();
+
+ env.enableCheckpointing(10);
+
+ TableMaintenance.forTable(env, tableLoader)
+ .rateLimit(Duration.ofMillis(2))
+ .maxReadBack(2)
+ .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(2))
+ .append();
+
+ // Creating a stream for inserting data into the table concurrently
+ ManualSource<RowData> insertSource =
+ new ManualSource<>(env,
InternalTypeInfo.of(FlinkSchemaUtil.convert(table.schema())));
+ FlinkSink.forRowData(insertSource.dataStream())
+ .tableLoader(tableLoader)
+ .uidPrefix(UID_SUFFIX + "-iceberg-sink")
+ .append();
+
+ JobClient jobClient = null;
+ try {
+ jobClient = env.executeAsync();
+
+ insertSource.sendRecord(createRowData(2, "b"));
+
+ Awaitility.await().until(() -> PROCESSED.size() == 1);
+ } finally {
+ closeJobClient(jobClient);
+ }
+ }
+
+ @Test
+ void testUidAndSlotSharingGroup() throws IOException {
+ TableMaintenance.forChangeStream(
+ new ManualSource<>(env,
TypeInformation.of(TableChange.class)).dataStream(),
+ tableLoader())
+ .uidSuffix(UID_SUFFIX)
+ .slotSharingGroup(SLOT_SHARING_GROUP)
+ .add(
+ new MaintenanceTaskBuilderForTest(true)
+ .scheduleOnCommitCount(1)
+ .uidSuffix(UID_SUFFIX)
+ .slotSharingGroup(SLOT_SHARING_GROUP))
+ .append();
+
+ checkUidsAreSet(env, UID_SUFFIX);
+ checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP);
+ }
+
+ @Test
+ void testUidAndSlotSharingGroupUnset() throws IOException {
+ TableMaintenance.forChangeStream(
+ new ManualSource<>(env,
TypeInformation.of(TableChange.class)).dataStream(),
+ tableLoader())
+ .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1))
+ .append();
+
+ checkUidsAreSet(env, null);
+ checkSlotSharingGroupsAreSet(env, null);
+ }
+
+ @Test
+ void testUidAndSlotSharingGroupInherit() throws IOException {
+ TableMaintenance.forChangeStream(
+ new ManualSource<>(env,
TypeInformation.of(TableChange.class)).dataStream(),
+ tableLoader())
+ .uidSuffix(UID_SUFFIX)
+ .slotSharingGroup(SLOT_SHARING_GROUP)
+ .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1))
+ .append();
+
+ checkUidsAreSet(env, UID_SUFFIX);
+ checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP);
+ }
+
+ @Test
+ void testUidAndSlotSharingGroupOverWrite() throws IOException {
+ String anotherUid = "Another-UID";
+ String anotherSlotSharingGroup = "Another-SlotSharingGroup";
+ TableMaintenance.forChangeStream(
+ new ManualSource<>(env,
TypeInformation.of(TableChange.class)).dataStream(),
+ tableLoader())
+ .uidSuffix(UID_SUFFIX)
+ .slotSharingGroup(SLOT_SHARING_GROUP)
+ .add(
+ new MaintenanceTaskBuilderForTest(true)
+ .scheduleOnCommitCount(1)
+ .uidSuffix(anotherUid)
+ .slotSharingGroup(anotherSlotSharingGroup))
+ .append();
+
+ // Choose an operator from the scheduler part of the graph
+ Transformation<?> schedulerTransformation =
+ env.getTransformations().stream()
+ .filter(t -> t.getName().equals("Trigger manager"))
+ .findFirst()
+ .orElseThrow();
+ assertThat(schedulerTransformation.getUid()).contains(UID_SUFFIX);
+ assertThat(schedulerTransformation.getSlotSharingGroup()).isPresent();
+ assertThat(schedulerTransformation.getSlotSharingGroup().get().getName())
+ .isEqualTo(SLOT_SHARING_GROUP);
+
+ // Choose an operator from the maintenance task part of the graph
+ Transformation<?> scheduledTransformation =
+ env.getTransformations().stream()
+ .filter(t -> t.getName().startsWith(MAINTENANCE_TASK_NAME))
+ .findFirst()
+ .orElseThrow();
+ assertThat(scheduledTransformation.getUid()).contains(anotherUid);
+ assertThat(scheduledTransformation.getSlotSharingGroup()).isPresent();
+ assertThat(scheduledTransformation.getSlotSharingGroup().get().getName())
+ .isEqualTo(anotherSlotSharingGroup);
+ }
+
+ @Test
+ void testUidAndSlotSharingGroupForMonitorSource() throws IOException {
+ TableMaintenance.forTable(env, tableLoader())
+ .uidSuffix(UID_SUFFIX)
+ .slotSharingGroup(SLOT_SHARING_GROUP)
+ .add(
+ new MaintenanceTaskBuilderForTest(true)
+ .scheduleOnCommitCount(1)
+ .uidSuffix(UID_SUFFIX)
+ .slotSharingGroup(SLOT_SHARING_GROUP))
+ .append();
+
+ Transformation<?> source = monitorSource();
+ assertThat(source).isNotNull();
+ assertThat(source.getUid()).contains(UID_SUFFIX);
+ assertThat(source.getSlotSharingGroup()).isPresent();
+
assertThat(source.getSlotSharingGroup().get().getName()).isEqualTo(SLOT_SHARING_GROUP);
+
+ checkUidsAreSet(env, UID_SUFFIX);
+ checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP);
+ }
+
+ /**
+   * Sends the events through the {@link ManualSource} provided, and waits
until the given number of
+ * records are processed.
+ *
+ * @param schedulerSource used for sending the events
+ * @param streamBuilder used for generating the job
+ * @param eventsAndResultNumbers the pair of the event and the expected
processed records
+ * @throws Exception if any
+ */
+ private void sendEvents(
+ ManualSource<TableChange> schedulerSource,
+ TableMaintenance.Builder streamBuilder,
+ List<Tuple2<TableChange, Integer>> eventsAndResultNumbers)
+ throws Exception {
+ streamBuilder.append();
+
+ JobClient jobClient = null;
+ try {
+ jobClient = env.executeAsync();
+
+ eventsAndResultNumbers.forEach(
+ eventsAndResultNumber -> {
+ int expectedSize = PROCESSED.size() + eventsAndResultNumber.f1;
+ schedulerSource.sendRecord(eventsAndResultNumber.f0);
+ Awaitility.await().until(() -> PROCESSED.size() == expectedSize);
+ });
+ } finally {
+ closeJobClient(jobClient);
+ }
+ }
+
+ /**
+ * Finds the {@link
org.apache.iceberg.flink.maintenance.operator.MonitorSource} for testing
+ * purposes by parsing the transformation tree.
+ *
+ * @return The monitor source if we found it
+ */
+ private Transformation<?> monitorSource() {
+ assertThat(env.getTransformations()).isNotEmpty();
+ assertThat(env.getTransformations().get(0).getInputs()).isNotEmpty();
+
assertThat(env.getTransformations().get(0).getInputs().get(0).getInputs()).isNotEmpty();
+
+ Transformation<?> result =
+ env.getTransformations().get(0).getInputs().get(0).getInputs().get(0);
+
+ // Some checks to make sure this is the transformation we are looking for
+ assertThat(result).isInstanceOf(SourceTransformation.class);
+ assertThat(result.getName()).startsWith(SOURCE_OPERATOR_NAME_PREFIX);
+
+ return result;
+ }
+
+ private static class MaintenanceTaskBuilderForTest
+ extends MaintenanceTaskBuilder<MaintenanceTaskBuilderForTest> {
+ private final boolean success;
+ private final int id;
+ private static int counter = 0;
+
+ MaintenanceTaskBuilderForTest(boolean success) {
+ this.success = success;
+ this.id = counter;
+ ++counter;
+ }
+
+ @Override
+ String maintenanceTaskName() {
+ return MAINTENANCE_TASK_NAME;
+ }
+
+ @Override
+ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
+ String name = TASKS[id];
+ return trigger
+ .map(new DummyMaintenanceTask(success))
+ .name(name)
+ .uid(uidSuffix() + "-test-mapper-" + name + "-" + id)
+ .slotSharingGroup(slotSharingGroup())
+ .forceNonParallel();
+ }
+ }
+
+ private record DummyMaintenanceTask(boolean success)
+ implements MapFunction<Trigger, TaskResult>,
ResultTypeQueryable<TaskResult>, Serializable {
+
+ @Override
+ public TaskResult map(Trigger trigger) {
+ PROCESSED.add(trigger);
+
+ return new TaskResult(
+ trigger.taskId(),
+ trigger.timestamp(),
+ success,
+ success ? Collections.emptyList() : Lists.newArrayList(new
Exception("Testing error")));
+ }
+
+ @Override
+ public TypeInformation<TaskResult> getProducedType() {
+ return TypeInformation.of(TaskResult.class);
+ }
+ }
+}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CoordinatorTestBase.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CoordinatorTestBase.java
new file mode 100644
index 0000000000..5bfc889f27
--- /dev/null
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/CoordinatorTestBase.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
+import org.junit.jupiter.api.Timeout;
+
+@Timeout(value = 10)
+class CoordinatorTestBase extends OperatorTestBase {
+ protected static final String OPERATOR_NAME = "TestCoordinator";
+ protected static final String OPERATOR_NAME_1 = "TestCoordinator_1";
+ protected static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L,
5678L);
+ protected static final OperatorID TEST_OPERATOR_ID_1 = new OperatorID(1235L,
5679L);
+ protected static final int NUM_SUBTASKS = 1;
+ protected static final LockRegisterEvent LOCK_REGISTER_EVENT =
+ new LockRegisterEvent(DUMMY_TABLE_NAME);
+ protected static final LockReleaseEvent LOCK_RELEASE_EVENT =
+ new LockReleaseEvent(DUMMY_TABLE_NAME, 1L);
+
+ protected static void setAllTasksReady(
+ BaseCoordinator baseCoordinator, EventReceivingTasks receivingTasks) {
+ for (int i = 0; i < NUM_SUBTASKS; i++) {
+ baseCoordinator.executionAttemptReady(i, 0,
receivingTasks.createGatewayForSubtask(i, 0));
+ }
+ }
+}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoveCoordinator.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoveCoordinator.java
new file mode 100644
index 0000000000..3427d2abe0
--- /dev/null
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoveCoordinator.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
+import
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+@Timeout(value = 10)
+@Execution(ExecutionMode.SAME_THREAD)
+class TestLockRemoveCoordinator extends CoordinatorTestBase {
+
+ private EventReceivingTasks receivingTasks;
+
+ @BeforeEach
+ void before() {
+ this.receivingTasks = EventReceivingTasks.createForRunningTasks();
+ }
+
+ @Test
+ void testEventHandling() throws Exception {
+ try (LockRemoverCoordinator lockRemoverCoordinator = createCoordinator()) {
+
+ lockRemoverCoordinator.start();
+
+ setAllTasksReady(lockRemoverCoordinator, receivingTasks);
+
+ lockRemoverCoordinator.handleReleaseLock(LOCK_RELEASE_EVENT);
+ assertThat(lockRemoverCoordinator.pendingReleaseEvents()).hasSize(1);
+ }
+ }
+
+ private LockRemoverCoordinator createCoordinator() {
+ return new LockRemoverCoordinator(
+ OPERATOR_NAME, new MockOperatorCoordinatorContext(TEST_OPERATOR_ID,
1)) {
+ @Override
+ void runInCoordinatorThread(Runnable runnable, String actionString) {
+ try {
+ coordinatorExecutor().submit(runnable).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(actionString, e);
+ }
+ }
+ };
+ }
+}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoverOperation.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoverOperation.java
new file mode 100644
index 0000000000..9ba95cab30
--- /dev/null
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestLockRemoverOperation.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.FAILED_TASK_COUNTER;
+import static
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.LAST_RUN_DURATION_MS;
+import static
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.iceberg.flink.maintenance.api.TaskResult;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+@Timeout(value = 10)
+class TestLockRemoverOperation extends OperatorTestBase {
+ private static final String[] TASKS = new String[] {"task0", "task1",
"task2"};
+ private static final String OPERATOR_NAME = "TestCoordinator";
+ private static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L,
5678L);
+
+ private LockRemoverCoordinator lockRemoverCoordinator;
+
+ @BeforeEach
+ void before() {
+ MetricsReporterFactoryForTests.reset();
+ this.lockRemoverCoordinator = createCoordinator();
+ try {
+ lockRemoverCoordinator.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @AfterEach
+ void after() throws IOException {
+ super.after();
+ try {
+ lockRemoverCoordinator.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ void testProcess() throws Exception {
+ MockOperatorEventGateway mockGateway = new MockOperatorEventGateway();
+ LockRemoverOperator operator =
+ new LockRemoverOperator(null, mockGateway, DUMMY_TASK_NAME,
Lists.newArrayList(TASKS[0]));
+ try (OneInputStreamOperatorTestHarness<TaskResult, Void> testHarness =
+ createHarness(operator)) {
+
+ testHarness.processElement(
+ new StreamRecord<>(new TaskResult(0, 0L, true,
Lists.newArrayList())));
+ assertThat(mockGateway.getEventsSent()).hasSize(0);
+
+ testHarness.processWatermark(WATERMARK);
+ assertThat(mockGateway.getEventsSent()).hasSize(1);
+ }
+ }
+
+ @Test
+ void testMetrics() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ ManualSource<TaskResult> source = new ManualSource<>(env,
TypeInformation.of(TaskResult.class));
+ source
+ .dataStream()
+ .transform(
+ DUMMY_TASK_NAME,
+ TypeInformation.of(Void.class),
+ new LockRemoverOperatorFactory(DUMMY_TABLE_NAME,
Lists.newArrayList(TASKS)))
+ .forceNonParallel();
+
+ JobClient jobClient = null;
+ long time = System.currentTimeMillis();
+ try {
+ jobClient = env.executeAsync();
+ // Start the 2 successful and one failed result trigger for task1, and 3
successful for task2
+ processAndCheck(source, new TaskResult(0, time, true,
Lists.newArrayList()));
+ processAndCheck(source, new TaskResult(1, 0L, true,
Lists.newArrayList()));
+ processAndCheck(source, new TaskResult(1, 0L, true,
Lists.newArrayList()));
+ processAndCheck(source, new TaskResult(0, time, false,
Lists.newArrayList()));
+ processAndCheck(source, new TaskResult(0, time, true,
Lists.newArrayList()));
+ processAndCheck(source, new TaskResult(1, 0L, true,
Lists.newArrayList()));
+
+ Awaitility.await()
+ .until(
+ () ->
+ MetricsReporterFactoryForTests.counter(
+ ImmutableList.of(
+ DUMMY_TASK_NAME,
+ DUMMY_TABLE_NAME,
+ TASKS[1],
+ "1",
+ SUCCEEDED_TASK_COUNTER))
+ .equals(3L));
+
+ // Final check all the counters
+ MetricsReporterFactoryForTests.assertCounters(
+ new ImmutableMap.Builder<List<String>, Long>()
+ .put(
+ ImmutableList.of(
+ DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0",
SUCCEEDED_TASK_COUNTER),
+ 2L)
+ .put(
+ ImmutableList.of(
+ DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0",
FAILED_TASK_COUNTER),
+ 1L)
+ .put(
+ ImmutableList.of(
+ DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1",
SUCCEEDED_TASK_COUNTER),
+ 3L)
+ .put(
+ ImmutableList.of(
+ DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1",
FAILED_TASK_COUNTER),
+ 0L)
+ .put(
+ ImmutableList.of(
+ DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2",
SUCCEEDED_TASK_COUNTER),
+ 0L)
+ .put(
+ ImmutableList.of(
+ DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2",
FAILED_TASK_COUNTER),
+ 0L)
+ .build());
+
+ assertThat(
+ MetricsReporterFactoryForTests.gauge(
+ ImmutableList.of(
+ DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[0], "0",
LAST_RUN_DURATION_MS)))
+ .isPositive();
+ assertThat(
+ MetricsReporterFactoryForTests.gauge(
+ ImmutableList.of(
+ DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[1], "1",
LAST_RUN_DURATION_MS)))
+ .isGreaterThan(time);
+ assertThat(
+ MetricsReporterFactoryForTests.gauge(
+ ImmutableList.of(
+ DUMMY_TASK_NAME, DUMMY_TABLE_NAME, TASKS[2], "2",
LAST_RUN_DURATION_MS)))
+ .isZero();
+ } finally {
+ closeJobClient(jobClient);
+ }
+ }
+
+ private void processAndCheck(ManualSource<TaskResult> source, TaskResult
input) {
+ List<String> counterKey =
+ ImmutableList.of(
+ DUMMY_TASK_NAME,
+ DUMMY_TABLE_NAME,
+ TASKS[input.taskIndex()],
+ String.valueOf(input.taskIndex()),
+ input.success() ? SUCCEEDED_TASK_COUNTER : FAILED_TASK_COUNTER);
+ Long counterValue = MetricsReporterFactoryForTests.counter(counterKey);
+ Long expected = counterValue != null ? counterValue + 1 : 1L;
+
+ source.sendRecord(input);
+ source.sendWatermark(input.startEpoch());
+
+ Awaitility.await()
+ .until(() ->
expected.equals(MetricsReporterFactoryForTests.counter(counterKey)));
+ }
+
+ private OneInputStreamOperatorTestHarness<TaskResult, Void> createHarness(
+ LockRemoverOperator lockRemoverOperator) throws Exception {
+ OneInputStreamOperatorTestHarness<TaskResult, Void> harness =
+ new OneInputStreamOperatorTestHarness<>(lockRemoverOperator);
+ harness.open();
+ return harness;
+ }
+
+ private static LockRemoverCoordinator createCoordinator() {
+ return new LockRemoverCoordinator(
+ OPERATOR_NAME, new MockOperatorCoordinatorContext(TEST_OPERATOR_ID,
1));
+ }
+}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerCoordinator.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerCoordinator.java
new file mode 100644
index 0000000000..5fc6695f3e
--- /dev/null
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerCoordinator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.concurrent.ExecutionException;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
+import
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+@Timeout(value = 10)
+@Execution(ExecutionMode.SAME_THREAD)
+class TestTriggerManagerCoordinator extends CoordinatorTestBase {
+
+ private EventReceivingTasks receivingTasks;
+ private EventReceivingTasks receivingTasks1;
+
+ @BeforeEach
+ void before() {
+ this.receivingTasks = EventReceivingTasks.createForRunningTasks();
+ this.receivingTasks1 = EventReceivingTasks.createForRunningTasks();
+ }
+
+ @Test
+ void testEventHandling() throws Exception {
+ try (TriggerManagerCoordinator triggerManagerCoordinator =
+ createCoordinator(OPERATOR_NAME, TEST_OPERATOR_ID);
+ TriggerManagerCoordinator triggerManagerCoordinator1 =
+ createCoordinator(OPERATOR_NAME_1, TEST_OPERATOR_ID_1)) {
+
+ triggerManagerCoordinator.start();
+ triggerManagerCoordinator1.start();
+
+ setAllTasksReady(triggerManagerCoordinator, receivingTasks);
+ setAllTasksReady(triggerManagerCoordinator1, receivingTasks1);
+
+ triggerManagerCoordinator.handleEventFromOperator(0, 0,
LOCK_REGISTER_EVENT);
+
assertThat(receivingTasks.getSentEventsForSubtask(0).size()).isEqualTo(0);
+
+ // release lock from coordinator1 and get one event from coordinator
+ triggerManagerCoordinator1.handleReleaseLock(LOCK_RELEASE_EVENT);
+
assertThat(receivingTasks.getSentEventsForSubtask(0).size()).isEqualTo(1);
+
assertThat(receivingTasks1.getSentEventsForSubtask(0).size()).isEqualTo(0);
+ }
+ }
+
+ @Test
+ void testEventArriveBeforeRegister() throws Exception {
+ try (TriggerManagerCoordinator triggerManagerCoordinator =
+ createCoordinator(OPERATOR_NAME, TEST_OPERATOR_ID)) {
+
+ triggerManagerCoordinator.start();
+ setAllTasksReady(triggerManagerCoordinator, receivingTasks);
+
+      // release event arrives before register
+ triggerManagerCoordinator.handleReleaseLock(LOCK_RELEASE_EVENT);
+ assertThat(triggerManagerCoordinator.pendingReleaseEvents()).hasSize(1);
+
+ triggerManagerCoordinator.handleEventFromOperator(0, 0,
LOCK_REGISTER_EVENT);
+
assertThat(receivingTasks.getSentEventsForSubtask(0).size()).isEqualTo(1);
+
+ assertThat(triggerManagerCoordinator.pendingReleaseEvents()).hasSize(0);
+ }
+ }
+
+ private static TriggerManagerCoordinator createCoordinator(
+ String operatorName, OperatorID operatorID) {
+ return new TriggerManagerCoordinator(
+ operatorName, new MockOperatorCoordinatorContext(operatorID, 1)) {
+ @Override
+ void runInCoordinatorThread(Runnable runnable, String actionString) {
+ try {
+ coordinatorExecutor().submit(runnable).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(actionString, e);
+ }
+ }
+ };
+ }
+}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerOperator.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerOperator.java
new file mode 100644
index 0000000000..ea7d8b9625
--- /dev/null
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestTriggerManagerOperator.java
@@ -0,0 +1,668 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.maintenance.operator;
+
+import static
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED;
+import static
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.NOTHING_TO_TRIGGER;
+import static
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED;
+import static
org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics.TRIGGERED;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+import
org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.maintenance.api.Trigger;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class TestTriggerManagerOperator extends OperatorTestBase {
+ private static final long DELAY = 10L;
+ private static final String OPERATOR_NAME = "TestCoordinator";
+ private static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234L,
5678L);
+ private static final String[] TASKS = new String[] {"task0", "task1"};
+ private long processingTime = 0L;
+ private String tableName;
+ private TriggerManagerCoordinator triggerManagerCoordinator;
+ private LockReleaseEvent lockReleaseEvent;
+
+ @BeforeEach
+ void before() {
+ super.before();
+ Table table = createTable();
+ this.tableName = table.name();
+ lockReleaseEvent = new LockReleaseEvent(tableName, 1L);
+ this.triggerManagerCoordinator = createCoordinator();
+ try {
+ triggerManagerCoordinator.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @AfterEach
+ void after() throws IOException {
+ super.after();
+ try {
+ triggerManagerCoordinator.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ void testCommitCount() throws Exception {
+ MockOperatorEventGateway mockGateway = new MockOperatorEventGateway();
+ TriggerManagerOperator operator =
+ createOperator(new TriggerEvaluator.Builder().commitCount(3).build(),
mockGateway);
+ try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
+ createHarness(operator)) {
+
+ addEventAndCheckResult(
+ operator, testHarness, TableChange.builder().commitCount(1).build(),
0);
+ addEventAndCheckResult(
+ operator, testHarness, TableChange.builder().commitCount(2).build(),
1);
+ addEventAndCheckResult(
+ operator, testHarness, TableChange.builder().commitCount(3).build(),
2);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().commitCount(10).build(), 3);
+
+ // No trigger in this case
+ addEventAndCheckResult(
+ operator, testHarness, TableChange.builder().commitCount(1).build(),
3);
+ addEventAndCheckResult(
+ operator, testHarness, TableChange.builder().commitCount(1).build(),
3);
+
+ addEventAndCheckResult(
+ operator, testHarness, TableChange.builder().commitCount(1).build(),
4);
+ }
+ }
+
+ @Test
+ void testDataFileCount() throws Exception {
+ TriggerManagerOperator operator =
+ createOperator(
+ new TriggerEvaluator.Builder().dataFileCount(3).build(),
+ new MockOperatorEventGateway());
+ try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
+ createHarness(operator)) {
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().dataFileCount(1).build(), 0);
+
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().dataFileCount(2).build(), 1);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().dataFileCount(3).build(), 2);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().dataFileCount(5).build(), 3);
+
+ // No trigger in this case
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().dataFileCount(1).build(), 3);
+
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().dataFileCount(2).build(), 4);
+ }
+ }
+
+ @Test
+ void testDataFileSizeInBytes() throws Exception {
+ TriggerManagerOperator operator =
+ createOperator(
+ new TriggerEvaluator.Builder().dataFileSizeInBytes(3).build(),
+ new MockOperatorEventGateway());
+ try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
+ createHarness(operator)) {
+
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().dataFileSizeInBytes(1L).build(), 0);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().dataFileSizeInBytes(2L).build(), 1);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().dataFileSizeInBytes(5L).build(), 2);
+
+ // No trigger in this case
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().dataFileSizeInBytes(1L).build(), 2);
+
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().dataFileSizeInBytes(2L).build(), 3);
+ }
+ }
+
+ @Test
+ void testPosDeleteFileCount() throws Exception {
+ TriggerManagerOperator operator =
+ createOperator(
+ new TriggerEvaluator.Builder().posDeleteFileCount(3).build(),
+ new MockOperatorEventGateway());
+ try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
+ createHarness(operator)) {
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().posDeleteFileCount(1).build(), 0);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().posDeleteFileCount(2).build(), 1);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().posDeleteFileCount(3).build(), 2);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().posDeleteFileCount(10).build(), 3);
+
+ // No trigger in this case
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().posDeleteFileCount(1).build(), 3);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().posDeleteFileCount(1).build(), 3);
+
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().posDeleteFileCount(1).build(), 4);
+ }
+ }
+
+ @Test
+ void testPosDeleteRecordCount() throws Exception {
+
+ TriggerManagerOperator operator =
+ createOperator(
+ new TriggerEvaluator.Builder().posDeleteRecordCount(3).build(),
+ new MockOperatorEventGateway());
+ try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
+ createHarness(operator)) {
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().posDeleteRecordCount(1L).build(), 0);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().posDeleteRecordCount(2L).build(), 1);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().posDeleteRecordCount(5L).build(), 2);
+
+ // No trigger in this case
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().posDeleteRecordCount(1L).build(), 2);
+
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().posDeleteRecordCount(2L).build(), 3);
+ }
+ }
+
+ @Test
+ void testEqDeleteFileCount() throws Exception {
+ TriggerManagerOperator operator =
+ createOperator(
+ new TriggerEvaluator.Builder().eqDeleteFileCount(3).build(),
+ new MockOperatorEventGateway());
+ try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
+ createHarness(operator)) {
+
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().eqDeleteFileCount(1).build(), 0);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().eqDeleteFileCount(2).build(), 1);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().eqDeleteFileCount(3).build(), 2);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().eqDeleteFileCount(10).build(), 3);
+
+ // No trigger in this case
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().eqDeleteFileCount(1).build(), 3);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().eqDeleteFileCount(1).build(), 3);
+
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().eqDeleteFileCount(1).build(), 4);
+ }
+ }
+
+ @Test
+ void testEqDeleteRecordCount() throws Exception {
+ TriggerManagerOperator operator =
+ createOperator(
+ new TriggerEvaluator.Builder().eqDeleteRecordCount(3).build(),
+ new MockOperatorEventGateway());
+ try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
+ createHarness(operator)) {
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().eqDeleteRecordCount(1L).build(), 0);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().eqDeleteRecordCount(2L).build(), 1);
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().eqDeleteRecordCount(5L).build(), 2);
+
+ // No trigger in this case
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().eqDeleteRecordCount(1L).build(), 2);
+
+ addEventAndCheckResult(
+ operator, testHarness,
TableChange.builder().eqDeleteRecordCount(2L).build(), 3);
+ }
+ }
+
+ @Test
+ void testTimeout() throws Exception {
+ TriggerManagerOperator operator =
+ createOperator(
+ new
TriggerEvaluator.Builder().timeout(Duration.ofSeconds(1)).build(),
+ new MockOperatorEventGateway());
+ try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
+ createHarness(operator)) {
+ TableChange event =
TableChange.builder().dataFileCount(1).commitCount(1).build();
+
+ // Wait for some time
+ testHarness.processElement(event, EVENT_TIME);
+ assertThat(testHarness.extractOutputValues()).isEmpty();
+
+ // Wait for the timeout to expire
+ long newTime = EVENT_TIME + Duration.ofSeconds(1).toMillis();
+ testHarness.setProcessingTime(newTime);
+ testHarness.processElement(event, newTime);
+ assertThat(testHarness.extractOutputValues()).hasSize(1);
+
+ // Remove the lock to allow the next trigger
+ operator.handleLockRelease(new LockReleaseEvent(tableName, newTime));
+
+ // Send a new event
+ testHarness.setProcessingTime(newTime + 1);
+ testHarness.processElement(event, newTime);
+
+ // No trigger yet
+ assertThat(testHarness.extractOutputValues()).hasSize(1);
+
+ // Send a new event
+ newTime += Duration.ofSeconds(1).toMillis();
+ testHarness.setProcessingTime(newTime);
+ testHarness.processElement(event, newTime);
+
+ // New trigger should arrive
+ assertThat(testHarness.extractOutputValues()).hasSize(2);
+ }
+ }
+
+ @Test
+ void testStateRestore() throws Exception {
+ OperatorSubtaskState state;
+ TriggerManagerOperator operator =
+ createOperator(
+ new TriggerEvaluator.Builder().commitCount(2).build(), new
MockOperatorEventGateway());
+ try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
+ createHarness(operator)) {
+ testHarness.processElement(
+ TableChange.builder().dataFileCount(1).commitCount(1).build(),
EVENT_TIME);
+
+ assertThat(testHarness.extractOutputValues()).isEmpty();
+
+ state = testHarness.snapshot(1, EVENT_TIME);
+ }
+
+ // Restore the state, write some more data, create a checkpoint, check the
data which is written
+ TriggerManagerOperator newOperator =
+ createOperator(
+ new TriggerEvaluator.Builder().commitCount(2).build(), new
MockOperatorEventGateway());
+ try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
+ new OneInputStreamOperatorTestHarness<>(newOperator)) {
+ testHarness.initializeState(state);
+ testHarness.open();
+
+ // Mock a recovery trigger lock
+ assertTriggers(
+ testHarness.extractOutputValues(),
+
Lists.newArrayList(Trigger.recovery(testHarness.getProcessingTime())));
+
+ testHarness.processElement(TableChange.builder().commitCount(1).build(),
EVENT_TIME_2);
+
+ // Remove the lock to allow the next trigger
+ newOperator.handleOperatorEvent(lockReleaseEvent);
+ testHarness.setProcessingTime(EVENT_TIME_2);
+
+ // At this point the output contains the recovery trigger and the real
trigger
+ assertThat(testHarness.extractOutputValues()).hasSize(2);
+ }
+ }
+
+ @Test
+ void testMinFireDelay() throws Exception {
+ TriggerManagerOperator operator =
+ createOperator(
+ tableName,
+ new TriggerEvaluator.Builder().commitCount(2).build(),
+ new MockOperatorEventGateway(),
+ DELAY,
+ 1);
+ try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
+ createHarness(operator)) {
+ testHarness.open();
+
+ addEventAndCheckResult(
+ operator, testHarness, TableChange.builder().commitCount(2).build(),
1);
+ long currentTime = testHarness.getProcessingTime();
+
+ // No new fire yet
+ addEventAndCheckResult(
+ operator, testHarness, TableChange.builder().commitCount(2).build(),
1);
+
+ // Check that the trigger fired after the delay
+ testHarness.setProcessingTime(currentTime + DELAY);
+ assertThat(testHarness.extractOutputValues()).hasSize(2);
+ }
+ }
+
+ @Test
+ void testLockCheckDelay() throws Exception {
+ TriggerManagerOperator operator =
+ createOperator(
+ tableName,
+ new TriggerEvaluator.Builder().commitCount(2).build(),
+ new MockOperatorEventGateway(),
+ 1,
+ DELAY);
+ try (OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness =
+ createHarness(operator)) {
+ testHarness.open();
+
+ // Create a lock to prevent execution, and check that there is no result
+ addEventAndCheckResult(
+ operator, testHarness, TableChange.builder().commitCount(2).build(),
1, false);
+ addEventAndCheckResult(
+ operator, testHarness, TableChange.builder().commitCount(2).build(),
1, false);
+ long currentTime = testHarness.getProcessingTime();
+
+ // Remove the lock, and still no trigger
+ operator.handleOperatorEvent(lockReleaseEvent);
+ assertThat(testHarness.extractOutputValues()).hasSize(1);
+
+ // Check that the trigger fired after the delay
+ testHarness.setProcessingTime(currentTime + DELAY);
+ assertThat(testHarness.extractOutputValues()).hasSize(2);
+ }
+ }
+
+ @Test
+ void testTriggerMetrics() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ ManualSource<TableChange> source =
+ new ManualSource<>(env, TypeInformation.of(TableChange.class));
+ CollectingSink<Trigger> sink = new CollectingSink<>();
+
+ TriggerManagerOperatorFactory triggerManagerOperatorFactory =
+ new TriggerManagerOperatorFactory(
+ tableName,
+ Lists.newArrayList(TASKS),
+ Lists.newArrayList(
+ new TriggerEvaluator.Builder().commitCount(2).build(),
+ new TriggerEvaluator.Builder().commitCount(4).build()),
+ 1L,
+ 1L);
+ source
+ .dataStream()
+ .keyBy(unused -> true)
+ .transform(
+ DUMMY_TASK_NAME, TypeInformation.of(Trigger.class),
triggerManagerOperatorFactory)
+ .forceNonParallel()
+ .sinkTo(sink);
+
+ JobClient jobClient = null;
+ try {
+ jobClient = env.executeAsync();
+
+ // This one doesn't trigger - tests NOTHING_TO_TRIGGER
+ source.sendRecord(TableChange.builder().commitCount(1).build());
+
+ Awaitility.await()
+ .until(
+ () -> {
+ Long notingCounter =
+ MetricsReporterFactoryForTests.counter(
+ ImmutableList.of(DUMMY_TASK_NAME, tableName,
NOTHING_TO_TRIGGER));
+ return notingCounter != null && notingCounter.equals(1L);
+ });
+
+ // Trigger one of the tasks - tests TRIGGERED
+ source.sendRecord(TableChange.builder().commitCount(1).build());
+ // Wait until we receive the trigger
+ assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+ assertThat(
+ MetricsReporterFactoryForTests.counter(
+ ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0",
TRIGGERED)))
+ .isEqualTo(1L);
+
+ // manual unlock
+ triggerManagerCoordinator.handleReleaseLock(new
LockReleaseEvent(tableName, Long.MAX_VALUE));
+ // Trigger both of the tasks - tests TRIGGERED
+ source.sendRecord(TableChange.builder().commitCount(2).build());
+ // Wait until we receive the trigger
+ assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+
+ // manual unlock
+ triggerManagerCoordinator.handleReleaseLock(new
LockReleaseEvent(tableName, Long.MAX_VALUE));
+ assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+
+ assertThat(
+ MetricsReporterFactoryForTests.counter(
+ ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0",
TRIGGERED)))
+ .isEqualTo(2L);
+ assertThat(
+ MetricsReporterFactoryForTests.counter(
+ ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[1], "1",
TRIGGERED)))
+ .isEqualTo(1L);
+
+ // Final check all the counters
+ MetricsReporterFactoryForTests.assertCounters(
+ new ImmutableMap.Builder<List<String>, Long>()
+ .put(ImmutableList.of(DUMMY_TASK_NAME, tableName,
RATE_LIMITER_TRIGGERED), -1L)
+ .put(ImmutableList.of(DUMMY_TASK_NAME, tableName,
CONCURRENT_RUN_THROTTLED), -1L)
+ .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0",
TRIGGERED), 2L)
+ .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[1], "1",
TRIGGERED), 1L)
+ .put(ImmutableList.of(DUMMY_TASK_NAME, tableName,
NOTHING_TO_TRIGGER), 1L)
+ .build());
+ } finally {
+ closeJobClient(jobClient);
+ }
+ }
+
+ @Test
+ void testRateLimiterMetrics() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ ManualSource<TableChange> source =
+ new ManualSource<>(env, TypeInformation.of(TableChange.class));
+ CollectingSink<Trigger> sink = new CollectingSink<>();
+
+ // High delay, so only triggered once
+ TriggerManagerOperatorFactory manager = manager(1_000_000L, 1L);
+
+ source
+ .dataStream()
+ .keyBy(unused -> true)
+ .transform(DUMMY_TASK_NAME, TypeInformation.of(Trigger.class), manager)
+ .forceNonParallel()
+ .sinkTo(sink);
+
+ JobClient jobClient = null;
+ try {
+ jobClient = env.executeAsync();
+
+ // Start the first trigger
+ source.sendRecord(TableChange.builder().commitCount(2).build());
+ assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+
+ // Remove the lock to allow the next trigger
+ triggerManagerCoordinator.handleReleaseLock(lockReleaseEvent);
+
+ // The second trigger will be blocked
+ source.sendRecord(TableChange.builder().commitCount(2).build());
+ Awaitility.await()
+ .until(
+ () ->
+ MetricsReporterFactoryForTests.counter(
+ ImmutableList.of(DUMMY_TASK_NAME, tableName,
RATE_LIMITER_TRIGGERED))
+ .equals(1L));
+
+ // Final check all the counters
+ assertCounters(1L, 0L);
+ } finally {
+ closeJobClient(jobClient);
+ }
+ }
+
+ @Test
+ void testConcurrentRunMetrics() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ ManualSource<TableChange> source =
+ new ManualSource<>(env, TypeInformation.of(TableChange.class));
+ CollectingSink<Trigger> sink = new CollectingSink<>();
+
+ // High delay, so only triggered once
+ TriggerManagerOperatorFactory manager = manager(1L, 1_000_000L);
+
+ source
+ .dataStream()
+ .keyBy(unused -> true)
+ .transform(DUMMY_TASK_NAME, TypeInformation.of(Trigger.class), manager)
+ .forceNonParallel()
+ .sinkTo(sink);
+
+ JobClient jobClient = null;
+ try {
+ jobClient = env.executeAsync();
+
+ // Start the first trigger - notice that we do not remove the lock after
the trigger
+ source.sendRecord(TableChange.builder().commitCount(2).build());
+ assertThat(sink.poll(Duration.ofSeconds(5))).isNotNull();
+
+ // The second trigger will be blocked by the lock
+ source.sendRecord(TableChange.builder().commitCount(2).build());
+ Awaitility.await()
+ .until(
+ () ->
+ MetricsReporterFactoryForTests.counter(
+ ImmutableList.of(DUMMY_TASK_NAME, tableName,
CONCURRENT_RUN_THROTTLED))
+ .equals(1L));
+
+ // Final check all the counters
+ assertCounters(0L, 1L);
+ } finally {
+ closeJobClient(jobClient);
+ }
+ }
+
+ private void assertCounters(long rateLimiterTrigger, long
concurrentRunTrigger) {
+ MetricsReporterFactoryForTests.assertCounters(
+ new ImmutableMap.Builder<List<String>, Long>()
+ .put(
+ ImmutableList.of(DUMMY_TASK_NAME, tableName,
RATE_LIMITER_TRIGGERED),
+ rateLimiterTrigger)
+ .put(
+ ImmutableList.of(DUMMY_TASK_NAME, tableName,
CONCURRENT_RUN_THROTTLED),
+ concurrentRunTrigger)
+ .put(ImmutableList.of(DUMMY_TASK_NAME, tableName, TASKS[0], "0",
TRIGGERED), 1L)
+ .put(ImmutableList.of(DUMMY_TASK_NAME, tableName,
NOTHING_TO_TRIGGER), 0L)
+ .build());
+ }
+
+ private void addEventAndCheckResult(
+ TriggerManagerOperator operator,
+ OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness,
+ TableChange event,
+ int expectedSize)
+ throws Exception {
+ addEventAndCheckResult(operator, testHarness, event, expectedSize, true);
+ }
+
+ private void addEventAndCheckResult(
+ TriggerManagerOperator operator,
+ OneInputStreamOperatorTestHarness<TableChange, Trigger> testHarness,
+ TableChange event,
+ int expectedSize,
+ boolean removeLock)
+ throws Exception {
+ ++processingTime;
+ testHarness.setProcessingTime(processingTime);
+ testHarness.processElement(event, processingTime);
+ assertThat(testHarness.extractOutputValues()).hasSize(expectedSize);
+ if (removeLock && operator.lockTime() != null) {
+ // Remove the lock to allow the next trigger
+ operator.handleLockRelease(new LockReleaseEvent(tableName,
processingTime));
+ }
+ }
+
+ private TriggerManagerOperatorFactory manager(long minFireDelayMs, long
lockCheckDelayMs) {
+ return new TriggerManagerOperatorFactory(
+ tableName,
+ Lists.newArrayList(TASKS[0]),
+ Lists.newArrayList(new
TriggerEvaluator.Builder().commitCount(2).build()),
+ minFireDelayMs,
+ lockCheckDelayMs);
+ }
+
+ private static void assertTriggers(List<Trigger> expected, List<Trigger>
actual) {
+ assertThat(actual).hasSize(expected.size());
+ for (int i = 0; i < expected.size(); ++i) {
+ Trigger expectedTrigger = expected.get(i);
+ Trigger actualTrigger = actual.get(i);
+
assertThat(actualTrigger.timestamp()).isEqualTo(expectedTrigger.timestamp());
+ assertThat(actualTrigger.taskId()).isEqualTo(expectedTrigger.taskId());
+
assertThat(actualTrigger.isRecovery()).isEqualTo(expectedTrigger.isRecovery());
+ }
+ }
+
+ private static TriggerManagerCoordinator createCoordinator() {
+ return new TriggerManagerCoordinator(
+ OPERATOR_NAME, new MockOperatorCoordinatorContext(TEST_OPERATOR_ID,
1));
+ }
+
+ private TriggerManagerOperator createOperator(
+ TriggerEvaluator evaluator, OperatorEventGateway mockGateway) {
+ return createOperator(tableName, evaluator, mockGateway, 1, 1);
+ }
+
+ private TriggerManagerOperator createOperator(
+ String lockId,
+ TriggerEvaluator evaluator,
+ OperatorEventGateway mockGateway,
+ long minFireDelayMs,
+ long lockCheckDelayMs) {
+ return new TriggerManagerOperator(
+ null,
+ mockGateway,
+ Lists.newArrayList(TASKS[0]),
+ Lists.newArrayList(evaluator),
+ minFireDelayMs,
+ lockCheckDelayMs,
+ lockId);
+ }
+
+ private OneInputStreamOperatorTestHarness<TableChange, Trigger>
createHarness(
+ TriggerManagerOperator triggerManagerOperator) throws Exception {
+ OneInputStreamOperatorTestHarness<TableChange, Trigger> harness =
+ new OneInputStreamOperatorTestHarness<>(triggerManagerOperator);
+ harness.open();
+ return harness;
+ }
+}