This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit de509fc2439c2cacb02c9caa40168b3765e84ceb
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Mar 31 19:55:53 2021 +0200

    [FLINK-21996][tests] Add ITCase to test for delayed and failed operator 
event sending.
---
 .../TaskExecutorGatewayDecoratorBase.java          | 231 +++++++++++
 .../OperatorEventSendingCheckpointITCase.java      | 450 +++++++++++++++++++++
 2 files changed, 681 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
new file mode 100644
index 0000000..2667abf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGatewayDecoratorBase.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rest.messages.LogInfo;
+import org.apache.flink.runtime.rest.messages.taskmanager.ThreadDumpInfo;
+import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest;
+import org.apache.flink.types.SerializableOptional;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A class that decorates/forwards calls to a {@link TaskExecutorGateway}.
+ *
+ * <p>This class is meant as a base for custom decorators, to avoid having to 
maintain all the
+ * method overrides in each decorator.
+ */
+public class TaskExecutorGatewayDecoratorBase implements TaskExecutorGateway {
+
+    protected final TaskExecutorGateway originalGateway;
+
+    protected TaskExecutorGatewayDecoratorBase(TaskExecutorGateway 
originalGateway) {
+        this.originalGateway = originalGateway;
+    }
+
+    @Override
+    public String getAddress() {
+        return originalGateway.getAddress();
+    }
+
+    @Override
+    public String getHostname() {
+        return originalGateway.getHostname();
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> requestSlot(
+            SlotID slotId,
+            JobID jobId,
+            AllocationID allocationId,
+            ResourceProfile resourceProfile,
+            String targetAddress,
+            ResourceManagerId resourceManagerId,
+            Time timeout) {
+        return originalGateway.requestSlot(
+                slotId,
+                jobId,
+                allocationId,
+                resourceProfile,
+                targetAddress,
+                resourceManagerId,
+                timeout);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> submitTask(
+            TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time 
timeout) {
+        return originalGateway.submitTask(tdd, jobMasterId, timeout);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updatePartitions(
+            ExecutionAttemptID executionAttemptID,
+            Iterable<PartitionInfo> partitionInfos,
+            Time timeout) {
+        return originalGateway.updatePartitions(executionAttemptID, 
partitionInfos, timeout);
+    }
+
+    @Override
+    public void releaseOrPromotePartitions(
+            JobID jobId,
+            Set<ResultPartitionID> partitionToRelease,
+            Set<ResultPartitionID> partitionsToPromote) {
+        originalGateway.releaseOrPromotePartitions(jobId, partitionToRelease, 
partitionsToPromote);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> releaseClusterPartitions(
+            Collection<IntermediateDataSetID> dataSetsToRelease, Time timeout) 
{
+        return originalGateway.releaseClusterPartitions(dataSetsToRelease, 
timeout);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> triggerCheckpoint(
+            ExecutionAttemptID executionAttemptID,
+            long checkpointID,
+            long checkpointTimestamp,
+            CheckpointOptions checkpointOptions) {
+        return originalGateway.triggerCheckpoint(
+                executionAttemptID, checkpointID, checkpointTimestamp, 
checkpointOptions);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> confirmCheckpoint(
+            ExecutionAttemptID executionAttemptID, long checkpointId, long 
checkpointTimestamp) {
+        return originalGateway.confirmCheckpoint(
+                executionAttemptID, checkpointId, checkpointTimestamp);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> abortCheckpoint(
+            ExecutionAttemptID executionAttemptID, long checkpointId, long 
checkpointTimestamp) {
+        return originalGateway.abortCheckpoint(
+                executionAttemptID, checkpointId, checkpointTimestamp);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> cancelTask(
+            ExecutionAttemptID executionAttemptID, Time timeout) {
+        return originalGateway.cancelTask(executionAttemptID, timeout);
+    }
+
+    @Override
+    public void heartbeatFromJobManager(
+            ResourceID heartbeatOrigin, AllocatedSlotReport 
allocatedSlotReport) {
+        originalGateway.heartbeatFromJobManager(heartbeatOrigin, 
allocatedSlotReport);
+    }
+
+    @Override
+    public void heartbeatFromResourceManager(ResourceID heartbeatOrigin) {
+        originalGateway.heartbeatFromResourceManager(heartbeatOrigin);
+    }
+
+    @Override
+    public void disconnectJobManager(JobID jobId, Exception cause) {
+        originalGateway.disconnectJobManager(jobId, cause);
+    }
+
+    @Override
+    public void disconnectResourceManager(Exception cause) {
+        originalGateway.disconnectResourceManager(cause);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> freeSlot(
+            AllocationID allocationId, Throwable cause, Time timeout) {
+        return originalGateway.freeSlot(allocationId, cause, timeout);
+    }
+
+    @Override
+    public void freeInactiveSlots(JobID jobId, Time timeout) {
+        originalGateway.freeInactiveSlots(jobId, timeout);
+    }
+
+    @Override
+    public CompletableFuture<TransientBlobKey> requestFileUploadByType(
+            FileType fileType, Time timeout) {
+        return originalGateway.requestFileUploadByType(fileType, timeout);
+    }
+
+    @Override
+    public CompletableFuture<TransientBlobKey> requestFileUploadByName(
+            String fileName, Time timeout) {
+        return originalGateway.requestFileUploadByName(fileName, timeout);
+    }
+
+    @Override
+    public CompletableFuture<SerializableOptional<String>> 
requestMetricQueryServiceAddress(
+            Time timeout) {
+        return originalGateway.requestMetricQueryServiceAddress(timeout);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> canBeReleased() {
+        return originalGateway.canBeReleased();
+    }
+
+    @Override
+    public CompletableFuture<Collection<LogInfo>> requestLogList(Time timeout) 
{
+        return originalGateway.requestLogList(timeout);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> sendOperatorEventToTask(
+            ExecutionAttemptID task, OperatorID operator, 
SerializedValue<OperatorEvent> evt) {
+        return originalGateway.sendOperatorEventToTask(task, operator, evt);
+    }
+
+    @Override
+    public CompletableFuture<ThreadDumpInfo> requestThreadDump(Time timeout) {
+        return originalGateway.requestThreadDump(timeout);
+    }
+
+    @Override
+    public CompletableFuture<TaskThreadInfoResponse> requestThreadInfoSamples(
+            ExecutionAttemptID taskExecutionAttemptId,
+            ThreadInfoSamplesRequest requestParams,
+            Time timeout) {
+        return originalGateway.requestThreadInfoSamples(
+                taskExecutionAttemptId, requestParams, timeout);
+    }
+}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
new file mode 100644
index 0000000..bd59ab0
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.coordination;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
+import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGatewayDecoratorBase;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.TriFunction;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A test suite for the source enumerator (operator coordinator) covering situations where the RPC
+ * calls for split assignments (operator events) fail from time to time.
+ */
+public class OperatorEventSendingCheckpointITCase extends TestLogger {
+
+    private static final int PARALLELISM = 1;
+    private static MiniCluster flinkCluster;
+
+    @BeforeClass
+    public static void setupMiniClusterAndEnv() throws Exception {
+        flinkCluster = new MiniClusterWithRpcIntercepting(PARALLELISM);
+        flinkCluster.start();
+        TestStreamEnvironment.setAsContext(flinkCluster, PARALLELISM);
+    }
+
+    @AfterClass
+    public static void clearEnvAndStopMiniCluster() throws Exception {
+        TestStreamEnvironment.unsetAsContext();
+        if (flinkCluster != null) {
+            flinkCluster.close();
+            flinkCluster = null;
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  tests
+    // ------------------------------------------------------------------------
+
+    /**
+     * Every second split assignment event is lost. Eventually, the enumerator must recognize
+     * that an event was lost and trigger recovery to prevent data loss. Data loss would manifest
+     * in a stalled test, because we would wait forever to collect the required number of
+     * elements back.
+     */
+    @Ignore // ignore for now, because this test fails due to FLINK-21996
+    @Test
+    public void testOperatorEventLostNoReaderFailure() throws Exception {
+        final int[] eventsToLose = new int[] {2, 4, 6};
+
+        OpEventRpcInterceptor.currentHandler =
+                new OperatorEventRpcHandler(
+                        (task, operator, event, originalRpcHandler) -> 
askTimeoutFuture(),
+                        eventsToLose);
+
+        runTest(false);
+    }
+
+    /**
+     * The first and third split assignment events are lost. In the middle of all events being
+     * processed (which is after the second successful event delivery, the fourth event), there
+     * is additionally a failure on the reader that triggers recovery.
+     */
+    @Ignore // ignore for now, because this test fails due to FLINK-21996
+    @Test
+    public void testOperatorEventLostWithReaderFailure() throws Exception {
+        final int[] eventsToLose = new int[] {1, 3};
+
+        OpEventRpcInterceptor.currentHandler =
+                new OperatorEventRpcHandler(
+                        (task, operator, event, originalRpcHandler) -> 
askTimeoutFuture(),
+                        eventsToLose);
+
+        runTest(true);
+    }
+
+    /**
+     * This tests the case where the enumerator must handle presumably lost splits that were
+     * actually delivered.
+     *
+     * <p>Some split assignment events happen normally, but for some their acknowledgement never
+     * comes back. The enumerator must assume the assignments were unsuccessful, even though the
+     * split assignment was received by the reader.
+     */
+    @Test
+    public void testOperatorEventAckLost() throws Exception {
+        final int[] eventsWithLostAck = new int[] {2, 4};
+
+        OpEventRpcInterceptor.currentHandler =
+                new OperatorEventRpcHandler(
+                        (task, operator, event, originalRpcHandler) -> {
+                            // forward call
+                            originalRpcHandler.apply(task, operator, event);
+                            // but return an ack future that times out to 
simulate lost response
+                            return askTimeoutFuture();
+                        },
+                        eventsWithLostAck);
+
+        runTest(false);
+    }
+
+    /**
+     * This tests the case where the status of an assignment remains unknown across checkpoints.
+     *
+     * <p>Some split assignment events happen normally, but for some their acknowledgement comes
+     * very late, so that we expect multiple checkpoints would have normally happened in the
+     * meantime. We trigger a failure (which happens after the second split).
+     */
+    @Test
+    public void testOperatorEventAckDelay() throws Exception {
+        final int[] eventsWithLateAck = new int[] {2, 4};
+
+        OpEventRpcInterceptor.currentHandler =
+                new OperatorEventRpcHandler(
+                        (task, operator, event, originalRpcHandler) -> {
+                            // forward call
+                            final CompletableFuture<Acknowledge> result =
+                                    originalRpcHandler.apply(task, operator, 
event);
+                            // but return an ack future that completes late, 
after
+                            // multiple checkpoints should have happened
+                            final CompletableFuture<Acknowledge> late = 
lateFuture();
+                            return result.thenCompose((v) -> late);
+                        },
+                        eventsWithLateAck);
+
+        runTest(false);
+    }
+
+    /**
+     * Runs the test program, which uses a single reader (parallelism = 1) and 
has three splits of
+     * data, to be assigned to the same reader.
+     *
+     * <p>If an intermittent failure should happen, it will happen after the 
second split was
+     * assigned.
+     */
+    private void runTest(boolean intermittentFailure) throws Exception {
+        final int numElements = 100;
+        final int failAt = intermittentFailure ? numElements / 2 : numElements 
* 2;
+
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(50);
+
+        final DataStream<Long> numbers =
+                env.fromSource(
+                                new TestingNumberSequenceSource(1L, 
numElements, 3),
+                                WatermarkStrategy.noWatermarks(),
+                                "numbers")
+                        .map(
+                                new MapFunction<Long, Long>() {
+                                    private int num;
+
+                                    @Override
+                                    public Long map(Long value) throws 
Exception {
+                                        if (++num > failAt) {
+                                            throw new Exception("Artificial 
intermittent failure.");
+                                        }
+                                        return value;
+                                    }
+                                });
+
+        final List<Long> sequence = numbers.executeAndCollect(numElements);
+
+        final List<Long> expectedSequence =
+                LongStream.rangeClosed(1L, 
numElements).boxed().collect(Collectors.toList());
+
+        assertEquals(expectedSequence, sequence);
+    }
+
+    private static CompletableFuture<Acknowledge> askTimeoutFuture() {
+        final CompletableFuture<Acknowledge> future = new 
CompletableFuture<>();
+        FutureUtils.orTimeout(future, 500, TimeUnit.MILLISECONDS);
+        return future;
+    }
+
+    private static CompletableFuture<Acknowledge> lateFuture() {
+        final CompletableFuture<Acknowledge> future = new 
CompletableFuture<>();
+        FutureUtils.completeDelayed(future, Acknowledge.get(), 
Duration.ofMillis(500));
+        return future;
+    }
+
+    // ------------------------------------------------------------------------
+    //  Specialized Source
+    // ------------------------------------------------------------------------
+
+    /**
+     * This is an enumerator for the {@link NumberSequenceSource}, which only responds to the
+     * split requests after the next checkpoint is complete. That way, we naturally spread the
+     * split processing across checkpoints without artificial sleep statements.
+     */
+    private static final class AssignAfterCheckpointEnumerator<
+                    SplitT extends IteratorSourceSplit<?, ?>>
+            extends IteratorSourceEnumerator<SplitT> {
+
+        private final Queue<Integer> pendingRequests = new ArrayDeque<>();
+        private final SplitEnumeratorContext<?> context;
+
+        public AssignAfterCheckpointEnumerator(
+                SplitEnumeratorContext<SplitT> context, Collection<SplitT> 
splits) {
+            super(context, splits);
+            this.context = context;
+        }
+
+        @Override
+        public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+            pendingRequests.add(subtaskId);
+        }
+
+        @Override
+        public Collection<SplitT> snapshotState() throws Exception {
+            // this will be enqueued in the coordinator thread, so it will actually run after
+            // this method (the snapshot operation) is complete!
+            context.runInCoordinatorThread(this::fullFillPendingRequests);
+
+            return super.snapshotState();
+        }
+
+        private void fullFillPendingRequests() {
+            for (int subtask : pendingRequests) {
+                super.handleSplitRequest(subtask, null);
+            }
+            pendingRequests.clear();
+        }
+    }
+
+    private static class TestingNumberSequenceSource extends 
NumberSequenceSource {
+        private static final long serialVersionUID = 1L;
+
+        private final int numSplits;
+
+        public TestingNumberSequenceSource(long from, long to, int numSplits) {
+            super(from, to);
+            this.numSplits = numSplits;
+        }
+
+        @Override
+        public SplitEnumerator<NumberSequenceSplit, 
Collection<NumberSequenceSplit>>
+                createEnumerator(final 
SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
+            final List<NumberSequenceSplit> splits =
+                    splitNumberRange(getFrom(), getTo(), numSplits);
+            return new AssignAfterCheckpointEnumerator<>(enumContext, splits);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Source Operator Event specific intercepting
+    // ------------------------------------------------------------------------
+
+    private static class OperatorEventRpcHandler {
+
+        private final FilteredRpcAction actionForFilteredEvent;
+        private final Set<Integer> eventsToFilter;
+        private int eventNum;
+
+        OperatorEventRpcHandler(FilteredRpcAction actionForFilteredEvent, 
int... eventsToFilter) {
+            this(
+                    actionForFilteredEvent,
+                    
IntStream.of(eventsToFilter).boxed().collect(Collectors.toSet()));
+        }
+
+        OperatorEventRpcHandler(
+                FilteredRpcAction actionForFilteredEvent, Set<Integer> 
eventsToFilter) {
+            this.actionForFilteredEvent = actionForFilteredEvent;
+            this.eventsToFilter = eventsToFilter;
+        }
+
+        CompletableFuture<Acknowledge> filterCall(
+                ExecutionAttemptID task,
+                OperatorID operator,
+                SerializedValue<OperatorEvent> evt,
+                TriFunction<
+                                ExecutionAttemptID,
+                                OperatorID,
+                                SerializedValue<OperatorEvent>,
+                                CompletableFuture<Acknowledge>>
+                        rpcHandler) {
+
+            final Object o;
+            try {
+                o = evt.deserializeValue(getClass().getClassLoader());
+            } catch (Exception e) {
+                throw new Error(e); // should never happen
+            }
+
+            if (o instanceof AddSplitEvent || o instanceof NoMoreSplitsEvent) {
+                // only deal with split related events here
+                if (eventsToFilter.contains(++eventNum)) {
+                    return actionForFilteredEvent.handleEvent(task, operator, 
evt, rpcHandler);
+                }
+            }
+
+            return rpcHandler.apply(task, operator, evt);
+        }
+
+        interface FilteredRpcAction {
+
+            CompletableFuture<Acknowledge> handleEvent(
+                    ExecutionAttemptID task,
+                    OperatorID operator,
+                    SerializedValue<OperatorEvent> evt,
+                    TriFunction<
+                                    ExecutionAttemptID,
+                                    OperatorID,
+                                    SerializedValue<OperatorEvent>,
+                                    CompletableFuture<Acknowledge>>
+                            rpcHandler);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  Utils for MiniCluster RPC intercepting
+    // ------------------------------------------------------------------------
+
+    private static final class OpEventRpcInterceptor extends 
TaskExecutorGatewayDecoratorBase {
+
+        // initialize with a handler that filters nothing
+        static OperatorEventRpcHandler currentHandler =
+                new OperatorEventRpcHandler((task, id, evt, rpc) -> null, 
Collections.emptySet());
+
+        OpEventRpcInterceptor(TaskExecutorGateway originalGateway) {
+            super(originalGateway);
+        }
+
+        @Override
+        public CompletableFuture<Acknowledge> sendOperatorEventToTask(
+                ExecutionAttemptID task, OperatorID operator, 
SerializedValue<OperatorEvent> evt) {
+            return currentHandler.filterCall(task, operator, evt, 
super::sendOperatorEventToTask);
+        }
+    }
+
+    private static class InterceptingRpcService extends AkkaRpcService {
+
+        public InterceptingRpcService(
+                ActorSystem actorSystem, AkkaRpcServiceConfiguration 
configuration) {
+            super(actorSystem, configuration);
+        }
+
+        @Override
+        public <C extends RpcGateway> CompletableFuture<C> connect(String 
address, Class<C> clazz) {
+            final CompletableFuture<C> future = super.connect(address, clazz);
+            return clazz == TaskExecutorGateway.class ? 
decorateTmGateway(future) : future;
+        }
+
+        @SuppressWarnings("unchecked")
+        private <C extends RpcGateway> CompletableFuture<C> decorateTmGateway(
+                CompletableFuture<C> future) {
+            final CompletableFuture<TaskExecutorGateway> wrapped =
+                    future.thenApply(
+                            (gateway) -> new 
OpEventRpcInterceptor((TaskExecutorGateway) gateway));
+            return (CompletableFuture<C>) wrapped;
+        }
+    }
+
+    private static class MiniClusterWithRpcIntercepting extends MiniCluster {
+
+        private boolean localRpcCreated;
+
+        public MiniClusterWithRpcIntercepting(final int numSlots) {
+            super(
+                    new MiniClusterConfiguration.Builder()
+                            .setRpcServiceSharing(RpcServiceSharing.SHARED)
+                            .setNumTaskManagers(1)
+                            .setNumSlotsPerTaskManager(numSlots)
+                            .build());
+        }
+
+        @Override
+        public void start() throws Exception {
+            super.start();
+
+            if (!localRpcCreated) {
+                throw new Exception(
+                        "MiniClusterWithRpcIntercepting is broken, the 
intercepting local RPC service was not created.");
+            }
+        }
+
+        @Override
+        protected RpcService createLocalRpcService(Configuration 
configuration) throws Exception {
+            localRpcCreated = true;
+
+            return AkkaRpcServiceUtils.localServiceBuilder(configuration)
+                    .withCustomConfig(AkkaUtils.testDispatcherConfig())
+                    .createAndStart(InterceptingRpcService::new);
+        }
+    }
+}

Reply via email to