[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-10-15 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r334743444
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 ##
 @@ -234,16 +234,15 @@
/**
 * Whether TaskManager managed memory should be pre-allocated when the 
TaskManager is starting.
 */
+   @Deprecated
 
 Review comment:
   Was this overlooked?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-10-12 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r334223624
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -568,144 +569,26 @@ public int computeNumberOfPages(double fraction) {
throw new IllegalArgumentException("The fraction of 
memory to allocate must within (0, 1].");
}
 
+   //noinspection NumericCastThatLosesPrecision
return (int) (totalNumPages * fraction / numberOfSlots);
}
 
 
 Review comment:
   I think this method existed not only because MM holds the number of slots, 
but also because it holds the total number of pages. In other words, MM knows 
how much memory the slot has in total, which will not change when we have one 
MM per slot.
   
   I agree that computing the operator-level memory budget from the slot's 
total memory and the operator's fraction is a separate responsibility. It makes 
sense for MM to expose the slot's total memory and for the operator to call a 
utility that computes the budget from that total and the fraction.
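   
   A minimal sketch of that split, assuming a hypothetical `SlotMemoryBudgets` 
utility (the class and its method are illustrative and not part of this PR or 
of Flink): the memory manager would only expose the slot's total page count, 
and the caller derives its own budget from it.

```java
/** Hypothetical helper, not part of Flink or of this PR. */
final class SlotMemoryBudgets {

    private SlotMemoryBudgets() {}

    /**
     * Computes how many pages an operator may use.
     *
     * @param slotTotalPages total number of managed memory pages of the slot
     * @param fraction the operator's share of the slot memory, in (0, 1]
     * @return the operator's page budget, truncated toward zero
     */
    static int computeNumberOfPages(long slotTotalPages, double fraction) {
        if (!(fraction > 0.0 && fraction <= 1.0)) {
            throw new IllegalArgumentException(
                "The fraction of memory to allocate must be within (0, 1].");
        }
        // The cast truncates, so an operator never gets more than its share.
        return (int) (slotTotalPages * fraction);
    }
}
```

   With this shape the number of slots no longer appears in the computation, 
because the caller passes in a total that is already per slot.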




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-10-12 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r334223099
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -402,29 +386,25 @@ public void releaseAll(Object owner) {
return;
}
 
-   //  BEGIN CRITICAL SECTION 
---
-   synchronized (lock) {
-   if (isShutDown) {
-   throw new IllegalStateException("Memory manager 
has been shut down.");
-   }
+   Preconditions.checkState(!isShutDown, "Memory manager has been 
shut down.");
 
-   // get all segments
-   final Set segments = 
allocatedSegments.remove(owner);
+   // get all segments
+   Set segments = allocatedSegments.remove(owner);
 
-   // all segments may have been freed previously 
individually
-   if (segments == null || segments.isEmpty()) {
-   return;
-   }
-
-   // free each segment
-   for (MemorySegment seg : segments) {
-   seg.free();
-   }
-   numNonAllocatedPages += segments.size();
+   // all segments may have been freed previously individually
+   if (segments == null || segments.isEmpty()) {
+   return;
+   }
 
-   segments.clear();
+   // free each segment
+   EnumMap releasedMemory = new 
EnumMap<>(MemoryType.class);
+   for (MemorySegment segment : segments) {
+   segment.free();
 
 Review comment:
   True, this comment is out of sync.




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-10-12 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r334223063
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/KeyedBudget.java
 ##
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.types.Either;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+class KeyedBudget {
 
 Review comment:
   I cannot come up with a more descriptive name either. 
`KeyedBudgetManager` sounds better than `KeyedBudget`, but it is still not 
intuitive enough for people to tell immediately from the name what the class 
does. I guess it is not always easy to find a perfectly matching name for a 
class, and in those cases some docs explaining the general purpose and 
responsibility of the class should be good enough.
   
   Regarding `MemorySize` vs. bytes as a long, I'm not sure how generally 
`KeyedBudget` will be used. But I agree that using `MemorySize` is not a 
critical need, and if needed we can do it easily in a follow-up. For the 
moment, let's keep it the current way.




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-10-10 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r333814167
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -300,31 +255,21 @@ public void allocatePages(Object owner, 
List target, int numPages
allocatedSegments.put(owner, segmentsForOwner);
}
 
-   if (isPreAllocated) {
-   for (int i = numPages; i > 0; i--) {
-   MemorySegment segment = 
memoryPool.requestSegmentFromPool(owner);
-   target.add(segment);
-   segmentsForOwner.add(segment);
-   }
-   }
-   else {
-   for (int i = numPages; i > 0; i--) {
-   MemorySegment segment = 
memoryPool.allocateNewSegment(owner);
-   target.add(segment);
-   segmentsForOwner.add(segment);
-   }
-   numNonAllocatedPages -= numPages;
+   for (int i = numPages; i > 0; i--) {
+   MemorySegment segment = 
allocateManagedSegment(memoryType, owner);
+   target.add(segment);
+   segmentsForOwner.add(segment);
}
+   numNonAllocatedPages -= numPages;
}
//  END CRITICAL SECTION ---
}
 
/**
-* Tries to release the memory for the specified segment. If the 
segment has already been released or
-* is null, the request is simply ignored.
+* Tries to release the memory for the specified segment.
 *
-* If the memory manager manages pre-allocated memory, the memory 
segment goes back to the memory pool.
-* Otherwise, the segment is only freed and made eligible for 
reclamation by the GC.
+* If the segment has already been released or is null, the request 
is simply ignored.
 
 Review comment:
   My bad, please ignore this comment.
   
   I think I found a typo, an extra 'if' in the doc comment, when I reviewed 
this PR locally in my IDE. But I cannot find it now. I probably left this 
comment in the wrong place when moving from the IDE to GitHub.




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-10-10 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r333812031
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -564,25 +529,6 @@ public int computeNumberOfPages(double fraction) {
return (int) (totalNumPages * fraction / numberOfSlots);
}
 
-   /**
-* Computes the memory size of the fraction per slot.
-*
-* @param fraction The fraction of the memory of the task slot.
-* @return The number of pages corresponding to the memory fraction.
-*/
-   public long computeMemorySize(double fraction) {
 
 Review comment:
   I think it makes sense to keep the two methods in the memory manager, so 
that only the memory manager, and not the operators, needs to know the total 
managed memory size of the slot. Let's defer this discussion until we make the 
memory manager per slot.




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327962042
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -216,6 +216,9 @@ public boolean verifyEmpty() {
/**
 * Allocates a set of memory segments from this memory manager.
 *
+* The returned segments can have any memory type. The total 
allocated memory for each type will not exceed its
 
 Review comment:
   returned -> allocated




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327922460
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/KeyedBudget.java
 ##
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.types.Either;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+class KeyedBudget {
+   private final Map maxBudgetByKey;
+
+   @GuardedBy("lock")
+   private final Map availableBudgetByKey;
+
+   private final Object lock = new Object();
+
+   KeyedBudget(Map maxBudgetByKey) {
+   this.maxBudgetByKey = new HashMap<>(maxBudgetByKey);
+   this.availableBudgetByKey = new HashMap<>(maxBudgetByKey);
+   }
+
+   /**
+* Tries to acquire budget which equals to the number of pages times 
page size.
+*
+* See also {@link #acquirePagedBudgetForKeys(Iterable, long, long)}
+*/
+   Either<Map<K, Long>, Long> acquirePagedBudget(int pageNum, long 
pageSize) {
+   return acquirePagedBudgetForKeys(maxBudgetByKey.keySet(), 
pageNum, pageSize);
+   }
+
+   /**
+* Tries to acquire budget for a given key.
+*
+* No budget is acquired if it was not possible to fully acquire the 
requested budget.
+*
+* @param key the key to acquire budget from
+* @param size the size of budget to acquire from the given key
+* @return the fully acquired budget for the key or max possible budget 
to acquir
 
 Review comment:
   typo: acquir -> acquire




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327912570
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java
 ##
 @@ -234,16 +234,15 @@
/**
 * Whether TaskManager managed memory should be pre-allocated when the 
TaskManager is starting.
 */
+   @Deprecated
 
 Review comment:
   I would suggest removing this option.
   To my understanding, 'deprecated' means that something is still available 
but not recommended, and might be removed later. In this case, pre-allocation 
is no longer available.




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327915668
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -300,31 +255,21 @@ public void allocatePages(Object owner, 
List target, int numPages
allocatedSegments.put(owner, segmentsForOwner);
}
 
-   if (isPreAllocated) {
-   for (int i = numPages; i > 0; i--) {
-   MemorySegment segment = 
memoryPool.requestSegmentFromPool(owner);
-   target.add(segment);
-   segmentsForOwner.add(segment);
-   }
-   }
-   else {
-   for (int i = numPages; i > 0; i--) {
-   MemorySegment segment = 
memoryPool.allocateNewSegment(owner);
-   target.add(segment);
-   segmentsForOwner.add(segment);
-   }
-   numNonAllocatedPages -= numPages;
+   for (int i = numPages; i > 0; i--) {
+   MemorySegment segment = 
allocateManagedSegment(memoryType, owner);
+   target.add(segment);
+   segmentsForOwner.add(segment);
}
+   numNonAllocatedPages -= numPages;
}
//  END CRITICAL SECTION ---
}
 
/**
-* Tries to release the memory for the specified segment. If the 
segment has already been released or
-* is null, the request is simply ignored.
+* Tries to release the memory for the specified segment.
 *
-* If the memory manager manages pre-allocated memory, the memory 
segment goes back to the memory pool.
-* Otherwise, the segment is only freed and made eligible for 
reclamation by the GC.
+* If the segment has already been released or is null, the request 
is simply ignored.
 
 Review comment:
   remove 'If'




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327922550
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/KeyedBudget.java
 ##
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.types.Either;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+class KeyedBudget {
+   private final Map maxBudgetByKey;
+
+   @GuardedBy("lock")
+   private final Map availableBudgetByKey;
+
+   private final Object lock = new Object();
+
+   KeyedBudget(Map maxBudgetByKey) {
+   this.maxBudgetByKey = new HashMap<>(maxBudgetByKey);
+   this.availableBudgetByKey = new HashMap<>(maxBudgetByKey);
+   }
+
+   /**
+* Tries to acquire budget which equals to the number of pages times 
page size.
+*
+* See also {@link #acquirePagedBudgetForKeys(Iterable, long, long)}
+*/
+   Either<Map<K, Long>, Long> acquirePagedBudget(int pageNum, long 
pageSize) {
+   return acquirePagedBudgetForKeys(maxBudgetByKey.keySet(), 
pageNum, pageSize);
+   }
+
+   /**
+* Tries to acquire budget for a given key.
+*
+* No budget is acquired if it was not possible to fully acquire the 
requested budget.
+*
+* @param key the key to acquire budget from
+* @param size the size of budget to acquire from the given key
+* @return the fully acquired budget for the key or max possible budget 
to acquir
+* if it was not possible to acquire the requested budget.
+*/
+   long acquireBudgetForKey(K key, long size) {
+   Either<Map<K, Long>, Long> result = 
acquirePagedBudgetForKeys(Collections.singletonList(key), size, 1L);
+   return result.isLeft() ? result.left().get(key) : 
result.right();
+   }
+
+   /**
+* Tries to acquire budget which equals to the number of pages times 
page size.
+*
+* The budget will be acquired only from the given keys. Only 
integer number of pages will be acquired from each key.
+* If the next page does not fit into the available budget of some key, 
it will try to be acquired from another key.
+* The acquisition is successful if the acquired number of pages for 
each key sums up to the requested number of pages.
+* The function does not make any preference about which keys from the 
given keys to acquire from.
+*
+* @param keys the keys to acquire budget from
+* @param pageNum the total number of pages to acquire from the given 
keys
+* @param pageSize the size of budget to acquire per page
+* @return the acquired number of pages for each key if the acquisition 
is successful (either left) or
+* the total number of pages which were available for the given keys 
(either right).
+*/
+   Either<Map<K, Long>, Long> acquirePagedBudgetForKeys(Iterable<K> keys, 
long pageNum, long pageSize) {
+   synchronized (lock) {
+   long totalPossiblePages = 0L;
+   Map pagesToReserveByKey = new HashMap<>();
+   for (K key : keys) {
+   long currentKeyBudget = 
availableBudgetByKey.getOrDefault(key, 0L);
+   long currentKeyPages = currentKeyBudget / 
pageSize;
+   if (totalPossiblePages + currentKeyPages >= 
pageNum) {
+   pagesToReserveByKey.put(key, pageNum - 
totalPossiblePages);
+   totalPossiblePages = pageNum;
 
 Review comment:
   We can break the loop here.
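   
   A simplified sketch of the suggestion, assuming a hypothetical 
`PagedBudgetSketch.planPages` helper that mirrors only the part of the loop 
visible in the hunk above (string keys, no locking, and `null` instead of 
`Either` are simplifications, not the PR's code). In this sketch, without the 
`break`, every later key would still be visited and entered with zero pages.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical stand-in for the loop in acquirePagedBudgetForKeys. */
final class PagedBudgetSketch {

    private PagedBudgetSketch() {}

    /**
     * Plans how many whole pages to take from each key.
     *
     * @return the pages per key if pageNum pages fit, or null if they do not
     */
    static Map<String, Long> planPages(
            Map<String, Long> availableBudgetByKey, long pageNum, long pageSize) {
        long totalPossiblePages = 0L;
        Map<String, Long> pagesToReserveByKey = new HashMap<>();
        for (Map.Entry<String, Long> entry : availableBudgetByKey.entrySet()) {
            // Only whole pages can be taken from a key.
            long currentKeyPages = entry.getValue() / pageSize;
            if (totalPossiblePages + currentKeyPages >= pageNum) {
                pagesToReserveByKey.put(entry.getKey(), pageNum - totalPossiblePages);
                totalPossiblePages = pageNum;
                // The request is satisfied, so the remaining keys can be skipped.
                break;
            } else if (currentKeyPages > 0L) {
                pagesToReserveByKey.put(entry.getKey(), currentKeyPages);
                totalPossiblePages += currentKeyPages;
            }
        }
        return totalPossiblePages == pageNum ? pagesToReserveByKey : null;
    }
}
```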



[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r328001404
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -257,133 +216,96 @@ public boolean verifyEmpty() {
 *   of memory pages any more.
 */
public List allocatePages(Object owner, int numPages) 
throws MemoryAllocationException {
-   final ArrayList segs = new 
ArrayList(numPages);
-   allocatePages(owner, segs, numPages);
-   return segs;
+   List segments = new ArrayList<>(numPages);
+   allocatePages(owner, segments, numPages);
+   return segments;
}
 
/**
-* Allocates a set of memory segments from this memory manager. If the 
memory manager pre-allocated the
-* segments, they will be taken from the pool of memory segments. 
Otherwise, they will be allocated
-* as part of this call.
+* Allocates a set of memory segments from this memory manager.
+*
+* The returned segments can have any memory type. The total 
allocated memory for each type will not exceed its
+* size limit, announced in the constructor.
 *
 * @param owner The owner to associate with the memory segment, for the 
fallback release.
 * @param target The list into which to put the allocated memory pages.
 * @param numPages The number of pages to allocate.
 * @throws MemoryAllocationException Thrown, if this memory manager 
does not have the requested amount
 *   of memory pages any more.
 */
-   public void allocatePages(Object owner, List target, int 
numPages)
-   throws MemoryAllocationException {
+   public void allocatePages(
 
 Review comment:
   In addition to this, I think we also need an 'allocatePages' variant that 
supports specifying the memory type.
   
   The current 'allocatePages', which does not specify which type (on-heap / 
off-heap) of memory should be allocated, may work well for DataSet jobs 
without fine-grained resource requirements. But for Blink SQL jobs (bounded 
streaming) with fine-grained resource profiles (FLIP-53), operators have 
specific and separate memory quotas for on-heap / off-heap managed memory, and 
will be scheduled accordingly. Thus, even if they can use pages of either type, 
they need to allocate pages respecting their quota for each pool. Otherwise, 
other operators that can only use one of the types (e.g., the FLIP-58 Python 
UDX, which uses off-heap managed memory) might be affected.
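   
   A minimal sketch of what a type-aware allocation could look like, assuming 
a hypothetical `TypedPageQuota` class with its own stand-in `MemoryType` enum 
(neither is Flink's actual `MemoryManager` API). Each pool has a separate page 
count, and a request never spills over into the other pool.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical sketch, not Flink's MemoryManager: page quotas tracked per memory type. */
final class TypedPageQuota {

    /** Stand-in for Flink's MemoryType enum. */
    enum MemoryType { HEAP, OFF_HEAP }

    private final Map<MemoryType, Integer> availableByType = new EnumMap<>(MemoryType.class);

    TypedPageQuota(int heapPages, int offHeapPages) {
        availableByType.put(MemoryType.HEAP, heapPages);
        availableByType.put(MemoryType.OFF_HEAP, offHeapPages);
    }

    /** Takes pages only from the requested pool; returns false if that pool is too small. */
    synchronized boolean allocatePages(MemoryType type, int numPages) {
        int available = availableByType.get(type);
        if (numPages < 0 || numPages > available) {
            return false;
        }
        availableByType.put(type, available - numPages);
        return true;
    }

    /** Returns the number of pages still available in the given pool. */
    synchronized int availablePages(MemoryType type) {
        return availableByType.get(type);
    }
}
```

   An operator that can use either type would then issue one request per type, 
each bounded by its own quota, so an off-heap-only operator is not starved.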




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327920212
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/KeyedBudget.java
 ##
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.types.Either;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+class KeyedBudget {
 
 Review comment:
   The responsibility of this class is not very intuitive from the class 
name. I think we could use some Javadoc here.




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327980073
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java
 ##
 @@ -237,4 +237,26 @@ public void testAllocateMixedMemoryType() throws 
MemoryAllocationException {
split.put(MemoryType.OFF_HEAP, offHeapPages);
return split;
}
+
+   @Test
+   public void testMemoryReservation() throws MemoryAllocationException {
+   Object owner = new Object();
+
+   memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE);
+   memoryManager.reserveMemory(owner, MemoryType.OFF_HEAP, 
memoryManager.getMemorySizeByType(MemoryType.OFF_HEAP));
 
 Review comment:
   I would suggest validating that, after reserving all the off-heap memory, we 
cannot reserve even one more byte of off-heap memory from the memory manager. 
This would prove that the first reserving call takes effect.
   
   I also suggest adding test cases to verify that reservation and segment 
allocation share the same budget / quota.
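   
   To illustrate the property these tests would check, here is a minimal 
sketch assuming a hypothetical `SharedOffHeapBudget` class (not the PR's 
`MemoryManager`): page allocation is expressed through the same byte budget as 
reservation, so exhausting one also exhausts the other.

```java
/** Hypothetical sketch: reservations and page allocations draw from one off-heap budget. */
final class SharedOffHeapBudget {

    private final long pageSize;
    private long availableBytes;

    SharedOffHeapBudget(long totalBytes, long pageSize) {
        this.availableBytes = totalBytes;
        this.pageSize = pageSize;
    }

    /** Reserves raw bytes; throws and changes nothing if the budget is insufficient. */
    synchronized void reserveMemory(long bytes) {
        if (bytes < 0 || bytes > availableBytes) {
            throw new IllegalStateException("Could not reserve " + bytes + " bytes.");
        }
        availableBytes -= bytes;
    }

    /** Allocates whole pages from the same budget that reserveMemory uses. */
    synchronized void allocatePages(int numPages) {
        reserveMemory(numPages * pageSize);
    }

    /** Returns the number of bytes that are neither reserved nor allocated. */
    synchronized long availableBytes() {
        return availableBytes;
    }
}
```

   A test along these lines would allocate one page, reserve all remaining 
bytes, and then expect both a one-byte reservation and a one-page allocation 
to fail.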




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327943136
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/KeyedBudget.java
 ##
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.types.Either;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+class KeyedBudget {
 
 Review comment:
   I would also suggest using MemorySize for the budget sizes and page sizes, 
so that we do not need to worry about the unit when calling the acquiring / 
releasing methods. We can use long internally to represent bytes for 
convenience of calculation, and MemorySize for the arguments and return values 
of the methods.
   
   This probably also involves replacing the int/long memory sizes in 
MemoryManager with MemorySize.
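   
   A minimal sketch of that boundary, assuming a hypothetical `ByteSize` value 
type standing in for Flink's `MemorySize` and a hypothetical `TypedBudget` 
class (both are illustrative and simpler than `KeyedBudget`): typed sizes 
appear in the method signatures, while the arithmetic stays in plain `long`.

```java
/** Hypothetical value type standing in for Flink's MemorySize. */
final class ByteSize {

    private final long bytes;

    ByteSize(long bytes) {
        if (bytes < 0) {
            throw new IllegalArgumentException("Size must not be negative.");
        }
        this.bytes = bytes;
    }

    long getBytes() {
        return bytes;
    }
}

/** Hypothetical budget: typed sizes at the API boundary, plain long internally. */
final class TypedBudget {

    // Internal arithmetic stays in bytes.
    private long availableBytes;

    TypedBudget(ByteSize total) {
        this.availableBytes = total.getBytes();
    }

    /** Acquires the full requested size, or nothing if it does not fit. */
    synchronized boolean tryAcquire(ByteSize requested) {
        if (requested.getBytes() > availableBytes) {
            return false;
        }
        availableBytes -= requested.getBytes();
        return true;
    }

    /** Returns previously acquired budget. */
    synchronized void release(ByteSize size) {
        availableBytes += size.getBytes();
    }

    /** Returns the currently available budget as a typed size. */
    synchronized ByteSize available() {
        return new ByteSize(availableBytes);
    }
}
```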




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327909983
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -564,25 +529,6 @@ public int computeNumberOfPages(double fraction) {
return (int) (totalNumPages * fraction / numberOfSlots);
}
 
-   /**
-* Computes the memory size of the fraction per slot.
-*
-* @param fraction The fraction of the memory of the task slot.
-* @return The number of pages corresponding to the memory fraction.
-*/
-   public long computeMemorySize(double fraction) {
 
 Review comment:
   Can this be used for reserving memory?
   I think operators that reserve memory from MemoryManager need such an 
interface to compute their quota from a fraction (just like 
'computeNumberOfPages' for operators that allocate segments). We also need to 
compute the quota separately for each memory type.
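   
   A rough sketch of what a per-type quota computation could look like. It 
reuses the 'fraction / numberOfSlots' formula from 'computeNumberOfPages' in 
the hunk above; 'MemoryTypeSketch', 'QuotaSketch' and the two-argument 
'computeMemorySize' are hypothetical names, not the MemoryManager API.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical stand-in for the memory types managed by the memory manager. */
enum MemoryTypeSketch { HEAP, OFF_HEAP }

/** Hypothetical helper that derives a per-slot quota for each memory type. */
final class QuotaSketch {
    private final Map<MemoryTypeSketch, Long> totalBytes = new EnumMap<>(MemoryTypeSketch.class);
    private final int numberOfSlots;

    QuotaSketch(long heapBytes, long offHeapBytes, int numberOfSlots) {
        if (numberOfSlots <= 0) {
            throw new IllegalArgumentException("The number of slots must be positive.");
        }
        totalBytes.put(MemoryTypeSketch.HEAP, heapBytes);
        totalBytes.put(MemoryTypeSketch.OFF_HEAP, offHeapBytes);
        this.numberOfSlots = numberOfSlots;
    }

    /** Bytes that a fraction of one slot may reserve from the pool of the given type. */
    long computeMemorySize(MemoryTypeSketch type, double fraction) {
        if (fraction <= 0 || fraction > 1) {
            throw new IllegalArgumentException("The fraction must be within (0, 1].");
        }
        // The cast truncates toward zero, so the quota never exceeds the slot's share.
        return (long) (totalBytes.get(type) * fraction / numberOfSlots);
    }
}
```

   An operator that reserves memory would ask for its quota once per memory 
type it intends to use, instead of relying on a single pool-wide number.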




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327959576
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -402,29 +386,25 @@ public void releaseAll(Object owner) {
return;
}
 
-   //  BEGIN CRITICAL SECTION 
---
-   synchronized (lock) {
-   if (isShutDown) {
-   throw new IllegalStateException("Memory manager 
has been shut down.");
-   }
+   Preconditions.checkState(!isShutDown, "Memory manager has been 
shut down.");
 
-   // get all segments
-   final Set segments = 
allocatedSegments.remove(owner);
+   // get all segments
+   Set segments = allocatedSegments.remove(owner);
 
-   // all segments may have been freed previously 
individually
-   if (segments == null || segments.isEmpty()) {
-   return;
-   }
-
-   // free each segment
-   for (MemorySegment seg : segments) {
-   seg.free();
-   }
-   numNonAllocatedPages += segments.size();
+   // all segments may have been freed previously individually
+   if (segments == null || segments.isEmpty()) {
+   return;
+   }
 
-   segments.clear();
+   // free each segment
+   EnumMap releasedMemory = new 
EnumMap<>(MemoryType.class);
+   for (MemorySegment segment : segments) {
+   segment.free();
 
 Review comment:
   'segment.free()' can be removed. It's covered by 'releaseSegment()'.




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r328001905
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -568,144 +569,26 @@ public int computeNumberOfPages(double fraction) {
throw new IllegalArgumentException("The fraction of 
memory to allocate must within (0, 1].");
}
 
+   //noinspection NumericCastThatLosesPrecision
return (int) (totalNumPages * fraction / numberOfSlots);
}
 
 
 Review comment:
   I think we also need 'computeNumberOfPages' for a specific memory type, for 
the same reason as 'allocatePages'.




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327927379
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/memory/KeyedBudgetTest.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.types.Either;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.LongStream;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Test suite for {@link KeyedBudget}.
+ */
+@SuppressWarnings("MagicNumber")
+public class KeyedBudgetTest {
+   private static final String[] TEST_KEYS = {"k1", "k2", "k3", "k4"};
+   private static final long[] TEST_BUDGETS = {15, 17, 22, 11};
+
+   @Test
+   public void testSuccessfulAcquisitionForKey() {
+   KeyedBudget keyedBudget = createSimpleKeyedBudget();
+
+   long acquired = keyedBudget.acquireBudgetForKey("k1", 10L);
+
+   assertThat(acquired, is(10L));
+   checkOneKeyBudgetChange(keyedBudget, "k1", 5L);
+   }
+
+   @Test
+   public void testFailedAcquisitionForKey() {
+   KeyedBudget keyedBudget = createSimpleKeyedBudget();
+
+   long acquired = keyedBudget.acquireBudgetForKey("k1", 20L);
+
+   assertThat(acquired, is(15L));
+   checkOneKeyBudgetChange(keyedBudget, "k1", 15L);
 
 Review comment:
   How about adding another utility method, 'checkNoBudgetChange'? It can be 
implemented by calling 'checkOneKeyBudgetChange', and it would make the test cases 
more intuitive.
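   
   One possible shape for the helper, as a sketch only. The body of 
'checkOneKeyBudgetChange' is not visible in this hunk, so the version below is 
an assumption inferred from its call sites, and a plain map stands in for 
KeyedBudget to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical test helpers; a Map of key to available budget stands in for KeyedBudget. */
final class BudgetCheckSketch {
    static final String[] TEST_KEYS = {"k1", "k2", "k3", "k4"};
    static final long[] TEST_BUDGETS = {15, 17, 22, 11};

    static Map<String, Long> createSimpleBudget() {
        Map<String, Long> available = new HashMap<>();
        for (int i = 0; i < TEST_KEYS.length; i++) {
            available.put(TEST_KEYS[i], TEST_BUDGETS[i]);
        }
        return available;
    }

    /** Checks that 'key' now has 'expected' budget and every other key has its initial budget. */
    static void checkOneKeyBudgetChange(Map<String, Long> available, String key, long expected) {
        for (int i = 0; i < TEST_KEYS.length; i++) {
            long want = TEST_KEYS[i].equals(key) ? expected : TEST_BUDGETS[i];
            long actual = available.get(TEST_KEYS[i]);
            if (actual != want) {
                throw new AssertionError(
                    "Budget of " + TEST_KEYS[i] + " is " + actual + ", expected " + want);
            }
        }
    }

    /** Checks that no key changed, by delegating with a key that matches none of TEST_KEYS. */
    static void checkNoBudgetChange(Map<String, Long> available) {
        checkOneKeyBudgetChange(available, "", 0L);
    }
}
```

   A failed-acquisition test could then end with 'checkNoBudgetChange(keyedBudget)' 
rather than restating the initial budget of "k1" by hand.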




[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327957575
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -394,74 +318,79 @@ public void release(Collection segments) {
return;
}
 
-   //  BEGIN CRITICAL SECTION 
---
-   synchronized (lock) {
-   if (isShutDown) {
-   throw new IllegalStateException("Memory manager 
has been shut down.");
-   }
+   Preconditions.checkState(!isShutDown, "Memory manager has been 
shut down.");
 
-   // since concurrent modifications to the collection
-   // can disturb the release, we need to try potentially 
multiple times
-   boolean successfullyReleased = false;
-   do {
-   final Iterator segmentsIterator 
= segments.iterator();
+   EnumMap releasedMemory = new 
EnumMap<>(MemoryType.class);
 
-   Object lastOwner = null;
-   Set segsForOwner = null;
+   // since concurrent modifications to the collection
+   // can disturb the release, we need to try potentially multiple 
times
+   boolean successfullyReleased = false;
+   do {
+   Iterator segmentsIterator = 
segments.iterator();
 
-   try {
-   // go over all segments
-   while (segmentsIterator.hasNext()) {
-
-   final MemorySegment seg = 
segmentsIterator.next();
-   if (seg == null || 
seg.isFreed()) {
-   continue;
-   }
-
-   final Object owner = 
seg.getOwner();
-
-   try {
-   // get the list of 
segments by this owner only if it is a different owner than for
-   // the previous one (or 
it is the first segment)
-   if (lastOwner != owner) 
{
-   lastOwner = 
owner;
-   segsForOwner = 
this.allocatedSegments.get(owner);
-   }
-
-   // remove the segment 
from the list
-   if (segsForOwner != 
null) {
-   
segsForOwner.remove(seg);
-   if 
(segsForOwner.isEmpty()) {
-   
this.allocatedSegments.remove(owner);
-   }
-   }
-
-   if (isPreAllocated) {
-   
memoryPool.returnSegmentToPool(seg);
-   }
-   else {
-   seg.free();
-   
numNonAllocatedPages++;
-   }
-   }
-   catch (Throwable t) {
-   throw new 
RuntimeException(
-   "Error 
removing book-keeping reference to allocated memory segment.", t);
-   }
+   //noinspection ProhibitedExceptionCaught
+   try {
+   MemorySegment segment = null;
+   while (segment == null && 
segmentsIterator.hasNext()) {
+   segment = segmentsIterator.next();
+   if (segment.isFreed()) {
+   segment = null;
}
+   }
+   while (segment != null) {
+ 

[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327940329
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/memory/KeyedBudgetTest.java
 ##
 @@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.types.Either;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.LongStream;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Test suite for {@link KeyedBudget}.
+ */
+@SuppressWarnings("MagicNumber")
+public class KeyedBudgetTest {
+   private static final String[] TEST_KEYS = {"k1", "k2", "k3", "k4"};
+   private static final long[] TEST_BUDGETS = {15, 17, 22, 11};
+
+   @Test
+   public void testSuccessfulAcquisitionForKey() {
+   KeyedBudget keyedBudget = createSimpleKeyedBudget();
+
+   long acquired = keyedBudget.acquireBudgetForKey("k1", 10L);
+
+   assertThat(acquired, is(10L));
+   checkOneKeyBudgetChange(keyedBudget, "k1", 5L);
+   }
+
+   @Test
+   public void testFailedAcquisitionForKey() {
+   KeyedBudget keyedBudget = createSimpleKeyedBudget();
+
+   long acquired = keyedBudget.acquireBudgetForKey("k1", 20L);
+
+   assertThat(acquired, is(15L));
+   checkOneKeyBudgetChange(keyedBudget, "k1", 15L);
+   }
+
+   @Test
+   public void testSuccessfulReleaseForKey() {
+   KeyedBudget keyedBudget = createSimpleKeyedBudget();
+
+   keyedBudget.acquireBudgetForKey("k1", 10L);
+   keyedBudget.releaseBudgetForKey("k1", 5L);
+
+   checkOneKeyBudgetChange(keyedBudget, "k1", 10L);
+   }
+
+   @Test
+   public void testFailedReleaseForKey() {
+   KeyedBudget keyedBudget = createSimpleKeyedBudget();
+
+   keyedBudget.acquireBudgetForKey("k1", 10L);
+   try {
+   keyedBudget.releaseBudgetForKey("k1", 15L);
+   fail("IllegalStateException is expected to fail 
over-sized release");
+   } catch (IllegalStateException e) {
+   // expected
+   }
+
+   checkOneKeyBudgetChange(keyedBudget, "k1", 5L);
+   }
+
+   @Test
+   public void testSuccessfulAcquisitionForKeys() {
+   KeyedBudget keyedBudget = createSimpleKeyedBudget();
+
+   Either, Long> acquired =
+   
keyedBudget.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, 5);
+
+   assertThat(acquired.isLeft(), is(true));
+   assertThat(acquired.left().values().stream().mapToLong(b -> 
b).sum(), is(4L));
+
+   assertThat(keyedBudget.availableBudgetForKey("k1"), is(15L));
+   
assertThat(keyedBudget.availableBudgetForKeys(Arrays.asList("k2", "k3")), 
is(19L));
+   assertThat(keyedBudget.totalAvailableBudget(), is(45L));
+   }
+
+   @Test
+   public void testSuccessfulReleaseForKeys() {
+   KeyedBudget keyedBudget = createSimpleKeyedBudget();
+
+   keyedBudget.acquirePagedBudgetForKeys(Arrays.asList("k2", 
"k3"), 4, 8);
+   keyedBudget.releaseBudgetForKeys(createdBudgetMap(new String[] 
{"k2", "k3"}, new long[] {7, 10}));
+
+   
assertThat(keyedBudget.availableBudgetForKeys(Arrays.asList("k2", "k3")), 
is(24L));
+   
assertThat(keyedBudget.availableBudgetForKeys(Arrays.asList("k1", "k4")), 
is(26L));
+   assertThat(keyedBudget.totalAvailableBudget(), is(50L));
+   }
+
+   @Test
+   public void testSuccessfulReleaseForKeysWithMixedRequests() {
+   KeyedBudget keyedBudget = createSimpleKeyedBudget();
+
+   keyedBudget.acquirePagedBudgetForKeys(Arrays.asList("k2", 
"k3"), 4, 8);
+   

[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-25 Thread GitBox
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327916130
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -288,10 +244,9 @@ public void allocatePages(Object owner, 
List target, int numPages
 
// in the case of pre-allocated memory, the 
'numNonAllocatedPages' is zero, in the
// lazy case, the 'freeSegments.size()' is zero.
 
 Review comment:
   Comments need to be updated

