Repository: incubator-reef
Updated Branches:
  refs/heads/master b89ca82f2 -> 9cab34166


[REEF-657] Race condition in StreamingCodecFunctionCache in REEF.Network

This makes `AddCodecs()` thread-safe by using a lock.

JIRA:
  [REEF-657](https://issues.apache.org/jira/browse/REEF-657)

Pull Request:
  This closes #429


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/9cab3416
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/9cab3416
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/9cab3416

Branch: refs/heads/master
Commit: 9cab34166e91f57ce71b3170a17003bb93a0779f
Parents: b89ca82
Author: Dhruv <[email protected]>
Authored: Thu Aug 27 13:58:38 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Fri Aug 28 10:54:12 2015 -0700

----------------------------------------------------------------------
 .../Codec/StreamingCodecFunctionCache.cs        | 77 +++++++++++---------
 1 file changed, 42 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9cab3416/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
index 6d91298..5248407 100644
--- a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
+++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs
@@ -18,7 +18,7 @@
  */
 
 using System;
-using System.Collections.Generic;
+using System.Collections.Concurrent;
 using System.Reflection;
 using System.Threading;
 using System.Threading.Tasks;
@@ -39,12 +39,13 @@ namespace Org.Apache.REEF.Network.NetworkService.Codec
     internal class StreamingCodecFunctionCache<T>
     {
         private static readonly Logger Logger = Logger.GetLogger(typeof 
(StreamingCodecFunctionCache<T>));
-        private readonly Dictionary<Type, Func<IDataReader, T>> _readFuncCache;
-        private readonly Dictionary<Type, Func<IDataReader, CancellationToken, 
T>> _readAsyncFuncCache;
-        private readonly Dictionary<Type, Action<T, IDataWriter>> 
_writeFuncCache;
-        private readonly Dictionary<Type, Func<T, IDataWriter, 
CancellationToken, Task>> _writeAsyncFuncCache;
+        private readonly ConcurrentDictionary<Type, Func<IDataReader, T>> 
_readFuncCache;
+        private readonly ConcurrentDictionary<Type, Func<IDataReader, 
CancellationToken, T>> _readAsyncFuncCache;
+        private readonly ConcurrentDictionary<Type, Action<T, IDataWriter>> 
_writeFuncCache;
+        private readonly ConcurrentDictionary<Type, Func<T, IDataWriter, 
CancellationToken, Task>> _writeAsyncFuncCache;
         private readonly IInjector _injector;
         private readonly Type _streamingCodecType;
+        private readonly object _lock;
 
         /// <summary>
         /// Create new StreamingCodecFunctionCache.
@@ -53,11 +54,12 @@ namespace Org.Apache.REEF.Network.NetworkService.Codec
         internal StreamingCodecFunctionCache(IInjector injector)
         {
             _injector = injector;
-            _readFuncCache = new Dictionary<Type, Func<IDataReader, T>>();
-            _readAsyncFuncCache = new Dictionary<Type, Func<IDataReader, 
CancellationToken, T>>();
-            _writeFuncCache = new Dictionary<Type, Action<T, IDataWriter>>();
-            _writeAsyncFuncCache = new Dictionary<Type, Func<T, IDataWriter, 
CancellationToken, Task>>();
+            _readFuncCache = new ConcurrentDictionary<Type, Func<IDataReader, 
T>>();
+            _readAsyncFuncCache = new ConcurrentDictionary<Type, 
Func<IDataReader, CancellationToken, T>>();
+            _writeFuncCache = new ConcurrentDictionary<Type, Action<T, 
IDataWriter>>();
+            _writeAsyncFuncCache = new ConcurrentDictionary<Type, Func<T, 
IDataWriter, CancellationToken, Task>>();
             _streamingCodecType = typeof(IStreamingCodec<>);
+            _lock = new object();
         }
 
         /// <summary>
@@ -140,32 +142,37 @@ namespace Org.Apache.REEF.Network.NetworkService.Codec
                     Logger);
             }
 
