[GitHub] [flink] xintongsong commented on a change in pull request #14560: [FLINK-20837] Refactor dynamic SlotID

2021-01-07 GitBox


xintongsong commented on a change in pull request #14560:
URL: https://github.com/apache/flink/pull/14560#discussion_r553743795



##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java
##
@@ -115,11 +115,11 @@ public void testRetrievingAllActiveSlots() throws Exception {
     }
 
     /**
-     * Tests that redundant slot allocation with the same AllocationID to a different slot is
-     * rejected.
+     * Tests that inconsistent static slot allocation with the same AllocationID to a different slot
+     * is rejected.
      */
     @Test
-    public void testRedundantSlotAllocation() throws Exception {
+    public void testInconsistentStaticSlotAllocation() throws Exception {

Review comment:
   This covers the case of trying to allocate the same allocation ID to a different slot index.
   I think we also need to cover the case where different allocation IDs try to take the same slot index.
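A sketch of that extra case, written against a hypothetical, heavily simplified stand-in for `TaskSlotTableImpl` (`StaticSlotTableSketch` is illustrative only, not Flink's API, and IDs are plain strings): once one allocation ID holds static index 0, a second, different allocation ID asking for index 0 should be rejected and the original owner kept.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified stand-in for TaskSlotTableImpl (not Flink's API): it only records
 * which allocation ID owns which static slot index.
 */
class StaticSlotTableSketch {
    private final int numberSlots;
    private final Map<Integer, String> ownerByIndex = new HashMap<>(); // index -> allocation ID
    private final Map<String, Integer> indexByAllocation = new HashMap<>(); // allocation ID -> index

    StaticSlotTableSketch(int numberSlots) {
        this.numberSlots = numberSlots;
    }

    /** Returns true for a new allocation or an exact repeat of an earlier one. */
    boolean allocateSlot(int index, String allocationId) {
        if (index < 0 || index >= numberSlots) {
            throw new IllegalArgumentException("not a static index: " + index);
        }
        Integer existingIndex = indexByAllocation.get(allocationId);
        if (existingIndex != null) {
            // Known allocation ID: accept only a repeat for the same index.
            return existingIndex == index;
        }
        if (ownerByIndex.containsKey(index)) {
            // Another allocation ID already holds this index: reject.
            return false;
        }
        ownerByIndex.put(index, allocationId);
        indexByAllocation.put(allocationId, index);
        return true;
    }

    String ownerOf(int index) {
        return ownerByIndex.get(index);
    }
}

class SameIndexDifferentAllocationDemo {
    public static void main(String[] args) {
        StaticSlotTableSketch table = new StaticSlotTableSketch(2);
        System.out.println(table.allocateSlot(0, "allocation-1")); // true
        System.out.println(table.allocateSlot(0, "allocation-2")); // false, index 0 is taken
        System.out.println(table.ownerOf(0)); // allocation-1
    }
}
```

In `TaskSlotTableImplTest` itself, the equivalent check would allocate index 0 with two distinct `AllocationID`s and assert that the second `allocateSlot` call returns false.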

##
File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImplTest.java
##
@@ -137,6 +137,68 @@ public void testRedundantSlotAllocation() throws Exception {
         }
     }
 
+    /**
+     * Tests that inconsistent dynamic slot allocation with the same AllocationID to a different
+     * slot is rejected.
+     */
+    @Test
+    public void testInconsistentDynamicSlotAllocation() throws Exception {
+        try (final TaskSlotTable taskSlotTable = createTaskSlotTableAndStart(1)) {
+            final JobID jobId1 = new JobID();
+            final JobID jobId2 = new JobID();
+            final AllocationID allocationId = new AllocationID();
+
+            assertThat(
+                    taskSlotTable.allocateSlot(-1, jobId1, allocationId, SLOT_TIMEOUT), is(true));
+            assertThat(
+                    taskSlotTable.allocateSlot(-1, jobId2, allocationId, SLOT_TIMEOUT), is(false));
+
+            assertThat(taskSlotTable.isAllocated(-1, jobId1, allocationId), is(true));
+
+            Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
+                    taskSlotTable.getAllocatedSlots(jobId1);
+            assertThat(allocatedSlots.next().getAllocationId(), is(allocationId));
+            assertThat(allocatedSlots.hasNext(), is(false));
+        }
+    }
+
+    @Test
+    public void testDuplicateStaticSlotAllocation() throws Exception {
+        try (final TaskSlotTable taskSlotTable = createTaskSlotTableAndStart(2)) {
+            final JobID jobId = new JobID();
+            final AllocationID allocationId = new AllocationID();
+
+            assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId, SLOT_TIMEOUT), is(true));
+            assertThat(taskSlotTable.allocateSlot(0, jobId, allocationId, SLOT_TIMEOUT), is(true));
+
+            assertThat(taskSlotTable.isAllocated(0, jobId, allocationId), is(true));
+            assertThat(taskSlotTable.isSlotFree(1), is(true));
+
+            Iterator<TaskSlot<TaskSlotPayload>> allocatedSlots =
+                    taskSlotTable.getAllocatedSlots(jobId);
+            assertThat(allocatedSlots.next().getIndex(), is(0));
+            assertThat(allocatedSlots.hasNext(), is(false));
+        }
+    }
+
+    @Test
+    public void testDuplicateDynamicSlotAllocation() throws Exception {

Review comment:
   I think we should also verify that the second call of `allocateSlot` does not replace the slot allocated by the first call.
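One way to express that check, sketched with a hypothetical simplified model (`DynamicSlotTableSketch` and `SlotRecord` are illustrative, not Flink classes): keep a reference to the slot created by the first call and assert that, after the duplicate call, the table still holds that very instance and still holds only one slot.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical slot record with only the fields this check needs. */
final class SlotRecord {
    final int index;
    final String jobId;

    SlotRecord(int index, String jobId) {
        this.index = index;
        this.jobId = jobId;
    }
}

/** Hypothetical table that hands out dynamic indexes starting at numberSlots. */
class DynamicSlotTableSketch {
    private final Map<String, SlotRecord> slotsByAllocation = new HashMap<>();
    private int nextDynamicIndex;

    DynamicSlotTableSketch(int numberSlots) {
        this.nextDynamicIndex = numberSlots;
    }

    boolean allocateDynamicSlot(String jobId, String allocationId) {
        SlotRecord existing = slotsByAllocation.get(allocationId);
        if (existing != null) {
            // Duplicate request: accept it for the same job, but never overwrite the stored slot.
            return existing.jobId.equals(jobId);
        }
        slotsByAllocation.put(allocationId, new SlotRecord(nextDynamicIndex++, jobId));
        return true;
    }

    SlotRecord get(String allocationId) {
        return slotsByAllocation.get(allocationId);
    }

    int size() {
        return slotsByAllocation.size();
    }
}

class DuplicateDynamicAllocationDemo {
    public static void main(String[] args) {
        DynamicSlotTableSketch table = new DynamicSlotTableSketch(1);
        table.allocateDynamicSlot("job-1", "allocation-1");
        SlotRecord first = table.get("allocation-1");
        boolean accepted = table.allocateDynamicSlot("job-1", "allocation-1");
        // The duplicate is accepted, and the slot from the first call is still the stored one.
        System.out.println(accepted + " " + (table.get("allocation-1") == first) + " " + table.size());
    }
}
```

Against the real `TaskSlotTable`, a comparable assertion could compare the `TaskSlot` returned by `getAllocatedSlots` before and after the second call.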





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] xintongsong commented on a change in pull request #14560: [FLINK-20837] Refactor dynamic SlotID

2021-01-07 GitBox


xintongsong commented on a change in pull request #14560:
URL: https://github.com/apache/flink/pull/14560#discussion_r553190497



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##
@@ -349,6 +345,28 @@ public boolean allocateSlot(
         return true;
     }
 
+    private boolean isDuplicatedSlot(
+            TaskSlot taskSlot, JobID jobId, ResourceProfile resourceProfile, int index) {
+        LOG.info(
+                "Slot with allocationId {} already exist, with resource profile {}, job id {} and index {}. The required index is {}.",
+                taskSlot.getAllocationId(),
+                taskSlot.getResourceProfile(),
+                taskSlot.getJobId(),
+                taskSlot.getIndex(),
+                index);
+        return taskSlot.getJobId().equals(jobId)
+                && taskSlot.getResourceProfile().equals(resourceProfile)
+                && (isDynamicIndex(index) || taskSlot.getIndex() == index);
+    }
+
+    private boolean isIndexAlreadyTaken(int index) {
+        return !isDynamicIndex(index) && taskSlots.get(index) != null;

Review comment:
   Is `!isDynamicIndex(index)` necessary?
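A small model that compares both forms of the predicate, under the assumption that dynamic indexes come from a counter starting at `numberSlots` and are freshly generated on every `allocateSlot` call (`IndexTakenSketch` and its members are hypothetical). A freshly generated dynamic index has never been stored in `taskSlots`, so the lookup alone already returns false for it, and the two forms agree on every index that `allocateSlot` can actually pass in.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model comparing the guarded and unguarded forms of isIndexAlreadyTaken.
 * Assumption: dynamic indexes come from a counter that starts at numberSlots and never repeats.
 */
class IndexTakenSketch {
    final int numberSlots;
    final Map<Integer, String> taskSlots = new HashMap<>(); // index -> allocation ID
    private int nextDynamicIndex;

    IndexTakenSketch(int numberSlots) {
        this.numberSlots = numberSlots;
        this.nextDynamicIndex = numberSlots;
    }

    boolean isDynamicIndex(int index) {
        return index >= numberSlots;
    }

    int nextDynamicSlotIndex() {
        return nextDynamicIndex++;
    }

    /** The form under review. */
    boolean isIndexAlreadyTakenGuarded(int index) {
        return !isDynamicIndex(index) && taskSlots.get(index) != null;
    }

    /** The form without the isDynamicIndex guard. */
    boolean isIndexAlreadyTakenPlain(int index) {
        return taskSlots.get(index) != null;
    }
}

class IndexTakenDemo {
    public static void main(String[] args) {
        IndexTakenSketch table = new IndexTakenSketch(2);
        table.taskSlots.put(0, "static-allocation");
        // Even if issued dynamic slots were stored in taskSlots, the next index is still fresh.
        table.taskSlots.put(table.nextDynamicSlotIndex(), "dynamic-allocation");
        int fresh = table.nextDynamicSlotIndex();
        for (int index : new int[] {0, 1, fresh}) {
            System.out.println(index + ": " + table.isIndexAlreadyTakenGuarded(index)
                    + " / " + table.isIndexAlreadyTakenPlain(index));
        }
    }
}
```

The two forms would only differ for an already-issued dynamic index, which `allocateSlot` never checks under this assumption.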









[GitHub] [flink] xintongsong commented on a change in pull request #14560: [FLINK-20837] Refactor dynamic SlotID

2021-01-06 GitBox


xintongsong commented on a change in pull request #14560:
URL: https://github.com/apache/flink/pull/14560#discussion_r553155630



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##
@@ -288,22 +288,13 @@ public boolean allocateSlot(
 
         TaskSlot taskSlot = allocatedSlots.get(allocationId);
         if (taskSlot != null) {
-            LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
-            return false;
-        }
-
-        if (taskSlots.containsKey(index)) {
-            TaskSlot duplicatedTaskSlot = taskSlots.get(index);
+            return isDuplicatedSlot(taskSlot, jobId, resourceProfile, index);
+        } else if (!isIndexAlreadyTaken(index)) {

Review comment:
   `!` should be removed.
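A minimal illustration of why the negation is wrong, using a hypothetical map-based model (`IndexGuardSketch` is not Flink code): with `!`, a free index is rejected, while an already-taken index is accepted and its owner overwritten.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical minimal model of the index guard in allocateSlot (static indexes only). */
class IndexGuardSketch {
    private final Map<Integer, String> taskSlots = new HashMap<>(); // index -> allocation ID

    private boolean isIndexAlreadyTaken(int index) {
        return taskSlots.get(index) != null;
    }

    /** Corrected guard: reject exactly when the index is already taken. */
    boolean allocate(int index, String allocationId) {
        if (isIndexAlreadyTaken(index)) {
            return false;
        }
        taskSlots.put(index, allocationId);
        return true;
    }

    /** Negated guard, as in the line under review: rejects free indexes, overwrites taken ones. */
    boolean allocateWithNegatedGuard(int index, String allocationId) {
        if (!isIndexAlreadyTaken(index)) {
            return false;
        }
        taskSlots.put(index, allocationId);
        return true;
    }

    String ownerOf(int index) {
        return taskSlots.get(index);
    }
}

class IndexGuardDemo {
    public static void main(String[] args) {
        IndexGuardSketch table = new IndexGuardSketch();
        System.out.println(table.allocateWithNegatedGuard(0, "a")); // false: free index rejected
        System.out.println(table.allocate(0, "a")); // true
        System.out.println(table.allocate(0, "b")); // false: index taken
        System.out.println(table.allocateWithNegatedGuard(0, "b")); // true: "a" is overwritten
        System.out.println(table.ownerOf(0)); // b
    }
}
```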

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##
@@ -277,27 +280,31 @@ public boolean allocateSlot(
 
     @Override
     public boolean allocateSlot(
-            int index,
+            int requestedIndex,
             JobID jobId,
             AllocationID allocationId,
             ResourceProfile resourceProfile,
             Time slotTimeout) {
         checkRunning();
 
-        Preconditions.checkArgument(index < numberSlots);
+        Preconditions.checkArgument(requestedIndex < numberSlots);
+
+        // The negative requestIndex indicate that the SlotManger allocate a dynamic slot, we
+        // transfer the index to an increasing number not less than the numberSlots.
+        int index = requestedIndex < 0 ? nextDynamicSlotIndex() : requestedIndex;
 
         TaskSlot taskSlot = allocatedSlots.get(allocationId);
         if (taskSlot != null) {
             return isDuplicatedSlot(taskSlot, jobId, resourceProfile, index);
-        } else if (!isIndexAlreadyTaken(index)) {
+        } else if (isIndexAlreadyTaken(index)) {
             LOG.info(
                     "The static slot with index {} is already assigned to another allocation with id {}.",
                     index,
                     taskSlots.get(index).getAllocationId());
             return false;
         }
 
-        resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;
+        resourceProfile = index < numberSlots ? defaultSlotResourceProfile : resourceProfile;

Review comment:
   There are 3 occurrences of `index < numberSlots` and 1 occurrence of `index >= numberSlots` in this file.
   Let's deduplicate them with a util method.
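A possible shape for such a helper, shown as a hypothetical standalone class (`SlotIndexes`, `effectiveProfile`, and the generic profile type are illustrative assumptions): the comparison against `numberSlots` lives in one method, and the resource-profile selection from the line under review is expressed through it.

```java
/**
 * Hypothetical helper that centralizes the static/dynamic decision instead of repeating
 * `index < numberSlots` and `index >= numberSlots` inline.
 */
class SlotIndexes {
    private final int numberSlots;

    SlotIndexes(int numberSlots) {
        this.numberSlots = numberSlots;
    }

    /** Dynamic slots get indexes at or above numberSlots; static slots use 0..numberSlots-1. */
    boolean isDynamicIndex(int index) {
        return index >= numberSlots;
    }

    /** Same choice as `index < numberSlots ? defaultSlotResourceProfile : resourceProfile`. */
    <P> P effectiveProfile(int index, P defaultProfile, P requestedProfile) {
        return isDynamicIndex(index) ? requestedProfile : defaultProfile;
    }
}

class SlotIndexesDemo {
    public static void main(String[] args) {
        SlotIndexes indexes = new SlotIndexes(2);
        System.out.println(indexes.effectiveProfile(1, "default", "requested")); // default
        System.out.println(indexes.effectiveProfile(2, "default", "requested")); // requested
    }
}
```

Inside `TaskSlotTableImpl` this would more likely be a private method on the table itself, since `numberSlots` is already a field there.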

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##
@@ -288,22 +288,13 @@ public boolean allocateSlot(
 
         TaskSlot taskSlot = allocatedSlots.get(allocationId);
         if (taskSlot != null) {
-            LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
-            return false;
-        }
-
-        if (taskSlots.containsKey(index)) {
-            TaskSlot duplicatedTaskSlot = taskSlots.get(index);
+            return isDuplicatedSlot(taskSlot, jobId, resourceProfile, index);
+        } else if (!isIndexAlreadyTaken(index)) {
             LOG.info(
-                    "Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
+                    "The static slot with index {} is already assigned to another allocation with id {}.",

Review comment:
   Not sure about exposing the concept of a *static* slot here.









[GitHub] [flink] xintongsong commented on a change in pull request #14560: [FLINK-20837] Refactor dynamic SlotID

2021-01-06 GitBox


xintongsong commented on a change in pull request #14560:
URL: https://github.com/apache/flink/pull/14560#discussion_r552509039



##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##
@@ -463,7 +473,7 @@ public boolean isAllocated(int index, JobID jobId, AllocationID allocationId) {
         TaskSlot taskSlot = taskSlots.get(index);
         if (taskSlot != null) {
             return taskSlot.isAllocated(jobId, allocationId);
-        } else if (index < 0) {
+        } else if (index >= numberSlots) {

Review comment:
   If we also insert dynamic slots into `taskSlots`, we won't need this `else-if` branch anymore.
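A sketch of the simplified lookup, assuming every slot, static or dynamic, is stored in `taskSlots` under its unique index (`UnifiedLookupSketch` and `OwnedSlot` are hypothetical; job and allocation IDs are strings): a single map lookup answers `isAllocated`, so no dynamic-index fallback branch is needed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical slot holding only what isAllocated needs. */
final class OwnedSlot {
    private final String jobId;
    private final String allocationId;

    OwnedSlot(String jobId, String allocationId) {
        this.jobId = jobId;
        this.allocationId = allocationId;
    }

    boolean isAllocated(String jobId, String allocationId) {
        return this.jobId.equals(jobId) && this.allocationId.equals(allocationId);
    }
}

/** Hypothetical table where static and dynamic slots share one index-keyed map. */
class UnifiedLookupSketch {
    private final Map<Integer, OwnedSlot> taskSlots = new HashMap<>();

    void put(int index, OwnedSlot slot) {
        taskSlots.put(index, slot);
    }

    /** One lookup covers both kinds of slot; no separate branch for dynamic indexes. */
    boolean isAllocated(int index, String jobId, String allocationId) {
        OwnedSlot slot = taskSlots.get(index);
        return slot != null && slot.isAllocated(jobId, allocationId);
    }
}

class UnifiedLookupDemo {
    public static void main(String[] args) {
        UnifiedLookupSketch table = new UnifiedLookupSketch();
        table.put(0, new OwnedSlot("job", "static-allocation")); // static index
        table.put(2, new OwnedSlot("job", "dynamic-allocation")); // dynamic index, numberSlots = 2
        System.out.println(table.isAllocated(2, "job", "dynamic-allocation")); // true
        System.out.println(table.isAllocated(2, "job", "static-allocation")); // false
    }
}
```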

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##
@@ -95,6 +96,9 @@
     /** The table state. */
     private volatile State state;
 
+    /** Current index for dynamic slot, should always not less than numberSlots */
+    private AtomicInteger dynamicSlotIndex;

Review comment:
   I think `TaskSlotTableImpl` is not designed to be thread-safe and should always be accessed from the RPC main thread, so we should not need an `AtomicInteger` here.
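A sketch of the counter as a plain `int`, assuming, as the comment says, that the table is confined to the RPC main thread (`DynamicIndexCounter` is hypothetical). The owner-thread check is only an optional illustration of making that assumption fail fast; the review does not ask for it.

```java
/**
 * Hypothetical sketch: the dynamic slot index kept as a plain int. Assumption (from the review):
 * the table is only accessed from one thread, so no AtomicInteger is needed. The owner check
 * is an optional, illustrative way to make a violation of that assumption fail fast.
 */
class DynamicIndexCounter {
    private final Thread owner = Thread.currentThread();
    private int dynamicSlotIndex;

    DynamicIndexCounter(int numberSlots) {
        // Dynamic indexes start right after the static ones, so they never collide.
        this.dynamicSlotIndex = numberSlots;
    }

    /** Returns the next unused dynamic index; never less than numberSlots. */
    int nextDynamicSlotIndex() {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("must be called from the owning thread");
        }
        return dynamicSlotIndex++;
    }
}

class DynamicIndexCounterDemo {
    public static void main(String[] args) {
        DynamicIndexCounter counter = new DynamicIndexCounter(3);
        System.out.println(counter.nextDynamicSlotIndex()); // 3
        System.out.println(counter.nextDynamicSlotIndex()); // 4
    }
}
```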

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##
@@ -321,6 +325,12 @@ public boolean allocateSlot(
             return false;
         }
 
+        // The negative index indicate that the SlotManger allocate a dynamic slot, we transfer the
+        // index to an increasing number not less than the numberSlots.
+        if (index < 0) {
+            index = nextDynamicSlotIndex();
+        }

Review comment:
   It's quite implicit that the method argument is overwritten in the middle of the method body.
   
   I would suggest converting `index` into an `effectiveIndex` at the beginning of this method (or maybe renaming the argument to `requestedIndex` and converting it to `index`), and then using the effective index for the rest of the method.
   
   That also means all the `index < 0` checks should be replaced with `index >= numberSlots`. Maybe introduce a util method `isDynamicIndex`.
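A minimal sketch of the suggested structure, with hypothetical names (`EffectiveIndexSketch`, string allocation IDs, and returning the effective index or -1 are illustration choices, not the PR's signature): the requested index is converted exactly once into a `final` local, and afterwards only `isDynamicIndex` decides whether an index is dynamic.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of the suggested allocateSlot structure: the requested index is converted
 * once at the top, and the rest of the method only sees the effective index.
 */
class EffectiveIndexSketch {
    private final int numberSlots;
    private int nextDynamicIndex;
    private final Map<Integer, String> taskSlots = new HashMap<>(); // index -> allocation ID

    EffectiveIndexSketch(int numberSlots) {
        this.numberSlots = numberSlots;
        this.nextDynamicIndex = numberSlots;
    }

    /** Replaces scattered `index < 0` checks once the index has been converted. */
    boolean isDynamicIndex(int index) {
        return index >= numberSlots;
    }

    /** Returns the effective index the slot was stored under, or -1 if the index is taken. */
    int allocateSlot(int requestedIndex, String allocationId) {
        if (requestedIndex >= numberSlots) {
            throw new IllegalArgumentException("invalid requested index: " + requestedIndex);
        }
        // Negative means "dynamic slot": map it to a fresh index >= numberSlots, exactly once.
        final int index = requestedIndex < 0 ? nextDynamicIndex++ : requestedIndex;

        if (taskSlots.containsKey(index)) {
            return -1;
        }
        taskSlots.put(index, allocationId);
        return index;
    }
}

class EffectiveIndexDemo {
    public static void main(String[] args) {
        EffectiveIndexSketch table = new EffectiveIndexSketch(2);
        int staticIndex = table.allocateSlot(0, "a");
        int dynamicIndex = table.allocateSlot(-1, "b");
        System.out.println(staticIndex + " dynamic=" + table.isDynamicIndex(staticIndex)); // 0 dynamic=false
        System.out.println(dynamicIndex + " dynamic=" + table.isDynamicIndex(dynamicIndex)); // 2 dynamic=true
    }
}
```

Declaring the converted index `final` also lets the compiler reject any later attempt to reassign it.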

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##
@@ -288,6 +288,11 @@ public boolean allocateSlot(
 
         TaskSlot taskSlot = allocatedSlots.get(allocationId);
         if (taskSlot != null) {
+            if (index < 0 && taskSlot.isAllocated(jobId, allocationId)) {
+                // If the slot is a dynamic slot with expected jobId and allocationId, it should be
+                // treated as duplicate allocate request.
+                return true;
+            }

Review comment:
   These boolean expressions in the `if` and `return` statements have become quite hard to understand.
   Maybe we can wrap them into separate methods with meaningful names.
   Something like:
   ```
   if (isAllocationIdExist()) {
       return isDuplicateSlot();
   } else if (isSlotIndexTaken()) {
       return false;
   }
   ```
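A compilable version of that outline, built on a hypothetical simplified model (`NamedPredicatesSketch` and `AllocatedSlot` are illustrative; resource profiles are reduced to strings). The duplicate rule mirrors the `isDuplicatedSlot` condition from the later revision of the PR: same job, same resource profile, and either a dynamic index or the same static index.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical allocated slot; the resource profile is reduced to a String. */
final class AllocatedSlot {
    final int index;
    final String jobId;
    final String profile;

    AllocatedSlot(int index, String jobId, String profile) {
        this.index = index;
        this.jobId = jobId;
        this.profile = profile;
    }
}

/** Hypothetical model of allocateSlot with each boolean condition behind a named method. */
class NamedPredicatesSketch {
    private final int numberSlots;
    private int nextDynamicIndex;
    private final Map<String, AllocatedSlot> allocatedSlots = new HashMap<>(); // by allocation ID
    private final Map<Integer, AllocatedSlot> taskSlots = new HashMap<>(); // by index

    NamedPredicatesSketch(int numberSlots) {
        this.numberSlots = numberSlots;
        this.nextDynamicIndex = numberSlots;
    }

    boolean allocateSlot(int requestedIndex, String jobId, String allocationId, String profile) {
        final int index = requestedIndex < 0 ? nextDynamicIndex++ : requestedIndex;
        if (isAllocationIdExist(allocationId)) {
            return isDuplicateSlot(allocatedSlots.get(allocationId), jobId, profile, index);
        } else if (isSlotIndexTaken(index)) {
            return false;
        }
        AllocatedSlot slot = new AllocatedSlot(index, jobId, profile);
        allocatedSlots.put(allocationId, slot);
        taskSlots.put(index, slot);
        return true;
    }

    private boolean isAllocationIdExist(String allocationId) {
        return allocatedSlots.containsKey(allocationId);
    }

    private boolean isSlotIndexTaken(int index) {
        return taskSlots.containsKey(index);
    }

    private boolean isDynamicIndex(int index) {
        return index >= numberSlots;
    }

    /** A repeat request: same job, same profile, and a dynamic or identical static index. */
    private boolean isDuplicateSlot(AllocatedSlot existing, String jobId, String profile, int index) {
        return existing.jobId.equals(jobId)
                && existing.profile.equals(profile)
                && (isDynamicIndex(index) || existing.index == index);
    }
}

class NamedPredicatesDemo {
    public static void main(String[] args) {
        NamedPredicatesSketch table = new NamedPredicatesSketch(2);
        System.out.println(table.allocateSlot(0, "job", "a1", "p")); // true: new allocation
        System.out.println(table.allocateSlot(0, "job", "a1", "p")); // true: duplicate request
        System.out.println(table.allocateSlot(0, "job", "a2", "p")); // false: index 0 is taken
    }
}
```

In this sketch a duplicate dynamic request still consumes a dynamic index, because the conversion happens before the duplicate check; that only leaves a gap in the index sequence.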

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##
@@ -288,6 +288,11 @@ public boolean allocateSlot(
 
         TaskSlot taskSlot = allocatedSlots.get(allocationId);
         if (taskSlot != null) {
+            if (index < 0 && taskSlot.isAllocated(jobId, allocationId)) {
+                // If the slot is a dynamic slot with expected jobId and allocationId, it should be
+                // treated as duplicate allocate request.
+                return true;
+            }

Review comment:
   I think this is a reported issue, FLINK-15660.

##
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTableImpl.java
##
@@ -329,7 +339,7 @@ public boolean allocateSlot(
                 jobId,
                 allocationId,
                 memoryVerificationExecutor);
-        if (index >= 0) {
+        if (index < numberSlots) {

Review comment:
   Now that dynamic slots also have unique indexes, we can insert them into `taskSlots` as well.
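A sketch of what inserting every slot into `taskSlots` could look like, using a hypothetical model (`UnifiedInsertSketch` is not Flink code): the `index < numberSlots` guard around the insert disappears. As an assumption about the rest of such a change, the free path then has to remove dynamic slots from `taskSlots` too, so their entries do not linger.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model in which static and dynamic slots are both stored in taskSlots by index. */
class UnifiedInsertSketch {
    private int nextDynamicIndex;
    private final Map<Integer, String> taskSlots = new HashMap<>(); // index -> allocation ID
    private final Map<String, Integer> indexByAllocation = new HashMap<>(); // allocation ID -> index

    UnifiedInsertSketch(int numberSlots) {
        this.nextDynamicIndex = numberSlots;
    }

    /** Returns the effective index, or -1 if the requested static index is taken. */
    int allocate(int requestedIndex, String allocationId) {
        final int index = requestedIndex < 0 ? nextDynamicIndex++ : requestedIndex;
        if (taskSlots.containsKey(index)) {
            return -1;
        }
        // No `if (index < numberSlots)` guard: every slot is inserted under its unique index.
        taskSlots.put(index, allocationId);
        indexByAllocation.put(allocationId, index);
        return index;
    }

    /** Frees the slot from both maps, so dynamic entries do not linger in taskSlots. */
    boolean free(String allocationId) {
        Integer index = indexByAllocation.remove(allocationId);
        if (index == null) {
            return false;
        }
        taskSlots.remove(index);
        return true;
    }

    String ownerOf(int index) {
        return taskSlots.get(index);
    }

    int size() {
        return taskSlots.size();
    }
}

class UnifiedInsertDemo {
    public static void main(String[] args) {
        UnifiedInsertSketch table = new UnifiedInsertSketch(2);
        table.allocate(0, "static-allocation");
        int dynamicIndex = table.allocate(-1, "dynamic-allocation");
        System.out.println(table.ownerOf(dynamicIndex)); // dynamic-allocation
        table.free("dynamic-allocation");
        System.out.println(table.size()); // 1
    }
}
```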




