Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4916#discussion_r148216894 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -878,113 +880,70 @@ private void scheduleEager(SlotProvider slotProvider, final Time timeout) { // that way we do not have any operation that can fail between allocating the slots // and adding them to the list. If we had a failure in between there, that would // cause the slots to get lost - final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices()); final boolean queued = allowQueuedScheduling; - // we use this flag to handle failures in a 'finally' clause - // that allows us to not go through clumsy cast-and-rethrow logic - boolean successful = false; + // collecting all the slots may resize and fail in that operation without slots getting lost + final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); - try { - // collecting all the slots may resize and fail in that operation without slots getting lost - final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices()); + // allocate the slots (obtain all their futures + for (ExecutionJobVertex ejv : getVerticesTopologically()) { + // these calls are not blocking, they only return futures --- End diff -- But be aware that the `allocateAndAssign` call is non-blocking and the actual order depends on the preferred locations futures.
---