This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e498dc9e763a5daa3867456f657e19cd08fbb66
Author: Till Rohrmann <trohrm...@apache.org>
AuthorDate: Wed Mar 27 09:09:13 2019 +0100

    [FLINK-12021] Deploy execution in topological sorted order
    
    Due to changes in how the slot futures are completed, and because the
    ResultConjunctFuture does not maintain the order in which the futures
    were specified, executions could be deployed out of topological order.
    This commit fixes the problem by changing the ResultConjunctFuture so
    that its result collection preserves the order of the specified futures.
    
    This closes #8060.
---
 .../flink/runtime/concurrent/FutureUtils.java      |  12 +-
 .../runtime/concurrent/ConjunctFutureTest.java     |  40 ++++--
 .../ExecutionGraphDeploymentTest.java              | 149 +++++++++++++++++++++
 .../executiongraph/TestingSlotProvider.java        |   2 +-
 .../taskmanager/LocalTaskManagerLocation.java      |   2 +-
 5 files changed, 184 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 61b88be..1458eab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -563,9 +563,6 @@ public class FutureUtils {
                /** The total number of futures in the conjunction. */
                private final int numTotal;
 
-               /** The next free index in the results arrays. */
-               private final AtomicInteger nextIndex = new AtomicInteger(0);
-
                /** The number of futures in the conjunction that are already 
complete. */
                private final AtomicInteger numCompleted = new AtomicInteger(0);
 
@@ -575,12 +572,10 @@ public class FutureUtils {
                /** The function that is attached to all futures in the 
conjunction. Once a future
                 * is complete, this function tracks the completion or fails 
the conjunct.
                 */
-               private void handleCompletedFuture(T value, Throwable 
throwable) {
+               private void handleCompletedFuture(int index, T value, 
Throwable throwable) {
                        if (throwable != null) {
                                completeExceptionally(throwable);
                        } else {
-                               int index = nextIndex.getAndIncrement();
-
                                results[index] = value;
 
                                if (numCompleted.incrementAndGet() == numTotal) 
{
@@ -598,8 +593,11 @@ public class FutureUtils {
                                complete(Collections.emptyList());
                        }
                        else {
+                               int counter = 0;
                                for (CompletableFuture<? extends T> future : 
resultFutures) {
-                                       
future.whenComplete(this::handleCompletedFuture);
+                                       final int index = counter;
+                                       counter++;
+                                       future.whenComplete((value, throwable) 
-> handleCompletedFuture(index, value, throwable));
                                }
                        }
                }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
index 9f33866..28fa8fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
@@ -18,21 +18,27 @@
 
 package org.apache.flink.runtime.concurrent;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
 import org.apache.flink.util.TestLogger;
 
-import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -40,7 +46,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for the {@link ConjunctFuture} and {@link 
FutureUtils.WaitingConjunctFuture}.
+ * Tests for the {@link ConjunctFuture} and its sub classes.
  */
 @RunWith(Parameterized.class)
 public class ConjunctFutureTest extends TestLogger {
@@ -193,23 +199,33 @@ public class ConjunctFutureTest extends TestLogger {
        }
 
        /**
-        * Tests that the conjunct future returns upon completion the 
collection of all future values.
+        * Tests that the conjunct future returns upon completion the 
collection of all future values
+        * in the same order in which the futures were inserted.
         */
        @Test
-       public void testConjunctFutureValue() throws ExecutionException, 
InterruptedException {
-               java.util.concurrent.CompletableFuture<Integer> future1 = 
java.util.concurrent.CompletableFuture.completedFuture(1);
-               java.util.concurrent.CompletableFuture<Long> future2 = 
java.util.concurrent.CompletableFuture.completedFuture(2L);
-               java.util.concurrent.CompletableFuture<Double> future3 = new 
java.util.concurrent.CompletableFuture<>();
+       public void testConjunctFutureValue() throws Exception {
+               final int numberFutures = 10;
 
-               ConjunctFuture<Collection<Number>> result = 
FutureUtils.combineAll(Arrays.asList(future1, future2, future3));
+               final List<CompletableFuture<Integer>> futures = new 
ArrayList<>(numberFutures);
+               for (int i = 0; i < numberFutures; i++) {
+                       futures.add(new CompletableFuture<>());
+               }
 
-               assertFalse(result.isDone());
+               ConjunctFuture<Collection<Number>> result = 
FutureUtils.combineAll(futures);
 
-               future3.complete(.1);
+               final List<Tuple2<Integer, CompletableFuture<Integer>>> 
shuffledFutures = IntStream.range(0, futures.size())
+                       .mapToObj(index -> Tuple2.of(index, futures.get(index)))
+                       .collect(Collectors.toList());
+               Collections.shuffle(shuffledFutures);
 
-               assertTrue(result.isDone());
+               for (Tuple2<Integer, CompletableFuture<Integer>> shuffledFuture 
: shuffledFutures) {
+                       assertThat(result.isDone(), is(false));
+                       shuffledFuture.f1.complete(shuffledFuture.f0);
+               }
+
+               assertThat(result.isDone(), is(true));
 
-               assertThat(result.get(), 
IsIterableContainingInAnyOrder.<Number>containsInAnyOrder(1, 2L, .1));
+               assertThat(result.get(), is(equalTo(IntStream.range(0, 
numberFutures).boxed().collect(Collectors.toList()))));
        }
 
        @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 5ececd6..0324928 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.blob.PermanentBlobService;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -50,24 +51,34 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.FunctionUtils;
 
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -75,15 +86,18 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.function.Function;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
@@ -662,6 +676,107 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
                }
        }
 
+       /**
+        * Tests that the {@link ExecutionGraph} is deployed in topological 
order.
+        */
+       @Test
+       public void testExecutionGraphIsDeployedInTopologicalOrder() throws 
Exception {
+               final int sourceParallelism = 2;
+               final int sinkParallelism = 1;
+
+               final JobVertex sourceVertex = new JobVertex("source");
+               sourceVertex.setInvokableClass(NoOpInvokable.class);
+               sourceVertex.setParallelism(sourceParallelism);
+
+               final JobVertex sinkVertex = new JobVertex("sink");
+               sinkVertex.setInvokableClass(NoOpInvokable.class);
+               sinkVertex.setParallelism(sinkParallelism);
+
+               sinkVertex.connectNewDataSetAsInput(sourceVertex, 
DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+               final JobID jobId = new JobID();
+               final int numberTasks = sourceParallelism + sinkParallelism;
+               final ArrayBlockingQueue<ExecutionAttemptID> 
submittedTasksQueue = new ArrayBlockingQueue<>(numberTasks);
+               TestingTaskExecutorGatewayBuilder 
testingTaskExecutorGatewayBuilder = new TestingTaskExecutorGatewayBuilder();
+               
testingTaskExecutorGatewayBuilder.setSubmitTaskConsumer((taskDeploymentDescriptor,
 jobMasterId) -> {
+                       
submittedTasksQueue.offer(taskDeploymentDescriptor.getExecutionAttemptId());
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               });
+
+               final TestingTaskExecutorGateway taskExecutorGateway = 
testingTaskExecutorGatewayBuilder.createTestingTaskExecutorGateway();
+               final RpcTaskManagerGateway taskManagerGateway = new 
RpcTaskManagerGateway(taskExecutorGateway, JobMasterId.generate());
+
+               final Collection<CompletableFuture<LogicalSlot>> slotFutures = 
new ArrayList<>(numberTasks);
+               for (int i = 0; i < numberTasks; i++) {
+                       slotFutures.add(new CompletableFuture<>());
+               }
+
+               final SlotProvider slotProvider = new 
IteratorTestingSlotProvider(slotFutures.iterator());
+
+               final ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createExecutionGraph(
+                       jobId,
+                       slotProvider,
+                       new NoRestartStrategy(),
+                       new DirectScheduledExecutorService(),
+                       sourceVertex,
+                       sinkVertex);
+               executionGraph.setScheduleMode(ScheduleMode.EAGER);
+               executionGraph.setQueuedSchedulingAllowed(true);
+
+               
executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+               executionGraph.scheduleForExecution();
+
+               // change the order in which the futures are completed
+               final List<CompletableFuture<LogicalSlot>> shuffledFutures = 
new ArrayList<>(slotFutures);
+               Collections.shuffle(shuffledFutures);
+
+               for (CompletableFuture<LogicalSlot> slotFuture : 
shuffledFutures) {
+                       slotFuture.complete(new 
TestingLogicalSlot(taskManagerGateway));
+               }
+
+               final List<ExecutionAttemptID> submittedTasks = new 
ArrayList<>(numberTasks);
+
+               for (int i = 0; i < numberTasks; i++) {
+                       submittedTasks.add(submittedTasksQueue.take());
+               }
+
+               final Collection<ExecutionAttemptID> firstStage = new 
ArrayList<>(sourceParallelism);
+               for (ExecutionVertex taskVertex : 
executionGraph.getJobVertex(sourceVertex.getID()).getTaskVertices()) {
+                       
firstStage.add(taskVertex.getCurrentExecutionAttempt().getAttemptId());
+               }
+
+               final Collection<ExecutionAttemptID> secondStage = new 
ArrayList<>(sinkParallelism);
+               for (ExecutionVertex taskVertex : 
executionGraph.getJobVertex(sinkVertex.getID()).getTaskVertices()) {
+                       
secondStage.add(taskVertex.getCurrentExecutionAttempt().getAttemptId());
+               }
+
+               assertThat(submittedTasks, new 
ExecutionStageMatcher(Arrays.asList(firstStage, secondStage)));
+       }
+
+       private static final class IteratorTestingSlotProvider extends 
TestingSlotProvider {
+               private IteratorTestingSlotProvider(final 
Iterator<CompletableFuture<LogicalSlot>> slotIterator) {
+                       super(new IteratorSlotFutureFunction(slotIterator));
+               }
+
+               private static class IteratorSlotFutureFunction implements 
Function<SlotRequestId, CompletableFuture<LogicalSlot>> {
+                       final Iterator<CompletableFuture<LogicalSlot>> 
slotIterator;
+
+                       
IteratorSlotFutureFunction(Iterator<CompletableFuture<LogicalSlot>> 
slotIterator) {
+                               this.slotIterator = slotIterator;
+                       }
+
+                       @Override
+                       public CompletableFuture<LogicalSlot> 
apply(SlotRequestId slotRequestId) {
+                               if (slotIterator.hasNext()) {
+                                       return slotIterator.next();
+                               } else {
+                                       return 
FutureUtils.completedExceptionally(new FlinkException("No more slots 
available."));
+                               }
+                       }
+               }
+       }
+
        private SimpleSlot createSlot(TaskManagerLocation taskManagerLocation, 
int index) {
                return new SimpleSlot(
                        mock(SlotOwner.class),
@@ -707,4 +822,38 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
                        timeout,
                        LoggerFactory.getLogger(getClass()));
        }
+
+       private static final class ExecutionStageMatcher extends 
TypeSafeMatcher<List<ExecutionAttemptID>> {
+               private final List<Collection<ExecutionAttemptID>> 
executionStages;
+
+               private 
ExecutionStageMatcher(List<Collection<ExecutionAttemptID>> executionStages) {
+                       this.executionStages = executionStages;
+               }
+
+               @Override
+               protected boolean matchesSafely(List<ExecutionAttemptID> 
submissionOrder) {
+                       final Iterator<ExecutionAttemptID> submissionIterator = 
submissionOrder.iterator();
+
+                       for (Collection<ExecutionAttemptID> stage : 
executionStages) {
+                               final Collection<ExecutionAttemptID> 
currentStage = new ArrayList<>(stage);
+
+                               while (!currentStage.isEmpty() && 
submissionIterator.hasNext()) {
+                                       if 
(!currentStage.remove(submissionIterator.next())) {
+                                               return false;
+                                       }
+                               }
+
+                               if (!currentStage.isEmpty()) {
+                                       return false;
+                               }
+                       }
+
+                       return !submissionIterator.hasNext();
+               }
+
+               @Override
+               public void describeTo(Description description) {
+                       description.appendValueList("<[", ", ", "]>", 
executionStages);
+               }
+       }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProvider.java
index b0ca786..a795278 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingSlotProvider.java
@@ -38,7 +38,7 @@ import java.util.function.Function;
 /**
  * {@link SlotProvider} implementation for testing purposes.
  */
-public final class TestingSlotProvider implements SlotProvider {
+public class TestingSlotProvider implements SlotProvider {
 
        private final ConcurrentMap<SlotRequestId, 
CompletableFuture<LogicalSlot>> slotFutures;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java
index 60dddbb..1c46891 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LocalTaskManagerLocation.java
@@ -30,6 +30,6 @@ public class LocalTaskManagerLocation extends 
TaskManagerLocation {
        private static final long serialVersionUID = 2396142513336559461L;
 
        public LocalTaskManagerLocation() {
-               super(ResourceID.generate(), InetAddress.getLoopbackAddress(), 
-1);
+               super(ResourceID.generate(), InetAddress.getLoopbackAddress(), 
42);
        }
 }

Reply via email to