[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441952039



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
}
}
 
+   @Test
+   public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
+   final List<AllocationID> allocations = new ArrayList<>();
+   resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId()));
+
+   try (SlotPoolImpl slotPool = setUpSlotPool()) {

Review comment:
   FLINK-18355 has been opened to simplify the tests.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441432435



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
}
}
 
+   @Test
+   public void testOrphanedAllocationCanBeRemapped() throws Exception {
+   try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+   final List<AllocationID> allocationIds = new ArrayList<>();
+   resourceManagerGateway.setRequestSlotConsumer(
+   slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+   final List<AllocationID> canceledAllocations = new ArrayList<>();
+   resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+   setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
+   final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);
+
+   final SlotRequestId slotRequestId1 = new SlotRequestId();
+   scheduler.allocateSlot(
+   slotRequestId1,
+   new DummyScheduledUnit(),
+   SlotProfile.noRequirements(),
+   timeout);

Review comment:
   Done by introducing a `requestNewAllocatedSlots(SlotPool, SlotRequestId...)` helper.









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441431939



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
}
}
 
+   @Test
+   public void testOrphanedAllocationCanBeRemapped() throws Exception {
+   try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+   final List<AllocationID> allocationIds = new ArrayList<>();
+   resourceManagerGateway.setRequestSlotConsumer(
+   slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+   final List<AllocationID> canceledAllocations = new ArrayList<>();
+   resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);
+
+   setupSlotPool(slotPool, resourceManagerGateway, mainThreadExecutor);
+   final Scheduler scheduler = setupScheduler(slotPool, mainThreadExecutor);

Review comment:
   You are right. I have dropped the scheduler in the newly added cases.
   However, this unnecessary complication is a common problem in most of the SlotPool tests. I think we can simplify them in a separate task as well.









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441430620



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImplTest.java
##
@@ -738,6 +740,95 @@ public void testCalculationOfTaskExecutorUtilization() throws Exception {
}
}
 
+   @Test
+   public void testOrphanedAllocationCanBeRemapped() throws Exception {
+   try (SlotPoolImpl slotPool = createSlotPoolImpl()) {
+   final List<AllocationID> allocationIds = new ArrayList<>();
+   resourceManagerGateway.setRequestSlotConsumer(
+   slotRequest -> allocationIds.add(slotRequest.getAllocationId()));
+
+   final List<AllocationID> canceledAllocations = new ArrayList<>();
+   resourceManagerGateway.setCancelSlotConsumer(canceledAllocations::add);

Review comment:
   Maybe later, along with the rework of all SlotPool tests? `resourceManagerGateway` is shared between all cases, so it's better to have an overview of all its usages.









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441412492



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
}
}
 
+   @Test
+   public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
+   final List<AllocationID> allocations = new ArrayList<>();
+   resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId()));
+
+   try (SlotPoolImpl slotPool = setUpSlotPool()) {

Review comment:
   Possibly these test classes can share the same test base, so that the util methods and fields can be reused.









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441412744



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
}
}
 
+   @Test
+   public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
+   final List<AllocationID> allocations = new ArrayList<>();
+   resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId()));
+
+   try (SlotPoolImpl slotPool = setUpSlotPool()) {

Review comment:
   I can open a task for it if you feel it is OK.









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441400235



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolPendingRequestFailureTest.java
##
@@ -99,6 +105,40 @@ public void testFailingAllocationFailsPendingSlotRequests() throws Exception {
}
}
 
+   @Test
+   public void testFailingAllocationFailsRemappedPendingSlotRequests() throws Exception {
+   final List<AllocationID> allocations = new ArrayList<>();
+   resourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocations.add(slotRequest.getAllocationId()));
+
+   try (SlotPoolImpl slotPool = setUpSlotPool()) {

Review comment:
   It's possible but unrelated. I do not feel like we should clean up the tests here.
   Like you said, the scheduler might also not be needed. I think this is a larger task and would be better handled separately, as a full rework of the `SlotPoolImpl` tests, including those in `SlotPoolImplTest`, `SlotPoolInteractionsTest` and `SlotPoolSlotSharingTest`.









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441351136



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -37,13 +41,13 @@
 
private final LinkedHashMap> aMap;
 
-   private final LinkedHashMap<B, A> bMap;
+   private final HashMap<B, A> bMap;

Review comment:
   I meant confusion for developers. A `LinkedHashMap` for bMap suggests that it behaves differently from a plain `Map`, but it actually does not.
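The ordering guarantee only needs to live in the primary index. A small stand-alone demo of the JDK behaviour this relies on: a `LinkedHashMap` in its default insertion-order mode keeps a key's position when that key is re-inserted, so the primary map can preserve request order while the secondary index stays a plain `HashMap` used only for lookups. The class name `ReinsertionOrderDemo` is illustrative and not part of Flink.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReinsertionOrderDemo {

    /** Puts keys 1 and 2, re-puts key 1, and returns the resulting key order. */
    static List<Integer> keysAfterReinsert() {
        Map<Integer, String> primary = new LinkedHashMap<>();
        primary.put(1, "first");
        primary.put(2, "second");
        // Re-putting an existing key replaces its value; in the default
        // insertion-order mode the key keeps its original position.
        primary.put(1, "updated");
        return new ArrayList<>(primary.keySet());
    }

    public static void main(String[] args) {
        System.out.println(keysAfterReinsert()); // prints [1, 2]
    }
}
```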









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441347521



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java
##
@@ -85,4 +85,28 @@ public void ensuresOneToOneMappingBetweenKeysSameSecondaryKey() {
assertThat(map.getByKeyB(1), is(secondValue));
assertThat(map.getByKeyA(2), is(secondValue));
}
+
+   @Test
+   public void testPrimaryKeyOrderIsNotAffectedIfReInsertedWithSameSecondaryKey() {
+   final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>(2);
+
+   final String value = "foobar";
+   map.put(1, 1, value);
+   map.put(2, 2, value);
+
+   map.put(1, 1, value);
+   assertThat(map.keySetA().iterator().next(), is(1));

Review comment:
   True. Added verifications for values.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -74,6 +74,20 @@ public V getKeyB(B bKey) {
}
}
 
+   public A getKeyA(B bKey) {
+   return bMap.get(bKey);
+   }
+
+   public B getKeyB(A aKey) {

Review comment:
   done.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -74,6 +74,20 @@ public V getKeyB(B bKey) {
}
}
 
+   public A getKeyA(B bKey) {

Review comment:
   done.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -64,7 +64,7 @@ public V getKeyA(A aKey) {
}
}
 
-   public V getKeyB(B bKey) {
+   public V getByKeyB(B bKey) {

Review comment:
   done.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -54,7 +54,7 @@ public int size() {
return aMap.size();
}
 
-   public V getKeyA(A aKey) {
+   public V getByKeyA(A aKey) {

Review comment:
   done.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -22,12 +22,16 @@
 
 import java.util.AbstractCollection;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Set;
 
 /**
- * Map which stores values under two different indices.
+ * Map which stores values under two different indices. The mapping of the primary key to the
+ * value is backed by {@link LinkedHashMap} so that the iteration order over the values and
+ * the primary key set is the insertion order. Note that there is no contract of the iteration
+ * order over the secondary key set.
Review comment:
   done.









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441347160



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final AllocatedSlot slot) {
return null;
}
 
+   private void maybeRemapOrphanedAllocation(
+   final AllocationID allocationIdOfRequest,
+   final AllocationID allocationIdOfSlot) {
+
+   final AllocationID orphanedAllocationId = allocationIdOfRequest.equals(allocationIdOfSlot)
+   ? null : allocationIdOfRequest;
+
+   // if the request that initiated the allocation is still pending, it should take over the orphaned allocation
+   // of the fulfilled request so that it can fail fast if the remapped allocation fails
+   if (orphanedAllocationId != null) {

Review comment:
   done.









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441347449



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final AllocatedSlot slot) {
return null;
}
 
+   private void maybeRemapOrphanedAllocation(
+   final AllocationID allocationIdOfRequest,
+   final AllocationID allocationIdOfSlot) {
+
+   final AllocationID orphanedAllocationId = allocationIdOfRequest.equals(allocationIdOfSlot)
+   ? null : allocationIdOfRequest;
+
+   // if the request that initiated the allocation is still pending, it should take over the orphaned allocation
+   // of the fulfilled request so that it can fail fast if the remapped allocation fails
+   if (orphanedAllocationId != null) {
+   final SlotRequestId requestIdOfAllocatedSlot = pendingRequests.getKeyA(allocationIdOfSlot);
+   if (requestIdOfAllocatedSlot != null) {
+   final PendingRequest requestOfAllocatedSlot = pendingRequests.getByKeyA(requestIdOfAllocatedSlot);
+   requestOfAllocatedSlot.setAllocationId(orphanedAllocationId);
+
+   // this re-insertion of initiatedRequestId will not affect its original insertion order
+   pendingRequests.put(requestIdOfAllocatedSlot, orphanedAllocationId, requestOfAllocatedSlot);
+   } else {
+   // cancel the slot request if the orphaned allocation is not remapped to a pending request.
+   // the request id can be null if the slot is returned by scheduler
+   resourceManagerGateway.cancelSlotRequest(orphanedAllocationId);

Review comment:
   Updated the comments to make them easier to understand.









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-17 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441336004



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -592,6 +589,31 @@ private PendingRequest findMatchingPendingRequest(final AllocatedSlot slot) {
return null;
}
 
+   private void maybeRemapOrphanedAllocation(
+   final AllocationID allocationIdOfRequest,
+   final AllocationID allocationIdOfSlot) {
+
+   final AllocationID orphanedAllocationId = allocationIdOfRequest.equals(allocationIdOfSlot)
+   ? null : allocationIdOfRequest;
+
+   // if the request that initiated the allocation is still pending, it should take over the orphaned allocation
+   // of the fulfilled request so that it can fail fast if the remapped allocation fails
+   if (orphanedAllocationId != null) {
+   final SlotRequestId requestIdOfAllocatedSlot = pendingRequests.getKeyA(allocationIdOfSlot);
+   if (requestIdOfAllocatedSlot != null) {
+   final PendingRequest requestOfAllocatedSlot = pendingRequests.getByKeyA(requestIdOfAllocatedSlot);
+   requestOfAllocatedSlot.setAllocationId(orphanedAllocationId);
+
+   // this re-insertion of initiatedRequestId will not affect its original insertion order
+   pendingRequests.put(requestIdOfAllocatedSlot, orphanedAllocationId, requestOfAllocatedSlot);
+   } else {
+   // cancel the slot request if the orphaned allocation is not remapped to a pending request.
+   // the request id can be null if the slot is returned by scheduler
+   resourceManagerGateway.cancelSlotRequest(orphanedAllocationId);

Review comment:
   Yes. If such a returned slot fulfills a pending request, the orphaned allocation is not needed by any other pending request, since those still have their own allocations.
   So we can safely cancel it to avoid allocating more slots than needed.
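A simplified sketch of the remap-or-cancel decision discussed above, assuming string ids in place of `AllocationID`/`SlotRequestId`, a plain `HashMap` keyed by allocation id in place of `DualKeyLinkedMap`, and a list standing in for `resourceManagerGateway.cancelSlotRequest`. `OrphanRemapSketch` is hypothetical, not the PR's code, and it assumes the fulfilled request has already been removed from the pending set when the method runs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the remap-or-cancel step; not Flink's SlotPoolImpl. */
class OrphanRemapSketch {

    // allocation id -> id of the still-pending request waiting on that allocation
    final Map<String, String> requestByAllocation = new HashMap<>();
    // stands in for resourceManagerGateway.cancelSlotRequest(...) calls
    final List<String> cancelled = new ArrayList<>();

    /**
     * Called after a slot carrying allocationIdOfSlot fulfilled a request that had
     * asked for allocationIdOfRequest. The fulfilled request must already have been
     * removed from requestByAllocation.
     */
    void maybeRemapOrphanedAllocation(String allocationIdOfRequest, String allocationIdOfSlot) {
        if (allocationIdOfRequest.equals(allocationIdOfSlot)) {
            return; // the request received its own allocation: nothing is orphaned
        }
        String orphaned = allocationIdOfRequest;
        // The pending request that originally triggered the slot's allocation, if any.
        String initiatingRequest = requestByAllocation.remove(allocationIdOfSlot);
        if (initiatingRequest != null) {
            // Take over the orphaned allocation so a failure of it still reaches a pending request.
            requestByAllocation.put(orphaned, initiatingRequest);
        } else {
            // No pending request owns the slot's allocation (e.g. the slot was returned by
            // the scheduler), so the orphaned allocation is surplus and can be cancelled.
            cancelled.add(orphaned);
        }
    }

    public static void main(String[] args) {
        OrphanRemapSketch pool = new OrphanRemapSketch();
        // req-1 asked for alloc-1 (already fulfilled and removed); req-2 still waits on alloc-2.
        pool.requestByAllocation.put("alloc-2", "req-2");
        // The slot of alloc-2 arrived first and went to req-1 (request order).
        pool.maybeRemapOrphanedAllocation("alloc-1", "alloc-2");
        System.out.println(pool.requestByAllocation); // {alloc-1=req-2}
        System.out.println(pool.cancelled);           // []
    }
}
```

Cancelling only when no pending request claims the slot's allocation keeps the number of outstanding allocations equal to the number of pending requests.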









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-16 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r441316261



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMapTest.java
##
@@ -85,4 +85,28 @@ public void ensuresOneToOneMappingBetweenKeysSameSecondaryKey() {
assertThat(map.getByKeyB(1), is(secondValue));
assertThat(map.getByKeyA(2), is(secondValue));
}
+
+   @Test
+   public void testPrimaryKeyOrderIsNotAffectedIfReInsertedWithSameSecondaryKey() {
+   final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>(2);
+
+   final String value = "foobar";
+   map.put(1, 1, value);
+   map.put(2, 2, value);
+
+   map.put(1, 1, value);
+   assertThat(map.keySetA().iterator().next(), is(1));
+   }
+
+   @Test
+   public void testPrimaryKeyOrderIsNotAffectedIfReInsertedWithDifferentSecondaryKey() {
+   final DualKeyLinkedMap<Integer, Integer, String> map = new DualKeyLinkedMap<>(2);
+
+   final String value = "foobar";
+   map.put(1, 1, value);
+   map.put(2, 2, value);
+
+   map.put(1, 3, value);

Review comment:
   This is tested in `ensuresOneToOneMappingBetweenKeysSameSecondaryKey`. 









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-16 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r440768678



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -37,13 +41,13 @@
 
private final LinkedHashMap> aMap;
 
-   private final LinkedHashMap<B, A> bMap;
+   private final HashMap<B, A> bMap;

Review comment:
   It might cause confusion, so I think it would be better not to make it a `LinkedHashMap`.









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-16 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r440767961



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -22,12 +22,16 @@
 
 import java.util.AbstractCollection;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Set;
 
 /**
- * Map which stores values under two different indices.
+ * Map which stores values under two different indices. The mapping of the primary key to the
+ * value is backed by {@link LinkedHashMap} so that the iteration order over the values and
+ * the primary key set is the insertion order. Note that there is no contract of the iteration
+ * order over the secondary key set.

Review comment:
   Good suggestion.









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-12 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r439243225



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -648,26 +648,8 @@ boolean offerSlot(
slotOffer.getResourceProfile(),
taskManagerGateway);
 
-   // check whether we have request waiting for this slot
-   PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
   Thanks for confirming it.









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-09 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r436628282



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -112,6 +114,9 @@
/** The requests that are waiting for the resource manager to be connected. */
private final LinkedHashMap waitingForResourceManager;
 
+   /** Maps a request to its allocation. */
+   private final BiMap requestedAllocations;

Review comment:
   Yes, we can improve `DualKeyLinkedMap` and drop `requestedAllocations`.
   The needed improvements would be:
   1. re-inserting a record does not affect its order
   2. get keyA from keyB, and vice versa
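A minimal sketch of a two-key map with the two improvements listed above, assuming `Map.Entry` pairs in place of whatever tuple type the real class uses. `DualKeyMapSketch` is hypothetical and deliberately omits removal and the values view; the primary index is a `LinkedHashMap`, so re-inserting a primary key keeps its position, and the secondary index is a plain `HashMap` used only for lookups.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical two-key map sketch; not Flink's DualKeyLinkedMap. */
class DualKeyMapSketch<A, B, V> {

    // Primary index in insertion order: A -> (B, V).
    private final LinkedHashMap<A, Map.Entry<B, V>> aMap = new LinkedHashMap<>();
    // Secondary index for lookups only: B -> A. No ordering contract.
    private final HashMap<B, A> bMap = new HashMap<>();

    void put(A aKey, B bKey, V value) {
        // Re-putting an existing A keeps its position in the LinkedHashMap (improvement 1).
        Map.Entry<B, V> previous = aMap.put(aKey, new AbstractMap.SimpleImmutableEntry<>(bKey, value));
        if (previous != null) {
            bMap.remove(previous.getKey()); // drop aKey's stale secondary key
        }
        A previousOwner = bMap.put(bKey, aKey);
        if (previousOwner != null && !previousOwner.equals(aKey)) {
            aMap.remove(previousOwner); // bKey moved to aKey: keep the mapping one-to-one
        }
    }

    V getByKeyA(A aKey) {
        Map.Entry<B, V> entry = aMap.get(aKey);
        return entry == null ? null : entry.getValue();
    }

    V getByKeyB(B bKey) {
        A aKey = bMap.get(bKey);
        return aKey == null ? null : getByKeyA(aKey);
    }

    // Improvement 2: look up one key from the other.
    A getKeyA(B bKey) {
        return bMap.get(bKey);
    }

    B getKeyB(A aKey) {
        Map.Entry<B, V> entry = aMap.get(aKey);
        return entry == null ? null : entry.getKey();
    }

    Set<A> keySetA() {
        return Collections.unmodifiableSet(aMap.keySet());
    }

    public static void main(String[] args) {
        DualKeyMapSketch<Integer, Integer, String> map = new DualKeyMapSketch<>();
        map.put(1, 1, "foobar");
        map.put(2, 2, "foobar");
        map.put(1, 3, "foobar");            // re-insert primary key 1 with a new secondary key
        System.out.println(map.keySetA());  // [1, 2]
        System.out.println(map.getKeyA(3)); // 1
        System.out.println(map.getKeyB(1)); // 3
        System.out.println(map.getKeyA(1)); // null
    }
}
```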

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -112,6 +114,9 @@
/** The requests that are waiting for the resource manager to be connected. */
private final LinkedHashMap waitingForResourceManager;
 
+   /** Maps a request to its allocation. */
+   private final BiMap requestedAllocations;

Review comment:
   done.

##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyLinkedMap.java
##
@@ -37,20 +44,20 @@
 
private final LinkedHashMap> aMap;
 
-   private final LinkedHashMap<B, A> bMap;
+   private final HashMap<B, A> bMap;
 
private transient Collection values;
 
public DualKeyLinkedMap(int initialCapacity) {

Review comment:
   @azagrebin what do you think of making this class package-private?
   It looks like a common util class, but nothing other than `SlotPoolImpl` uses it.









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-08 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r436540565



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -648,26 +648,8 @@ boolean offerSlot(
slotOffer.getResourceProfile(),
taskManagerGateway);
 
-   // check whether we have request waiting for this slot
-   PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
   I had another thought and prefer not to make fulfilling requests in request order configurable.
   The FIFO order works for any scheduling strategy and is even an improvement for lazy-from-sources scheduling. It is not just for pipelined region scheduling.
   There was once a ticket/PR for the same purpose, "[FLINK-13165] Complete slot requests in request order", although it did not fully make it in the end. The `LinkedHashMap` was introduced by it and eases this PR. `SlotPoolRequestCompletionTest` was also introduced by it and can be reused.
   
   The orphaned allocation remapping also ensures that the fail-fast route will not break with this change.
   
   What do you think?
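To make the FIFO argument concrete, a sketch of request-order fulfillment under simplifying assumptions: a resource profile is reduced to a single integer unit count, and request ids are strings. Instead of handing an offered slot to whichever request owns its allocation id (the removed `pendingRequests.removeKeyB(allocationID)` lookup), the sketch walks pending requests in insertion order and picks the oldest one the slot can satisfy. `FifoFulfillmentSketch` and its methods are hypothetical.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical request-order (FIFO) matching; not Flink's SlotPoolImpl. */
class FifoFulfillmentSketch {

    // request id -> required units, iterated in the order the requests were made
    private final LinkedHashMap<String, Integer> pendingRequests = new LinkedHashMap<>();

    void request(String requestId, int requiredUnits) {
        pendingRequests.put(requestId, requiredUnits);
    }

    /** Returns the oldest pending request that the offered slot can satisfy, or null. */
    String offerSlot(int slotUnits) {
        Iterator<Map.Entry<String, Integer>> it = pendingRequests.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> entry = it.next();
            if (entry.getValue() <= slotUnits) {
                String requestId = entry.getKey();
                it.remove(); // the request is fulfilled and no longer pending
                return requestId;
            }
        }
        return null; // no pending request fits; the slot stays available
    }

    public static void main(String[] args) {
        FifoFulfillmentSketch pool = new FifoFulfillmentSketch();
        pool.request("r1", 2);
        pool.request("r2", 1);
        pool.request("r3", 1);
        System.out.println(pool.offerSlot(1)); // r2: r1 needs 2 units, r2 is the oldest that fits
        System.out.println(pool.offerSlot(4)); // r1
        System.out.println(pool.offerSlot(1)); // r3
        System.out.println(pool.offerSlot(1)); // null
    }
}
```

Because the match no longer depends on allocation ids, a slot can end up with a request other than the one that triggered it, which is exactly the case the orphaned allocation remapping has to cover.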









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-08 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r436540565



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -648,26 +648,8 @@ boolean offerSlot(
slotOffer.getResourceProfile(),
taskManagerGateway);
 
-   // check whether we have request waiting for this slot
-   PendingRequest pendingRequest = 
pendingRequests.removeKeyB(allocationID);

Review comment:
   I had another thought and prefer to not make it configurable to fulfill 
request in request order.
   The FIFO order works for any scheduling strategy and is even a improvement 
for lazy from source scheduling. There was a ticket for the same purpose 
"[FLINK-13165] Complete slot requests in request order" although it did not 
fully make it at last. `LinkedHashMap` was introduced by it and eases this PR. 
`SlotPoolRequestCompletionTest` was also introduced by it and can be reused.
   This PR is more alike a fix to FLINK-13165.
   
   The orphaned allocation remapping also ensures that the fail-fast route will 
not break with this change.
   
   What do you think?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-08 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r436540565



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -648,26 +648,8 @@ boolean offerSlot(
slotOffer.getResourceProfile(),
taskManagerGateway);
 
-   // check whether we have request waiting for this slot
-   PendingRequest pendingRequest = 
pendingRequests.removeKeyB(allocationID);

Review comment:
   I had another thought, and I would prefer not to make fulfilling requests in request order configurable.
   The FIFO order works for any scheduling strategy and is even an improvement for lazy-from-sources scheduling. There was a ticket with the same purpose, "[FLINK-13165] Complete slot requests in request order", although it was never fully completed. It introduced the `LinkedHashMap`, which makes this PR easier, and also `SlotPoolRequestCompletionTest`, which can be reused here.
   This PR is more like a fix for FLINK-13165.
   
   The remapping of orphaned allocations also ensures that this change does not break the fail-fast path.
   
   What do you think?









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-08 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r436533148



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -648,26 +648,8 @@ boolean offerSlot(
slotOffer.getResourceProfile(),
taskManagerGateway);
 
-   // check whether we have request waiting for this slot
-   PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
   Yes, we still need the remapping to allow failing fast on `UnfulfillableSlotRequestException`.
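A minimal sketch of why the remapping matters for failing fast, under simplifying assumptions: the class `RemappingPendingRequests` is hypothetical, IDs are plain `String`s, resource profiles are ignored, and `failAllocation` stands in for the resource manager reporting an allocation as failed (for example with an `UnfulfillableSlotRequestException`). When the slot for allocation `a2` fulfills the older request `r1`, allocation `a1` becomes orphaned; handing it to `r2`, which was waiting on `a2`, lets a later failure of `a1` fail `r2` immediately.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical model of FIFO fulfillment plus remapping of orphaned allocations. */
final class RemappingPendingRequests {

    // Insertion-ordered: slot request ID -> allocation ID currently associated with it.
    private final Map<String, String> pendingRequests = new LinkedHashMap<>();

    void addPendingRequest(String slotRequestId, String allocationId) {
        pendingRequests.put(slotRequestId, allocationId);
    }

    /** Fulfills the oldest pending request and remaps the allocation it leaves orphaned. */
    String offerSlot(String slotAllocationId) {
        Iterator<Map.Entry<String, String>> it = pendingRequests.entrySet().iterator();
        if (!it.hasNext()) {
            return null;
        }
        Map.Entry<String, String> oldest = it.next();
        String fulfilledRequestId = oldest.getKey();
        String orphanedAllocationId = oldest.getValue();
        it.remove();

        if (!orphanedAllocationId.equals(slotAllocationId)) {
            // The request that triggered slotAllocationId is still pending: let it take over
            // the orphaned allocation. setValue keeps the entry's position in request order.
            for (Map.Entry<String, String> entry : pendingRequests.entrySet()) {
                if (entry.getValue().equals(slotAllocationId)) {
                    entry.setValue(orphanedAllocationId);
                    break;
                }
            }
        }
        return fulfilledRequestId;
    }

    /** Fails fast the pending request mapped to a failed allocation; returns its ID or null. */
    String failAllocation(String failedAllocationId) {
        Iterator<Map.Entry<String, String>> it = pendingRequests.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            if (entry.getValue().equals(failedAllocationId)) {
                String failedRequestId = entry.getKey();
                it.remove();
                return failedRequestId;
            }
        }
        return null;
    }
}
```

In this sketch, without the remapping `r2` would stay mapped to the already consumed `a2`, the failure of `a1` would match no pending request, and `r2` would have to wait (e.g. for a timeout) instead of failing fast.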
   









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-01 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r433607269



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -648,26 +648,8 @@ boolean offerSlot(
slotOffer.getResourceProfile(),
taskManagerGateway);
 
-   // check whether we have request waiting for this slot
-   PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);

Review comment:
   I think it's a good idea to make it configurable. 
   
   Besides reducing the risk for streaming and DataSet jobs, another benefit is that we could drop the change that remaps orphaned allocations. The remapping only exists so that pending requests fail fast in `failAllocation(...)`, which makes a difference only for streaming jobs.
   
   Actually, I'm wondering whether we need to keep the fail-fast mechanism in `failAllocation(...)` at all in the future. It requires the slot pool to differentiate streaming requests from batch requests. And if in the future a slot pool contains both batch slots (occupied temporarily) and streaming slots (occupied indefinitely), a failed allocation for a streaming request does not need to fail the request immediately if it is still fulfillable, just like how we currently deal with batch requests.
   
   What do you think of dropping the commit to remap orphaned allocations?









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-01 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r433114179



##
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
##
@@ -698,13 +680,13 @@ boolean offerSlot(
 
componentMainThreadExecutor.assertRunningInMainThread();
 
-   final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
+   final PendingRequest pendingRequest = pendingRequests.getKeyB(allocationID);

Review comment:
   Sure, we can have a separate PR to centralize the removal of pending requests.
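One possible shape for centralizing pending-request removal, sketched with a hypothetical `PendingRequestIndex` (not Flink's actual dual-key map): lookups by allocation ID never remove anything, and a single `removeRequest` method is the only place an entry is deleted, so both indices stay consistent.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical dual-key index: slot request ID <-> allocation ID. */
final class PendingRequestIndex {

    private final Map<String, String> requestToAllocation = new LinkedHashMap<>(); // request order
    private final Map<String, String> allocationToRequest = new HashMap<>();

    /** Adds or remaps a request; re-putting an existing request keeps its insertion order. */
    void put(String requestId, String allocationId) {
        String otherRequest = allocationToRequest.get(allocationId);
        if (otherRequest != null && !otherRequest.equals(requestId)) {
            removeRequest(otherRequest); // allocation IDs are unique keys
        }
        String oldAllocation = requestToAllocation.put(requestId, allocationId);
        if (oldAllocation != null) {
            allocationToRequest.remove(oldAllocation);
        }
        allocationToRequest.put(allocationId, requestId);
    }

    /** Pure lookup by allocation ID; never removes. */
    String getRequestByAllocation(String allocationId) {
        return allocationToRequest.get(allocationId);
    }

    /** The single place where pending requests are removed; keeps both indices in sync. */
    boolean removeRequest(String requestId) {
        String allocationId = requestToAllocation.remove(requestId);
        if (allocationId == null) {
            return false;
        }
        allocationToRequest.remove(allocationId);
        return true;
    }

    int size() {
        return requestToAllocation.size();
    }
}
```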









[GitHub] [flink] zhuzhurk commented on a change in pull request #12278: [FLINK-17019][runtime] Fulfill slot requests in request order

2020-06-01 Thread GitBox


zhuzhurk commented on a change in pull request #12278:
URL: https://github.com/apache/flink/pull/12278#discussion_r433113663



##
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRequestCompletionTest.java
##
@@ -103,7 +123,7 @@ private void runSlotRequestCompletionTest(
// check that the slot requests get completed in sequential order
for (int i = 0; i < slotRequestIds.size(); i++) {
final CompletableFuture slotRequestFuture = slotRequests.get(i);
-   slotRequestFuture.get();
+   assertThat(slotRequestFuture.getNow(null), not(is(nullValue())));

Review comment:
   OK.
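The test change above replaces the blocking `slotRequestFuture.get()` with `getNow(null)`, so the assertion checks that each future is already completed rather than waiting for it. A small standalone illustration with a hypothetical helper `allCompletedNow`. Note that `getNow` throws a `CompletionException` for an exceptionally completed future, and a future completed with `null` would be indistinguishable from an incomplete one in this check.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper mirroring the non-blocking assertion used in the test. */
final class CompletionOrderCheck {

    /** Returns true if every future is already completed with a non-null value, without blocking. */
    static boolean allCompletedNow(List<CompletableFuture<String>> futures) {
        for (CompletableFuture<String> future : futures) {
            // getNow(null) returns null immediately for an incomplete future, whereas get() would block.
            if (future.getNow(null) == null) {
                return false;
            }
        }
        return true;
    }
}
```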