-            Type codecType = _streamingCodecType.MakeGenericType(messageType);
-            var codec = _injector.GetInstance(codecType);
-
-            MethodInfo readMethod = codec.GetType().GetMethod("Read");
-            _readFuncCache[messageType] = (Func<IDataReader, T>) 
Delegate.CreateDelegate
-                (typeof (Func<IDataReader, T>), codec, readMethod);
-
-            MethodInfo readAsyncMethod = 
codec.GetType().GetMethod("ReadAsync");
-            MethodInfo genericHelper = GetType()
-                .GetMethod("ReadAsyncHelperFunc", BindingFlags.NonPublic | 
BindingFlags.Instance);
-            MethodInfo constructedHelper = 
genericHelper.MakeGenericMethod(messageType);
-            _readAsyncFuncCache[messageType] =
-                (Func<IDataReader, CancellationToken, 
T>)constructedHelper.Invoke(this, new[] { readAsyncMethod, codec });
-
-            MethodInfo writeMethod = codec.GetType().GetMethod("Write");
-            genericHelper = GetType().GetMethod("WriteHelperFunc", 
BindingFlags.NonPublic | BindingFlags.Instance);
-            constructedHelper = genericHelper.MakeGenericMethod(messageType);
-            _writeFuncCache[messageType] =
-                (Action<T, IDataWriter>) constructedHelper.Invoke(this, new[] 
{writeMethod, codec});
-            
-            MethodInfo writeAsyncMethod = 
codec.GetType().GetMethod("WriteAsync");
-            genericHelper = GetType().GetMethod("WriteAsyncHelperFunc", 
BindingFlags.NonPublic | BindingFlags.Instance);
-            constructedHelper = genericHelper.MakeGenericMethod(messageType);
-            _writeAsyncFuncCache[messageType] =
-                (Func<T, IDataWriter, CancellationToken, Task>)
-                    constructedHelper.Invoke(this, new[] {writeAsyncMethod, 
codec});
+            lock (_lock)
+            {
+                Type codecType = 
_streamingCodecType.MakeGenericType(messageType);
+                var codec = _injector.GetInstance(codecType);
+
+                MethodInfo readMethod = codec.GetType().GetMethod("Read");
+                _readFuncCache[messageType] = (Func<IDataReader, T>) 
Delegate.CreateDelegate
+                    (typeof (Func<IDataReader, T>), codec, readMethod);
+
+                MethodInfo readAsyncMethod = 
codec.GetType().GetMethod("ReadAsync");
+                MethodInfo genericHelper = GetType()
+                    .GetMethod("ReadAsyncHelperFunc", BindingFlags.NonPublic | 
BindingFlags.Instance);
+                MethodInfo constructedHelper = 
genericHelper.MakeGenericMethod(messageType);
+                _readAsyncFuncCache[messageType] =
+                    (Func<IDataReader, CancellationToken, T>)
+                        constructedHelper.Invoke(this, new[] {readAsyncMethod, 
codec});
+
+                MethodInfo writeMethod = codec.GetType().GetMethod("Write");
+                genericHelper = GetType().GetMethod("WriteHelperFunc", 
BindingFlags.NonPublic | BindingFlags.Instance);
+                constructedHelper = 
genericHelper.MakeGenericMethod(messageType);
+                _writeFuncCache[messageType] =
+                    (Action<T, IDataWriter>) constructedHelper.Invoke(this, 
new[] {writeMethod, codec});
+
+                MethodInfo writeAsyncMethod = 
codec.GetType().GetMethod("WriteAsync");
+                genericHelper = GetType()
+                    .GetMethod("WriteAsyncHelperFunc", BindingFlags.NonPublic 
| BindingFlags.Instance);
+                constructedHelper = 
genericHelper.MakeGenericMethod(messageType);
+                _writeAsyncFuncCache[messageType] =
+                    (Func<T, IDataWriter, CancellationToken, Task>)
+                        constructedHelper.Invoke(this, new[] 
{writeAsyncMethod, codec});
+            }
         }
 
         private Action<T, IDataWriter> WriteHelperFunc<T1>(MethodInfo method, 
object codec) where T1 : class

Reply via email to