zhipeng93 commented on a change in pull request #18: URL: https://github.com/apache/flink-ml/pull/18#discussion_r732757633
########## File path: flink-ml-lib/src/main/java/org/apache/flink/ml/common/broadcast/BroadcastContext.java ########## @@ -18,106 +18,54 @@ package org.apache.flink.ml.common.broadcast; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.ConcurrentHashMap; public class BroadcastContext { /** - * Store broadcast DataStreams in a Map. The key is (broadcastName, partitionId) and the value - * is (isBroaddcastVariableReady, cacheList). + * stores broadcast data streams in a map. The key is broadcastName-partitionId and the value is + * (isBroadcastVariableReady, cacheList). */ - private static Map<Tuple2<String, Integer>, Tuple2<Boolean, List<?>>> broadcastVariables = - new HashMap<>(); - /** - * We use lock because we want to enable `getBroadcastVariable(String)` in a TM with multiple - * slots here. Note that using ConcurrentHashMap is not enough since we need "contains and get - * in an atomic operation". - */ - private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private static final Map<String, Tuple2<Boolean, List<?>>> BROADCAST_VARIABLES = + new ConcurrentHashMap<>(); - public static void putBroadcastVariable( - Tuple2<String, Integer> key, Tuple2<Boolean, List<?>> variable) { - lock.writeLock().lock(); - try { - broadcastVariables.put(key, variable); - } finally { - lock.writeLock().unlock(); - } + @VisibleForTesting + public static void putBroadcastVariable(String key, Tuple2<Boolean, List<?>> variable) { + BROADCAST_VARIABLES.put(key, variable); } /** - * get the cached list with the given key. - * - * @param key - * @param <T> - * @return the cache list. - */ - public static <T> List<T> getBroadcastVariable(Tuple2<String, Integer> key) { - lock.readLock().lock(); - List<?> result = null; - try { - result = broadcastVariables.get(key).f1; - } finally { - lock.readLock().unlock(); - } - return (List<T>) result; - } - - /** - * get broadcast variables by name + * gets broadcast variables by name if this broadcast variable is fully cached. * * @param name * @param <T> - * @return + * @return the cache broadcast variable. Return null if it is not fully cached. */ + @VisibleForTesting public static <T> List<T> getBroadcastVariable(String name) { - lock.readLock().lock(); - List<?> result = null; - try { - for (Tuple2<String, Integer> nameAndPartitionId : broadcastVariables.keySet()) { - if (name.equals(nameAndPartitionId.f0) && isCacheFinished(nameAndPartitionId)) { - result = broadcastVariables.get(nameAndPartitionId).f1; - break; - } - } - } finally { - lock.readLock().unlock(); + Tuple2<Boolean, List<?>> cacheReadyAndList = BROADCAST_VARIABLES.get(name); + if (cacheReadyAndList.f0) { + return (List<T>) cacheReadyAndList.f1; } - return (List<T>) result; + return null; } - public static void remove(Tuple2<String, Integer> key) { - lock.writeLock().lock(); - try { - broadcastVariables.remove(key); - } finally { - lock.writeLock().unlock(); - } + @VisibleForTesting + public static void remove(String key) { + BROADCAST_VARIABLES.remove(key); } - public static void markCacheFinished(Tuple2<String, Integer> key) { - lock.writeLock().lock(); - try { - broadcastVariables.get(key).f0 = true; - } finally { - lock.writeLock().unlock(); - } + @VisibleForTesting + public static void markCacheFinished(String key) { Review comment: Thanks @gaoyunhaii ! I have updated the code also put operator's mailBoxExecutor in BroadcastContext. Please refer to `BroadcastContext#line50 & line71`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org