zhipeng93 commented on a change in pull request #18:
URL: https://github.com/apache/flink-ml/pull/18#discussion_r732757633



##########
File path: 
flink-ml-lib/src/main/java/org/apache/flink/ml/common/broadcast/BroadcastContext.java
##########
@@ -18,106 +18,54 @@
 
 package org.apache.flink.ml.common.broadcast;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class BroadcastContext {
     /**
-     * Store broadcast DataStreams in a Map. The key is (broadcastName, 
partitionId) and the value
-     * is (isBroaddcastVariableReady, cacheList).
+     * stores broadcast data streams in a map. The key is 
broadcastName-partitionId and the value is
+     * (isBroadcastVariableReady, cacheList).
      */
-    private static Map<Tuple2<String, Integer>, Tuple2<Boolean, List<?>>> 
broadcastVariables =
-            new HashMap<>();
-    /**
-     * We use lock because we want to enable `getBroadcastVariable(String)` in 
a TM with multiple
-     * slots here. Note that using ConcurrentHashMap is not enough since we 
need "contains and get
-     * in an atomic operation".
-     */
-    private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private static final Map<String, Tuple2<Boolean, List<?>>> 
BROADCAST_VARIABLES =
+            new ConcurrentHashMap<>();
 
-    public static void putBroadcastVariable(
-            Tuple2<String, Integer> key, Tuple2<Boolean, List<?>> variable) {
-        lock.writeLock().lock();
-        try {
-            broadcastVariables.put(key, variable);
-        } finally {
-            lock.writeLock().unlock();
-        }
+    @VisibleForTesting
+    public static void putBroadcastVariable(String key, Tuple2<Boolean, 
List<?>> variable) {
+        BROADCAST_VARIABLES.put(key, variable);
     }
 
     /**
-     * get the cached list with the given key.
-     *
-     * @param key
-     * @param <T>
-     * @return the cache list.
-     */
-    public static <T> List<T> getBroadcastVariable(Tuple2<String, Integer> 
key) {
-        lock.readLock().lock();
-        List<?> result = null;
-        try {
-            result = broadcastVariables.get(key).f1;
-        } finally {
-            lock.readLock().unlock();
-        }
-        return (List<T>) result;
-    }
-
-    /**
-     * get broadcast variables by name
+     * gets broadcast variables by name if this broadcast variable is fully 
cached.
      *
      * @param name
      * @param <T>
-     * @return
+     * @return the cache broadcast variable. Return null if it is not fully 
cached.
      */
+    @VisibleForTesting
     public static <T> List<T> getBroadcastVariable(String name) {
-        lock.readLock().lock();
-        List<?> result = null;
-        try {
-            for (Tuple2<String, Integer> nameAndPartitionId : 
broadcastVariables.keySet()) {
-                if (name.equals(nameAndPartitionId.f0) && 
isCacheFinished(nameAndPartitionId)) {
-                    result = broadcastVariables.get(nameAndPartitionId).f1;
-                    break;
-                }
-            }
-        } finally {
-            lock.readLock().unlock();
+        Tuple2<Boolean, List<?>> cacheReadyAndList = 
BROADCAST_VARIABLES.get(name);
+        if (cacheReadyAndList.f0) {
+            return (List<T>) cacheReadyAndList.f1;
         }
-        return (List<T>) result;
+        return null;
     }
 
-    public static void remove(Tuple2<String, Integer> key) {
-        lock.writeLock().lock();
-        try {
-            broadcastVariables.remove(key);
-        } finally {
-            lock.writeLock().unlock();
-        }
+    @VisibleForTesting
+    public static void remove(String key) {
+        BROADCAST_VARIABLES.remove(key);
     }
 
-    public static void markCacheFinished(Tuple2<String, Integer> key) {
-        lock.writeLock().lock();
-        try {
-            broadcastVariables.get(key).f0 = true;
-        } finally {
-            lock.writeLock().unlock();
-        }
+    @VisibleForTesting
+    public static void markCacheFinished(String key) {

Review comment:
       Thanks @gaoyunhaii! I have updated the code and also put the operator's 
mailBoxExecutor in BroadcastContext. Please refer to `BroadcastContext#line50 & 
line71`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to