This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b86ba3bf2426d7e5cec5a7d2711fd5b278e863ea Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Tue May 28 16:11:23 2019 +0200 [FLINK-12667][runtime] Add JobID to TaskExecutorGateway#releasePartitions --- .../flink/runtime/executiongraph/Execution.java | 2 +- .../jobmanager/slots/TaskManagerGateway.java | 3 +- .../runtime/jobmaster/RpcTaskManagerGateway.java | 4 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 2 +- .../runtime/taskexecutor/TaskExecutorGateway.java | 3 +- .../runtime/executiongraph/ExecutionTest.java | 84 +++++++++++++++++++++- .../utils/SimpleAckingTaskManagerGateway.java | 10 ++- .../taskexecutor/TestingTaskExecutorGateway.java | 2 +- 8 files changed, 101 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 5e7677e..b60e2e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -1329,7 +1329,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution if (!partitionIds.isEmpty()) { // TODO For some tests this could be a problem when querying too early if all resources were released - taskManagerGateway.releasePartitions(partitionIds); + taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java index 593a853..77b0460 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java @@ -102,9 +102,10 @@ public interface TaskManagerGateway { /** * Batch release intermediate result partitions. * + * @param jobId id of the job that the partitions belong to * @param partitionIds partition ids to release */ - void releasePartitions(Collection<ResultPartitionID> partitionIds); + void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds); /** * Notify the given task about a completed checkpoint. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java index 1fb2d49..1ef6416 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java @@ -88,8 +88,8 @@ public class RpcTaskManagerGateway implements TaskManagerGateway { } @Override - public void releasePartitions(Collection<ResultPartitionID> partitionIds) { - taskExecutorGateway.releasePartitions(partitionIds); + public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) { + taskExecutorGateway.releasePartitions(jobId, partitionIds); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index a31521e..6899b32 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -640,7 +640,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } @Override - public void releasePartitions(Collection<ResultPartitionID> partitionIds) { + public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) { try { shuffleEnvironment.releasePartitions(partitionIds); } catch (Throwable t) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 8a653df..5ab16dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -103,9 +103,10 @@ public interface TaskExecutorGateway extends RpcGateway { /** * Batch release intermediate result partitions. * + * @param jobId id of the job that the partitions belong to * @param partitionIds partition ids to release */ - void releasePartitions(Collection<ResultPartitionID> partitionIds); + void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds); /** * Trigger the checkpoint for the given task. The checkpoint is identified by the checkpoint ID diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java index 892552c..efca2d4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java @@ -19,12 +19,16 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; @@ -54,6 +58,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -64,6 +69,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -294,6 +300,82 @@ public class ExecutionTest extends TestLogger { } /** + * Tests that the partitions are released in case of an execution cancellation after the execution is already finished. + */ + @Test + public void testPartitionReleaseOnCancelingAfterBeingFinished() throws Exception { + testPartitionReleaseAfterFinished(Execution::cancel); + } + + /** + * Tests that the partitions are released in case of an execution suspension after the execution is already finished. + */ + @Test + public void testPartitionReleaseOnSuspendingAfterBeingFinished() throws Exception { + testPartitionReleaseAfterFinished(Execution::suspend); + } + + private void testPartitionReleaseAfterFinished(Consumer<Execution> postFinishedExecutionAction) throws Exception { + final Tuple2<JobID, Collection<ResultPartitionID>> releasedPartitions = Tuple2.of(null, null); + final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + taskManagerGateway.setReleasePartitionsConsumer(releasedPartitions::setFields); + + final JobVertex producerVertex = createNoOpJobVertex(); + final JobVertex consumerVertex = createNoOpJobVertex(); + consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + final SimpleSlot slot = new SimpleSlot( + new SingleSlotTestingSlotOwner(), + new LocalTaskManagerLocation(), + 0, + taskManagerGateway); + + final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); + slotProvider.addSlot(producerVertex.getID(), 0, CompletableFuture.completedFuture(slot)); + + ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph( + new JobID(), + slotProvider, + new NoRestartStrategy(), + producerVertex, + consumerVertex); + + executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread()); + + ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID()); + ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0]; + + final Execution execution = executionVertex.getCurrentExecutionAttempt(); + + execution.allocateResourcesForExecution( + slotProvider, + false, + LocationPreferenceConstraint.ALL, + Collections.emptySet(), + TestingUtils.infiniteTime()); + + execution.deploy(); + execution.switchToRunning(); + + // simulate a case where a cancel/suspend call is too slow and the task is already finished + // in this case we have to explicitly release the finished partition + // if the task were canceled properly the TM would release the partition automatically + execution.markFinished(); + postFinishedExecutionAction.accept(execution); + + assertEquals(executionGraph.getJobID(), releasedPartitions.f0); + assertEquals(executionVertex.getProducedPartitions().size(), releasedPartitions.f1.size()); + for (ResultPartitionID partitionId : releasedPartitions.f1) { + // ensure all IDs of released partitions are actually valid + IntermediateResultPartition intermediateResultPartition = executionVertex + .getProducedPartitions() + .get(partitionId.getPartitionId()); + assertNotNull(intermediateResultPartition); + assertEquals(execution.getAttemptId(), partitionId.getProducerId()); + } + } + + /** * Tests that all preferred locations are calculated. */ @Test @@ -417,7 +499,7 @@ public class ExecutionTest extends TestLogger { slotProvider, new NoRestartStrategy(), jobVertex); - + ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId); ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0]; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java index dba0e7d..ffb7b49 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleAckingTaskManagerGateway.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse; import java.util.Collection; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -51,6 +52,8 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { private volatile BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction; + private BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer = (ignore1, ignore2) -> { }; + public void setSubmitConsumer(Consumer<TaskDeploymentDescriptor> predicate) { submitConsumer = predicate; } @@ -63,6 +66,10 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { this.freeSlotFunction = freeSlotFunction; } + public void setReleasePartitionsConsumer(BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer) { + this.releasePartitionsConsumer = releasePartitionsConsumer; + } + @Override public String getAddress() { return address; @@ -97,7 +104,8 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway { } @Override - public void releasePartitions(Collection<ResultPartitionID> partitionIds) { + public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) { + releasePartitionsConsumer.accept(jobId, partitionIds); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java index aca0bb2..1021a77 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java @@ -125,7 +125,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { } @Override - public void releasePartitions(Collection<ResultPartitionID> partitionIds) { + public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) { // noop }