xintongsong commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r327922550
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/memory/KeyedBudget.java
 ##########
 @@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.memory;
+
+import org.apache.flink.types.Either;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+class KeyedBudget<K> {
+       private final Map<K, Long> maxBudgetByKey;
+
+       @GuardedBy("lock")
+       private final Map<K, Long> availableBudgetByKey;
+
+       private final Object lock = new Object();
+
+       KeyedBudget(Map<K, Long> maxBudgetByKey) {
+               this.maxBudgetByKey = new HashMap<>(maxBudgetByKey);
+               this.availableBudgetByKey = new HashMap<>(maxBudgetByKey);
+       }
+
+       /**
+        * Tries to acquire budget equal to the number of pages times the 
page size.
+        *
+        * <p>See also {@link #acquirePagedBudgetForKeys(Iterable, long, long)}
+        */
+       Either<Map<K, Long>, Long> acquirePagedBudget(int pageNum, long 
pageSize) {
+               return acquirePagedBudgetForKeys(maxBudgetByKey.keySet(), 
pageNum, pageSize);
+       }
+
+       /**
+        * Tries to acquire budget for a given key.
+        *
+        * <p>No budget is acquired if it was not possible to fully acquire the 
requested budget.
+        *
+        * @param key the key to acquire budget from
+        * @param size the size of budget to acquire from the given key
+        * @return the fully acquired budget for the key or max possible budget 
to acquire
+        * if it was not possible to acquire the requested budget.
+        */
+       long acquireBudgetForKey(K key, long size) {
+               Either<Map<K, Long>, Long> result = 
acquirePagedBudgetForKeys(Collections.singletonList(key), size, 1L);
+               return result.isLeft() ? result.left().get(key) : 
result.right();
+       }
+
+       /**
+        * Tries to acquire budget equal to the number of pages times the 
page size.
+        *
+        * <p>The budget will be acquired only from the given keys. Only 
integer number of pages will be acquired from each key.
+        * If the next page does not fit into the available budget of some key, 
it will try to be acquired from another key.
+        * The acquisition is successful if the acquired number of pages for 
each key sums up to the requested number of pages.
+        * The function does not make any preference about which keys from the 
given keys to acquire from.
+        *
+        * @param keys the keys to acquire budget from
+        * @param pageNum the total number of pages to acquire from the given 
keys
+        * @param pageSize the size of budget to acquire per page
+        * @return the acquired number of pages for each key if the acquisition 
is successful (either left) or
+        * the total number of pages which were available for the given keys 
(either right).
+        */
+       Either<Map<K, Long>, Long> acquirePagedBudgetForKeys(Iterable<K> keys, 
long pageNum, long pageSize) {
+               synchronized (lock) {
+                       long totalPossiblePages = 0L;
+                       Map<K, Long> pagesToReserveByKey = new HashMap<>();
+                       for (K key : keys) {
+                               long currentKeyBudget = 
availableBudgetByKey.getOrDefault(key, 0L);
+                               long currentKeyPages = currentKeyBudget / 
pageSize;
+                               if (totalPossiblePages + currentKeyPages >= 
pageNum) {
+                                       pagesToReserveByKey.put(key, pageNum - 
totalPossiblePages);
+                                       totalPossiblePages = pageNum;
 
 Review comment:
   We can break the loop here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to