This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b86ba3bf2426d7e5cec5a7d2711fd5b278e863ea
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Tue May 28 16:11:23 2019 +0200

    [FLINK-12667][runtime] Add JobID to TaskExecutorGateway#releasePartitions
---
 .../flink/runtime/executiongraph/Execution.java    |  2 +-
 .../jobmanager/slots/TaskManagerGateway.java       |  3 +-
 .../runtime/jobmaster/RpcTaskManagerGateway.java   |  4 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  2 +-
 .../runtime/taskexecutor/TaskExecutorGateway.java  |  3 +-
 .../runtime/executiongraph/ExecutionTest.java      | 84 +++++++++++++++++++++-
 .../utils/SimpleAckingTaskManagerGateway.java      | 10 ++-
 .../taskexecutor/TestingTaskExecutorGateway.java   |  2 +-
 8 files changed, 101 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 5e7677e..b60e2e2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -1329,7 +1329,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
                        if (!partitionIds.isEmpty()) {
                                // TODO For some tests this could be a problem when querying too early if all resources were released
-                               taskManagerGateway.releasePartitions(partitionIds);
+                               taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds);
                        }
                }
        }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index 593a853..77b0460 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -102,9 +102,10 @@ public interface TaskManagerGateway {
        /**
         * Batch release intermediate result partitions.
         *
+        * @param jobId id of the job that the partitions belong to
         * @param partitionIds partition ids to release
         */
-       void releasePartitions(Collection<ResultPartitionID> partitionIds);
+       void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds);
 
        /**
         * Notify the given task about a completed checkpoint.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index 1fb2d49..1ef6416 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -88,8 +88,8 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
        }
 
        @Override
-       public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
-               taskExecutorGateway.releasePartitions(partitionIds);
+       public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) {
+               taskExecutorGateway.releasePartitions(jobId, partitionIds);
        }
 
        @Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index a31521e..6899b32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -640,7 +640,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
        }
 
        @Override
-       public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
+       public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) {
                try {
                        shuffleEnvironment.releasePartitions(partitionIds);
                } catch (Throwable t) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 8a653df..5ab16dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -103,9 +103,10 @@ public interface TaskExecutorGateway extends RpcGateway {
        /**
         * Batch release intermediate result partitions.
         *
+        * @param jobId id of the job that the partitions belong to
         * @param partitionIds partition ids to release
         */
-       void releasePartitions(Collection<ResultPartitionID> partitionIds);
+       void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds);
 
        /**
         * Trigger the checkpoint for the given task. The checkpoint is identified by the checkpoint ID
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index 892552c..efca2d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -19,12 +19,16 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
@@ -54,6 +58,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
@@ -64,6 +69,7 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -294,6 +300,82 @@ public class ExecutionTest extends TestLogger {
        }
 
        /**
+        * Tests that the partitions are released in case of an execution cancellation after the execution is already finished.
+        */
+       @Test
+       public void testPartitionReleaseOnCancelingAfterBeingFinished() throws Exception {
+               testPartitionReleaseAfterFinished(Execution::cancel);
+       }
+
+       /**
+        * Tests that the partitions are released in case of an execution suspension after the execution is already finished.
+        */
+       @Test
+       public void testPartitionReleaseOnSuspendingAfterBeingFinished() throws Exception {
+               testPartitionReleaseAfterFinished(Execution::suspend);
+       }
+
+       private void testPartitionReleaseAfterFinished(Consumer<Execution> postFinishedExecutionAction) throws Exception {
+               final Tuple2<JobID, Collection<ResultPartitionID>> releasedPartitions = Tuple2.of(null, null);
+               final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+               taskManagerGateway.setReleasePartitionsConsumer(releasedPartitions::setFields);
+
+               final JobVertex producerVertex = createNoOpJobVertex();
+               final JobVertex consumerVertex = createNoOpJobVertex();
+               consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+               final SimpleSlot slot = new SimpleSlot(
+                       new SingleSlotTestingSlotOwner(),
+                       new LocalTaskManagerLocation(),
+                       0,
+                       taskManagerGateway);
+
+               final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
+               slotProvider.addSlot(producerVertex.getID(), 0, CompletableFuture.completedFuture(slot));
+
+               ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+                       new JobID(),
+                       slotProvider,
+                       new NoRestartStrategy(),
+                       producerVertex,
+                       consumerVertex);
+
+               executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+               ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
+               ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
+
+               final Execution execution = executionVertex.getCurrentExecutionAttempt();
+
+               execution.allocateResourcesForExecution(
+                       slotProvider,
+                       false,
+                       LocationPreferenceConstraint.ALL,
+                       Collections.emptySet(),
+                       TestingUtils.infiniteTime());
+
+               execution.deploy();
+               execution.switchToRunning();
+
+               // simulate a case where a cancel/suspend call is too slow and the task is already finished
+               // in this case we have to explicitly release the finished partition
+               // if the task were canceled properly the TM would release the partition automatically
+               execution.markFinished();
+               postFinishedExecutionAction.accept(execution);
+
+               assertEquals(executionGraph.getJobID(), releasedPartitions.f0);
+               assertEquals(executionVertex.getProducedPartitions().size(), releasedPartitions.f1.size());
+               for (ResultPartitionID partitionId : releasedPartitions.f1) {
+                       // ensure all IDs of released partitions are actually valid
+                       IntermediateResultPartition intermediateResultPartition = executionVertex
+                               .getProducedPartitions()
+                               .get(partitionId.getPartitionId());
+                       assertNotNull(intermediateResultPartition);
+                       assertEquals(execution.getAttemptId(), partitionId.getProducerId());
+               }
+       }
+
+       /**
         * Tests that all preferred locations are calculated.
         */
        @Test
@@ -417,7 +499,7 @@ public class ExecutionTest extends TestLogger {
                        slotProvider,
                        new NoRestartStrategy(),
                        jobVertex);
-               
+
                ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
 
                ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
index dba0e7d..ffb7b49 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 
@@ -51,6 +52,8 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
 
        private volatile BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction;
 
+       private BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer = (ignore1, ignore2) -> { };
+
        public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> predicate) {
                submitConsumer = predicate;
        }
@@ -63,6 +66,10 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
                this.freeSlotFunction = freeSlotFunction;
        }
 
+       public void setReleasePartitionsConsumer(BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer) {
+               this.releasePartitionsConsumer = releasePartitionsConsumer;
+       }
+
        @Override
        public String getAddress() {
                return address;
@@ -97,7 +104,8 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {
        }
 
        @Override
-       public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
+       public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) {
+               releasePartitionsConsumer.accept(jobId, partitionIds);
        }
 
        @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index aca0bb2..1021a77 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -125,7 +125,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway {
        }
 
        @Override
-       public void releasePartitions(Collection<ResultPartitionID> partitionIds) {
+       public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) {
                // noop
        }
 

Reply via email to