[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r334743444

## File path: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ##

@@ -234,16 +234,15 @@
     /**
      * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
      */
+    @Deprecated

Review comment:
Was this overlooked?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r334223624

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ##

@@ -568,144 +569,26 @@ public int computeNumberOfPages(double fraction) {
             throw new IllegalArgumentException("The fraction of memory to allocate must within (0, 1].");
         }
+        //noinspection NumericCastThatLosesPrecision
         return (int) (totalNumPages * fraction / numberOfSlots);
     }

Review comment:
I think the reason we had this method was not only to hold the number of slots but also to hold the total number of pages. In other words, the MemoryManager (MM) knows how much memory the slot has in total, and that will not change when we have one MM per slot.

I agree that computing the operator-level memory budget from the slot's total memory and the operator's fraction is a separate responsibility. It makes sense for the MM to expose the slot's total memory and for the operator to call a util that computes the budget from the slot's total memory and the fraction.
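
To illustrate the split described in the comment above, here is a minimal sketch. It assumes a hypothetical `ManagedMemoryUtils` helper; neither the class nor its method names come from the PR or from Flink. The memory manager would only expose the slot's total managed memory, and the operator would derive its own budget from that total and its fraction.

```java
/** Hypothetical utility; the names are illustrative and not taken from the PR. */
final class ManagedMemoryUtils {
    private ManagedMemoryUtils() {}

    /**
     * Computes an operator's memory budget in bytes from the slot's total
     * managed memory and the operator's fraction of it.
     */
    static long computeOperatorBudget(long slotTotalMemoryBytes, double fraction) {
        if (fraction <= 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException(
                "The fraction of memory to allocate must be within (0, 1].");
        }
        // Round down so the budget never exceeds the operator's share.
        return (long) Math.floor(slotTotalMemoryBytes * fraction);
    }

    /** Converts a byte budget into whole pages, rounding down. Assumes pageSizeBytes > 0. */
    static int computeNumberOfPages(long budgetBytes, int pageSizeBytes) {
        return Math.toIntExact(budgetBytes / pageSizeBytes);
    }
}
```

With this shape, the same two helpers could be called once per memory type if on-heap and off-heap quotas have to be computed separately.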
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r334223099

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ##

@@ -402,29 +386,25 @@ public void releaseAll(Object owner) {
             return;
         }
-        // BEGIN CRITICAL SECTION ---
-        synchronized (lock) {
-            if (isShutDown) {
-                throw new IllegalStateException("Memory manager has been shut down.");
-            }
+        Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
-            // get all segments
-            final Set segments = allocatedSegments.remove(owner);
+        // get all segments
+        Set segments = allocatedSegments.remove(owner);
-            // all segments may have been freed previously individually
-            if (segments == null || segments.isEmpty()) {
-                return;
-            }
-
-            // free each segment
-            for (MemorySegment seg : segments) {
-                seg.free();
-            }
-            numNonAllocatedPages += segments.size();
+        // all segments may have been freed previously individually
+        if (segments == null || segments.isEmpty()) {
+            return;
+        }
-            segments.clear();
+        // free each segment
+        EnumMap releasedMemory = new EnumMap<>(MemoryType.class);
+        for (MemorySegment segment : segments) {
+            segment.free();

Review comment:
True, this comment is out of sync.
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r334223063

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/KeyedBudget.java ##

@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.types.Either;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+class KeyedBudget {

Review comment:
I cannot come up with a more descriptive name either. `KeyedBudgetManager` sounds better than `KeyedBudget`, but it is still not intuitive enough for people to tell immediately from the name what this class does. I guess it's not always easy to find a well-matching name for a class, and in those cases some docs explaining the general purpose and responsibility of the class should be good enough.

Regarding `MemorySize` vs. bytes as a long, I'm not sure how generally this `KeyedBudget` could be used. But I agree that using `MemorySize` is not a critical need, and if needed we can do it easily in a follow-up. For the moment, let's keep it the current way.
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r333814167

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ##

@@ -300,31 +255,21 @@ public void allocatePages(Object owner, List target, int numPages
                 allocatedSegments.put(owner, segmentsForOwner);
             }
-            if (isPreAllocated) {
-                for (int i = numPages; i > 0; i--) {
-                    MemorySegment segment = memoryPool.requestSegmentFromPool(owner);
-                    target.add(segment);
-                    segmentsForOwner.add(segment);
-                }
-            }
-            else {
-                for (int i = numPages; i > 0; i--) {
-                    MemorySegment segment = memoryPool.allocateNewSegment(owner);
-                    target.add(segment);
-                    segmentsForOwner.add(segment);
-                }
-                numNonAllocatedPages -= numPages;
+            for (int i = numPages; i > 0; i--) {
+                MemorySegment segment = allocateManagedSegment(memoryType, owner);
+                target.add(segment);
+                segmentsForOwner.add(segment);
             }
+            numNonAllocatedPages -= numPages;
         }
         // END CRITICAL SECTION ---
     }
     /**
-     * Tries to release the memory for the specified segment. If the segment has already been released or
-     * is null, the request is simply ignored.
+     * Tries to release the memory for the specified segment.
      *
-     * If the memory manager manages pre-allocated memory, the memory segment goes back to the memory pool.
-     * Otherwise, the segment is only freed and made eligible for reclamation by the GC.
+     * If the segment has already been released or is null, the request is simply ignored.

Review comment:
My bad, please ignore this comment. I think I found a typo with an extra 'if' in the doc comment when I reviewed this PR locally in my IDE, but I cannot find it now. I probably left this comment in the wrong place when moving from the IDE to GitHub.
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r333812031

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ##

@@ -564,25 +529,6 @@ public int computeNumberOfPages(double fraction) {
         return (int) (totalNumPages * fraction / numberOfSlots);
     }
-    /**
-     * Computes the memory size of the fraction per slot.
-     *
-     * @param fraction The fraction of the memory of the task slot.
-     * @return The number of pages corresponding to the memory fraction.
-     */
-    public long computeMemorySize(double fraction) {

Review comment:
I think it makes sense to keep the two methods in the memory manager, so that only the memory manager, and not the operators, needs to know the total managed memory size of the slot. Let's leave this discussion for when we make the memory manager per slot.
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327962042

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ##

@@ -216,6 +216,9 @@ public boolean verifyEmpty() {
     /**
      * Allocates a set of memory segments from this memory manager.
      *
+     * The returned segments can have any memory type. The total allocated memory for each type will not exceed its

Review comment:
returned -> allocated
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools URL: https://github.com/apache/flink/pull/9693#discussion_r327922460 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/KeyedBudget.java ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.memory; + +import org.apache.flink.types.Either; + +import javax.annotation.concurrent.GuardedBy; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +class KeyedBudget { + private final Map maxBudgetByKey; + + @GuardedBy("lock") + private final Map availableBudgetByKey; + + private final Object lock = new Object(); + + KeyedBudget(Map maxBudgetByKey) { + this.maxBudgetByKey = new HashMap<>(maxBudgetByKey); + this.availableBudgetByKey = new HashMap<>(maxBudgetByKey); + } + + /** +* Tries to acquire budget which equals to the number of pages times page size. 
+* +* See also {@link #acquirePagedBudgetForKeys(Iterable, long, long)} +*/ + Either<Map<K, Long>, Long> acquirePagedBudget(int pageNum, long pageSize) { + return acquirePagedBudgetForKeys(maxBudgetByKey.keySet(), pageNum, pageSize); + } + + /** +* Tries to acquire budget for a given key. +* +* No budget is acquired if it was not possible to fully acquire the requested budget. +* +* @param key the key to acquire budget from +* @param size the size of budget to acquire from the given key +* @return the fully acquired budget for the key or max possible budget to acquir Review comment: typo: acquir -> acquire
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327912570

## File path: flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java ##

@@ -234,16 +234,15 @@
     /**
      * Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
      */
+    @Deprecated

Review comment:
I would suggest removing this option. To my understanding, 'deprecated' means something is still available but not recommended, and might be removed later. In this case, pre-allocation is no longer available.
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327915668

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ##

@@ -300,31 +255,21 @@ public void allocatePages(Object owner, List target, int numPages
                 allocatedSegments.put(owner, segmentsForOwner);
             }
-            if (isPreAllocated) {
-                for (int i = numPages; i > 0; i--) {
-                    MemorySegment segment = memoryPool.requestSegmentFromPool(owner);
-                    target.add(segment);
-                    segmentsForOwner.add(segment);
-                }
-            }
-            else {
-                for (int i = numPages; i > 0; i--) {
-                    MemorySegment segment = memoryPool.allocateNewSegment(owner);
-                    target.add(segment);
-                    segmentsForOwner.add(segment);
-                }
-                numNonAllocatedPages -= numPages;
+            for (int i = numPages; i > 0; i--) {
+                MemorySegment segment = allocateManagedSegment(memoryType, owner);
+                target.add(segment);
+                segmentsForOwner.add(segment);
             }
+            numNonAllocatedPages -= numPages;
         }
         // END CRITICAL SECTION ---
     }
     /**
-     * Tries to release the memory for the specified segment. If the segment has already been released or
-     * is null, the request is simply ignored.
+     * Tries to release the memory for the specified segment.
      *
-     * If the memory manager manages pre-allocated memory, the memory segment goes back to the memory pool.
-     * Otherwise, the segment is only freed and made eligible for reclamation by the GC.
+     * If the segment has already been released or is null, the request is simply ignored.

Review comment:
remove 'If'
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools URL: https://github.com/apache/flink/pull/9693#discussion_r327922550 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/KeyedBudget.java ## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.memory; + +import org.apache.flink.types.Either; + +import javax.annotation.concurrent.GuardedBy; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +class KeyedBudget { + private final Map maxBudgetByKey; + + @GuardedBy("lock") + private final Map availableBudgetByKey; + + private final Object lock = new Object(); + + KeyedBudget(Map maxBudgetByKey) { + this.maxBudgetByKey = new HashMap<>(maxBudgetByKey); + this.availableBudgetByKey = new HashMap<>(maxBudgetByKey); + } + + /** +* Tries to acquire budget which equals to the number of pages times page size. 
+* +* See also {@link #acquirePagedBudgetForKeys(Iterable, long, long)} +*/ + Either, Long> acquirePagedBudget(int pageNum, long pageSize) { + return acquirePagedBudgetForKeys(maxBudgetByKey.keySet(), pageNum, pageSize); + } + + /** +* Tries to acquire budget for a given key. +* +* No budget is acquired if it was not possible to fully acquire the requested budget. +* +* @param key the key to acquire budget from +* @param size the size of budget to acquire from the given key +* @return the fully acquired budget for the key or max possible budget to acquir +* if it was not possible to acquire the requested budget. +*/ + long acquireBudgetForKey(K key, long size) { + Either, Long> result = acquirePagedBudgetForKeys(Collections.singletonList(key), size, 1L); + return result.isLeft() ? result.left().get(key) : result.right(); + } + + /** +* Tries to acquire budget which equals to the number of pages times page size. +* +* The budget will be acquired only from the given keys. Only integer number of pages will be acquired from each key. +* If the next page does not fit into the available budget of some key, it will try to be acquired from another key. +* The acquisition is successful if the acquired number of pages for each key sums up to the requested number of pages. +* The function does not make any preference about which keys from the given keys to acquire from. +* +* @param keys the keys to acquire budget from +* @param pageNum the total number of pages to acquire from the given keys +* @param pageSize the size of budget to acquire per page +* @return the acquired number of pages for each key if the acquisition is successful (either left) or +* the total number of pages which were available for the given keys (either right). 
+*/ + Either, Long> acquirePagedBudgetForKeys(Iterable keys, long pageNum, long pageSize) { + synchronized (lock) { + long totalPossiblePages = 0L; + Map pagesToReserveByKey = new HashMap<>(); + for (K key : keys) { + long currentKeyBudget = availableBudgetByKey.getOrDefault(key, 0L); + long currentKeyPages = currentKeyBudget / pageSize; + if (totalPossiblePages + currentKeyPages >= pageNum) { + pagesToReserveByKey.put(key, pageNum - totalPossiblePages); + totalPossiblePages = pageNum; Review comment: We can break the loop here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service,
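
The suggested early exit can be sketched as follows. This is a simplified, single-threaded stand-in for the loop in the diff above, not the PR's code: the class name `PagedBudgetSketch` and the method `planPages` are hypothetical, the result is an `Optional` instead of Flink's `Either`, and the sketch only plans the split without updating the available budget.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified variant of the paged acquisition loop under review. */
final class PagedBudgetSketch {
    private PagedBudgetSketch() {}

    /**
     * Returns the number of pages to take from each key, or empty if the given
     * keys together cannot cover pageNum pages. Assumes pageSize > 0.
     */
    static <K> Optional<Map<K, Long>> planPages(
            Map<K, Long> availableBudgetByKey, Iterable<K> keys, long pageNum, long pageSize) {
        long totalPossiblePages = 0L;
        Map<K, Long> pagesToReserveByKey = new LinkedHashMap<>();
        for (K key : keys) {
            // Only whole pages can be taken from a key.
            long currentKeyPages = availableBudgetByKey.getOrDefault(key, 0L) / pageSize;
            if (totalPossiblePages + currentKeyPages >= pageNum) {
                pagesToReserveByKey.put(key, pageNum - totalPossiblePages);
                totalPossiblePages = pageNum;
                break; // the request is covered, so the remaining keys need not be visited
            }
            pagesToReserveByKey.put(key, currentKeyPages);
            totalPossiblePages += currentKeyPages;
        }
        return totalPossiblePages == pageNum
            ? Optional.of(pagesToReserveByKey)
            : Optional.empty();
    }
}
```

Without the `break`, the loop would keep iterating and add zero-page entries for every later key, which is harmless but unnecessary work while the lock is held.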
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r328001404

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ##

@@ -257,133 +216,96 @@ public boolean verifyEmpty() {
      * of memory pages any more.
      */
     public List allocatePages(Object owner, int numPages) throws MemoryAllocationException {
-        final ArrayList segs = new ArrayList(numPages);
-        allocatePages(owner, segs, numPages);
-        return segs;
+        List segments = new ArrayList<>(numPages);
+        allocatePages(owner, segments, numPages);
+        return segments;
     }
     /**
-     * Allocates a set of memory segments from this memory manager. If the memory manager pre-allocated the
-     * segments, they will be taken from the pool of memory segments. Otherwise, they will be allocated
-     * as part of this call.
+     * Allocates a set of memory segments from this memory manager.
+     *
+     * The returned segments can have any memory type. The total allocated memory for each type will not exceed its
+     * size limit, announced in the constructor.
      *
      * @param owner The owner to associate with the memory segment, for the fallback release.
      * @param target The list into which to put the allocated memory pages.
      * @param numPages The number of pages to allocate.
      * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
      * of memory pages any more.
      */
-    public void allocatePages(Object owner, List target, int numPages)
-            throws MemoryAllocationException {
+    public void allocatePages(

Review comment:
In addition to this, I think we also need an 'allocatePages' variant that supports specifying the memory type. The current 'allocatePages', which does not specify which type (on-heap / off-heap) of memory should be allocated, may work well for DataSet jobs without fine-grained resource requirements.

But for Blink SQL jobs (bounded streaming) with fine-grained resource profiles (FLIP-53), operators have specific and separate memory quotas for on-heap and off-heap managed memory, and will be scheduled accordingly. Thus, even if they can use pages of either type, they need to allocate pages respecting their quota for each pool. Otherwise, other operators that can only use one of the types (e.g., FLIP-58 Python UDX, which uses off-heap managed memory) might be affected.
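
A minimal sketch of what type-aware allocation could look like, assuming a hypothetical `TypedPagePool` class. Its nested `MemoryType` enum is a stand-in redefined here to keep the example self-contained, and it tracks only page counts rather than real memory segments. The point is that each request draws from exactly one pool, so an operator that may use either type cannot silently consume pages another operator depends on.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical page pool with a separate quota per memory type. */
final class TypedPagePool {
    /** Stand-in for Flink's MemoryType, redefined to keep the sketch self-contained. */
    enum MemoryType { HEAP, OFF_HEAP }

    private final Map<MemoryType, Integer> availablePages = new EnumMap<>(MemoryType.class);

    TypedPagePool(int heapPages, int offHeapPages) {
        availablePages.put(MemoryType.HEAP, heapPages);
        availablePages.put(MemoryType.OFF_HEAP, offHeapPages);
    }

    /**
     * Takes numPages from the pool of the given type only. Returns false and
     * takes nothing if that pool cannot cover the request.
     */
    synchronized boolean allocatePages(MemoryType type, int numPages) {
        int available = availablePages.get(type);
        if (numPages < 0 || numPages > available) {
            return false;
        }
        availablePages.put(type, available - numPages);
        return true;
    }

    synchronized int getAvailablePages(MemoryType type) {
        return availablePages.get(type);
    }
}
```

In this sketch, exhausting the on-heap pool leaves the off-heap pool untouched, which is the property an off-heap-only operator relies on.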
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327920212

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/KeyedBudget.java ##

@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.types.Either;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+class KeyedBudget {

Review comment:
The responsibility of this class is not very intuitive from the class name. I think we could use some Javadoc here.
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327980073

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemoryManagerTest.java ##

@@ -237,4 +237,26 @@ public void testAllocateMixedMemoryType() throws MemoryAllocationException {
         split.put(MemoryType.OFF_HEAP, offHeapPages);
         return split;
     }
+
+    @Test
+    public void testMemoryReservation() throws MemoryAllocationException {
+        Object owner = new Object();
+
+        memoryManager.reserveMemory(owner, MemoryType.HEAP, PAGE_SIZE);
+        memoryManager.reserveMemory(owner, MemoryType.OFF_HEAP, memoryManager.getMemorySizeByType(MemoryType.OFF_HEAP));

Review comment:
I would suggest validating that, after reserving all the off-heap memory, we cannot reserve even one more byte of off-heap memory from the memory manager. This would prove that the first reservation call takes effect. I also suggest adding test cases to verify that reservation and segment allocation share the same budget / quota.
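
The two properties asked for above can be shown on a toy model. The sketch below assumes a hypothetical `SharedByteBudget` class, not Flink's `MemoryManager`, in which byte reservations and page allocations draw from one counter. A real test would make the equivalent calls against `MemoryManager` instead.

```java
/** Hypothetical budget shared by byte reservations and page allocations. */
final class SharedByteBudget {
    private final long pageSizeBytes;
    private long availableBytes;

    SharedByteBudget(long totalBytes, long pageSizeBytes) {
        this.availableBytes = totalBytes;
        this.pageSizeBytes = pageSizeBytes;
    }

    /** Reserves the given number of bytes, or returns false and reserves nothing. */
    synchronized boolean reserve(long bytes) {
        if (bytes < 0 || bytes > availableBytes) {
            return false;
        }
        availableBytes -= bytes;
        return true;
    }

    /** Allocates whole pages from the same counter that reserve() uses. */
    synchronized boolean allocatePages(int numPages) {
        return numPages >= 0 && reserve(numPages * pageSizeBytes);
    }

    synchronized long getAvailableBytes() {
        return availableBytes;
    }
}
```

After `reserve(totalBytes)` succeeds, both `reserve(1)` and `allocatePages(1)` return false in this model, which corresponds to the "not even one more byte" check and the shared-quota check respectively.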
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327943136

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/KeyedBudget.java ##

@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.types.Either;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+class KeyedBudget {

Review comment:
I would also suggest using MemorySize for the budget sizes and page sizes, so that we do not need to worry about the unit when calling the acquiring / releasing methods. We can use the long type internally to represent bytes for convenience of calculation, and MemorySize for the arguments and return values of the methods. This probably also involves replacing int/long memory sizes with MemorySize in MemoryManager.
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327909983

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ##

@@ -564,25 +529,6 @@ public int computeNumberOfPages(double fraction) {
         return (int) (totalNumPages * fraction / numberOfSlots);
     }
-    /**
-     * Computes the memory size of the fraction per slot.
-     *
-     * @param fraction The fraction of the memory of the task slot.
-     * @return The number of pages corresponding to the memory fraction.
-     */
-    public long computeMemorySize(double fraction) {

Review comment:
Can this be used for reserving memory? I think operators that reserve memory from the MemoryManager need such an interface to compute their quota from a fraction (just like 'computeNumberOfPages' for operators that allocate segments). And we need to compute the quota separately for each memory type.
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327959576

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ##

@@ -402,29 +386,25 @@ public void releaseAll(Object owner) {
             return;
         }
-        // BEGIN CRITICAL SECTION ---
-        synchronized (lock) {
-            if (isShutDown) {
-                throw new IllegalStateException("Memory manager has been shut down.");
-            }
+        Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
-            // get all segments
-            final Set segments = allocatedSegments.remove(owner);
+        // get all segments
+        Set segments = allocatedSegments.remove(owner);
-            // all segments may have been freed previously individually
-            if (segments == null || segments.isEmpty()) {
-                return;
-            }
-
-            // free each segment
-            for (MemorySegment seg : segments) {
-                seg.free();
-            }
-            numNonAllocatedPages += segments.size();
+        // all segments may have been freed previously individually
+        if (segments == null || segments.isEmpty()) {
+            return;
+        }
-            segments.clear();
+        // free each segment
+        EnumMap releasedMemory = new EnumMap<>(MemoryType.class);
+        for (MemorySegment segment : segments) {
+            segment.free();

Review comment:
'segment.free()' can be removed. It's covered by 'releaseSegment()'.
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools URL: https://github.com/apache/flink/pull/9693#discussion_r328001905 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ## @@ -568,144 +569,26 @@ public int computeNumberOfPages(double fraction) { throw new IllegalArgumentException("The fraction of memory to allocate must within (0, 1]."); } + //noinspection NumericCastThatLosesPrecision return (int) (totalNumPages * fraction / numberOfSlots); } Review comment: I think we also need a 'computeNumberOfPages' for a specific memory type, for the same reason as 'allocatePages'.
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools URL: https://github.com/apache/flink/pull/9693#discussion_r327927379 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/memory/KeyedBudgetTest.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.memory; + +import org.apache.flink.types.Either; +import org.apache.flink.util.Preconditions; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.LongStream; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Test suite for {@link KeyedBudget}. 
+ */ +@SuppressWarnings("MagicNumber") +public class KeyedBudgetTest { + private static final String[] TEST_KEYS = {"k1", "k2", "k3", "k4"}; + private static final long[] TEST_BUDGETS = {15, 17, 22, 11}; + + @Test + public void testSuccessfulAcquisitionForKey() { + KeyedBudget keyedBudget = createSimpleKeyedBudget(); + + long acquired = keyedBudget.acquireBudgetForKey("k1", 10L); + + assertThat(acquired, is(10L)); + checkOneKeyBudgetChange(keyedBudget, "k1", 5L); + } + + @Test + public void testFailedAcquisitionForKey() { + KeyedBudget keyedBudget = createSimpleKeyedBudget(); + + long acquired = keyedBudget.acquireBudgetForKey("k1", 20L); + + assertThat(acquired, is(15L)); + checkOneKeyBudgetChange(keyedBudget, "k1", 15L); Review comment: How about adding another util method, 'checkNoBudgetChange'? It can be implemented by calling 'checkOneKeyBudgetChange', and it would make the test cases more intuitive.
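One possible shape for the proposed helper is sketched below. Because the bodies of 'checkOneKeyBudgetChange' and 'KeyedBudget' are not visible in this message, the sketch uses a hypothetical `SimpleBudget` stand-in whose behaviour is inferred from the two quoted tests (a failed acquisition returns the available budget and changes nothing). The helper signatures are likewise assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: SimpleBudget and the helper signatures are assumptions.
final class BudgetCheckSketch {

    static final String[] TEST_KEYS = {"k1", "k2", "k3", "k4"};
    static final long[] TEST_BUDGETS = {15, 17, 22, 11};

    // Minimal stand-in for KeyedBudget that only tracks the available budget per key.
    static final class SimpleBudget {
        private final Map<String, Long> available = new HashMap<>();

        SimpleBudget() {
            for (int i = 0; i < TEST_KEYS.length; i++) {
                available.put(TEST_KEYS[i], TEST_BUDGETS[i]);
            }
        }

        // All-or-nothing: returns 'size' on success, otherwise the unchanged available budget.
        long acquireBudgetForKey(String key, long size) {
            long current = available.get(key);
            if (size <= current) {
                available.put(key, current - size);
                return size;
            }
            return current;
        }

        long availableBudgetForKey(String key) {
            return available.get(key);
        }
    }

    // Asserts that only 'key' deviates from its initial budget, and by the expected amount.
    static void checkOneKeyBudgetChange(SimpleBudget budget, String key, long expectedAvailable) {
        for (int i = 0; i < TEST_KEYS.length; i++) {
            long expected = TEST_KEYS[i].equals(key) ? expectedAvailable : TEST_BUDGETS[i];
            if (budget.availableBudgetForKey(TEST_KEYS[i]) != expected) {
                throw new AssertionError("Unexpected budget for key " + TEST_KEYS[i]);
            }
        }
    }

    // The proposed helper: "no change" is the one-key check with the key's initial budget.
    static void checkNoBudgetChange(SimpleBudget budget) {
        checkOneKeyBudgetChange(budget, TEST_KEYS[0], TEST_BUDGETS[0]);
    }
}
```

With such a helper, 'testFailedAcquisitionForKey' could end with 'checkNoBudgetChange(keyedBudget)' instead of repeating the initial value 15L, which states the intent of the assertion directly.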
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools URL: https://github.com/apache/flink/pull/9693#discussion_r327957575 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ## @@ -394,74 +318,79 @@ public void release(Collection segments) { return; } - // BEGIN CRITICAL SECTION --- - synchronized (lock) { - if (isShutDown) { - throw new IllegalStateException("Memory manager has been shut down."); - } + Preconditions.checkState(!isShutDown, "Memory manager has been shut down."); - // since concurrent modifications to the collection - // can disturb the release, we need to try potentially multiple times - boolean successfullyReleased = false; - do { - final Iterator segmentsIterator = segments.iterator(); + EnumMap releasedMemory = new EnumMap<>(MemoryType.class); - Object lastOwner = null; - Set segsForOwner = null; + // since concurrent modifications to the collection + // can disturb the release, we need to try potentially multiple times + boolean successfullyReleased = false; + do { + Iterator segmentsIterator = segments.iterator(); - try { - // go over all segments - while (segmentsIterator.hasNext()) { - - final MemorySegment seg = segmentsIterator.next(); - if (seg == null || seg.isFreed()) { - continue; - } - - final Object owner = seg.getOwner(); - - try { - // get the list of segments by this owner only if it is a different owner than for - // the previous one (or it is the first segment) - if (lastOwner != owner) { - lastOwner = owner; - segsForOwner = this.allocatedSegments.get(owner); - } - - // remove the segment from the list - if (segsForOwner != null) { - segsForOwner.remove(seg); - if (segsForOwner.isEmpty()) { - this.allocatedSegments.remove(owner); - } - } - - if (isPreAllocated) { - memoryPool.returnSegmentToPool(seg); - } - else { - seg.free(); - numNonAllocatedPages++; - } - } - catch (Throwable t) { - throw new RuntimeException( - 
"Error removing book-keeping reference to allocated memory segment.", t); - } + //noinspection ProhibitedExceptionCaught + try { + MemorySegment segment = null; + while (segment == null && segmentsIterator.hasNext()) { + segment = segmentsIterator.next(); + if (segment.isFreed()) { + segment = null; } + } + while (segment != null) { +
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools URL: https://github.com/apache/flink/pull/9693#discussion_r327940329 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/memory/KeyedBudgetTest.java ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.memory; + +import org.apache.flink.types.Either; +import org.apache.flink.util.Preconditions; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.LongStream; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Test suite for {@link KeyedBudget}. 
+ */ +@SuppressWarnings("MagicNumber") +public class KeyedBudgetTest { + private static final String[] TEST_KEYS = {"k1", "k2", "k3", "k4"}; + private static final long[] TEST_BUDGETS = {15, 17, 22, 11}; + + @Test + public void testSuccessfulAcquisitionForKey() { + KeyedBudget keyedBudget = createSimpleKeyedBudget(); + + long acquired = keyedBudget.acquireBudgetForKey("k1", 10L); + + assertThat(acquired, is(10L)); + checkOneKeyBudgetChange(keyedBudget, "k1", 5L); + } + + @Test + public void testFailedAcquisitionForKey() { + KeyedBudget keyedBudget = createSimpleKeyedBudget(); + + long acquired = keyedBudget.acquireBudgetForKey("k1", 20L); + + assertThat(acquired, is(15L)); + checkOneKeyBudgetChange(keyedBudget, "k1", 15L); + } + + @Test + public void testSuccessfulReleaseForKey() { + KeyedBudget keyedBudget = createSimpleKeyedBudget(); + + keyedBudget.acquireBudgetForKey("k1", 10L); + keyedBudget.releaseBudgetForKey("k1", 5L); + + checkOneKeyBudgetChange(keyedBudget, "k1", 10L); + } + + @Test + public void testFailedReleaseForKey() { + KeyedBudget keyedBudget = createSimpleKeyedBudget(); + + keyedBudget.acquireBudgetForKey("k1", 10L); + try { + keyedBudget.releaseBudgetForKey("k1", 15L); + fail("IllegalStateException is expected to fail over-sized release"); + } catch (IllegalStateException e) { + // expected + } + + checkOneKeyBudgetChange(keyedBudget, "k1", 5L); + } + + @Test + public void testSuccessfulAcquisitionForKeys() { + KeyedBudget keyedBudget = createSimpleKeyedBudget(); + + Either, Long> acquired = + keyedBudget.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, 5); + + assertThat(acquired.isLeft(), is(true)); + assertThat(acquired.left().values().stream().mapToLong(b -> b).sum(), is(4L)); + + assertThat(keyedBudget.availableBudgetForKey("k1"), is(15L)); + assertThat(keyedBudget.availableBudgetForKeys(Arrays.asList("k2", "k3")), is(19L)); + assertThat(keyedBudget.totalAvailableBudget(), is(45L)); + } + + @Test + public void 
testSuccessfulReleaseForKeys() { + KeyedBudget keyedBudget = createSimpleKeyedBudget(); + + keyedBudget.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, 8); + keyedBudget.releaseBudgetForKeys(createdBudgetMap(new String[] {"k2", "k3"}, new long[] {7, 10})); + + assertThat(keyedBudget.availableBudgetForKeys(Arrays.asList("k2", "k3")), is(24L)); + assertThat(keyedBudget.availableBudgetForKeys(Arrays.asList("k1", "k4")), is(26L)); + assertThat(keyedBudget.totalAvailableBudget(), is(50L)); + } + + @Test + public void testSuccessfulReleaseForKeysWithMixedRequests() { + KeyedBudget keyedBudget = createSimpleKeyedBudget(); + + keyedBudget.acquirePagedBudgetForKeys(Arrays.asList("k2", "k3"), 4, 8); +
[GitHub] [flink] xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools URL: https://github.com/apache/flink/pull/9693#discussion_r327916130 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ## @@ -288,10 +244,9 @@ public void allocatePages(Object owner, List target, int numPages // in the case of pre-allocated memory, the 'numNonAllocatedPages' is zero, in the // lazy case, the 'freeSegments.size()' is zero. Review comment: Comments need to be updated