This is an automated email from the ASF dual-hosted git repository.

azagrebin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d50daa  [FLINK-19388][coordination] Do not remove logical slots from 
SharedSlot if it is releasing
8d50daa is described below

commit 8d50daa7e13e00cffbd09efac73e6bea0e48ee5c
Author: Andrey Zagrebin <azagre...@apache.org>
AuthorDate: Tue Sep 29 14:09:29 2020 +0200

    [FLINK-19388][coordination] Do not remove logical slots from SharedSlot if 
it is releasing
    
    `SharedSlot#release` releases all logical slots while iterating over their collection.
    Releasing a logical slot fails the execution assigned to it.
    The `Scheduler` may react to that failure by cancelling other executions
    which share the same slot. The cancelled executions subsequently call
    `SharedSlot#returnLogicalSlot`, which modifies the logical slot collection
    while `SharedSlot#release` is still iterating over it.
    This leads to a `ConcurrentModificationException`.
    
    To avoid the `ConcurrentModificationException`, the logical slot collection
    is now copied before it is iterated over.
    
    This closes #13511.
---
 .../apache/flink/runtime/scheduler/SharedSlot.java | 21 +++++++++++++----
 .../flink/runtime/scheduler/SharedSlotTest.java    | 27 +++++++++++++++++++++-
 2 files changed, 43 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
index 1d55bbd..1525060 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SharedSlot.java
@@ -35,9 +35,11 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Shared slot implementation for the {@link 
SlotSharingExecutionSlotAllocator}.
@@ -203,6 +205,7 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload 
{
        }
 
        private void removeLogicalSlotRequest(SlotRequestId 
logicalSlotRequestId) {
+               LOG.debug("Remove {}", 
getLogicalSlotString(logicalSlotRequestId));
                Preconditions.checkState(
                        requestedLogicalSlots.removeKeyB(logicalSlotRequestId) 
!= null,
                        "Trying to remove a logical slot request which has been 
either already removed or never created.");
@@ -215,10 +218,19 @@ class SharedSlot implements SlotOwner, 
PhysicalSlot.Payload {
                        slotContextFuture.isDone(),
                        "Releasing of the shared slot is expected only from its 
successfully allocated physical slot ({})",
                        physicalSlotRequestId);
-               for (ExecutionVertexID executionVertexId : 
requestedLogicalSlots.keySetA()) {
-                       LOG.debug("Release {}", 
getLogicalSlotString(executionVertexId));
+               LOG.debug("Release shared slot ({})", physicalSlotRequestId);
+
+               // copy the logical slot collection to avoid 
ConcurrentModificationException
+               // if logical slot releases cause cancellation of other 
executions
+               // which will try to call returnLogicalSlot and modify 
requestedLogicalSlots collection
+               Map<ExecutionVertexID, CompletableFuture<SingleLogicalSlot>> 
logicalSlotFutures = requestedLogicalSlots
+                       .keySetA()
+                       .stream()
+                       .collect(Collectors.toMap(executionVertexId -> 
executionVertexId, requestedLogicalSlots::getValueByKeyA));
+               for (Map.Entry<ExecutionVertexID, 
CompletableFuture<SingleLogicalSlot>> entry : logicalSlotFutures.entrySet()) {
+                       LOG.debug("Release {}", 
getLogicalSlotString(entry.getKey()));
                        CompletableFuture<SingleLogicalSlot> logicalSlotFuture =
-                               
requestedLogicalSlots.getValueByKeyA(executionVertexId);
+                               entry.getValue();
                        Preconditions.checkNotNull(logicalSlotFuture);
                        Preconditions.checkState(
                                logicalSlotFuture.isDone(),
@@ -231,8 +243,9 @@ class SharedSlot implements SlotOwner, PhysicalSlot.Payload 
{
        }
 
        private void releaseExternally() {
-               if (state == State.ALLOCATED && 
requestedLogicalSlots.values().isEmpty()) {
+               if (state != State.RELEASED && 
requestedLogicalSlots.values().isEmpty()) {
                        state = State.RELEASED;
+                       LOG.debug("Release shared slot externally ({})", 
physicalSlotRequestId);
                        
externalReleaseCallback.accept(executionSlotSharingGroup);
                }
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java
index da5091c..afb5b3e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SharedSlotTest.java
@@ -50,7 +50,8 @@ import static org.junit.Assert.fail;
  */
 public class SharedSlotTest extends TestLogger {
        private static final ExecutionVertexID EV1 = 
createRandomExecutionVertexId();
-       private static final ExecutionSlotSharingGroup SG = 
createExecutionSlotSharingGroup(EV1);
+       private static final ExecutionVertexID EV2 = 
createRandomExecutionVertexId();
+       private static final ExecutionSlotSharingGroup SG = 
createExecutionSlotSharingGroup(EV1, EV2);
        private static final SlotRequestId PHYSICAL_SLOT_REQUEST_ID = new 
SlotRequestId();
        private static final ResourceProfile RP = 
ResourceProfile.newBuilder().setCpuCores(2.0).build();
 
@@ -285,6 +286,30 @@ public class SharedSlotTest extends TestLogger {
                assertThat(released.get(), is(1));
        }
 
+       @Test
+       public void 
testReturnLogicalSlotWhileReleasingDoesNotCauseConcurrentModificationException()
 {
+               CompletableFuture<PhysicalSlot> slotContextFuture = 
CompletableFuture
+                       .completedFuture(new TestingPhysicalSlot(RP, new 
AllocationID()));
+               SharedSlot sharedSlot = SharedSlotBuilder
+                       .newBuilder()
+                       .withSlotContextFuture(slotContextFuture)
+                       .build();
+               LogicalSlot logicalSlot1 = 
sharedSlot.allocateLogicalSlot(EV1).join();
+               LogicalSlot logicalSlot2 = 
sharedSlot.allocateLogicalSlot(EV2).join();
+               logicalSlot1.tryAssignPayload(new LogicalSlot.Payload() {
+                       @Override
+                       public void fail(Throwable cause) {
+                               sharedSlot.returnLogicalSlot(logicalSlot2);
+                       }
+
+                       @Override
+                       public CompletableFuture<?> getTerminalStateFuture() {
+                               return CompletableFuture.completedFuture(null);
+                       }
+               });
+               sharedSlot.release(new Throwable());
+       }
+
        private static class SharedSlotBuilder {
                private CompletableFuture<PhysicalSlot> slotContextFuture = new 
CompletableFuture<>();
                private boolean slotWillBeOccupiedIndefinitely = false;

Reply via email to