Repository: incubator-reef Updated Branches: refs/heads/master b89ca82f2 -> 9cab34166
[REEF-657] Race condition in StreamingCodecFunctionCache in REEF.Network This makes `AddCodecs()` threadsafe by using lock. JIRA: [REEF-657](https://issues.apache.org/jira/browse/REEF-657) Pull Request: This closes #429 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/9cab3416 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/9cab3416 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/9cab3416 Branch: refs/heads/master Commit: 9cab34166e91f57ce71b3170a17003bb93a0779f Parents: b89ca82 Author: Dhruv <[email protected]> Authored: Thu Aug 27 13:58:38 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Fri Aug 28 10:54:12 2015 -0700 ---------------------------------------------------------------------- .../Codec/StreamingCodecFunctionCache.cs | 77 +++++++++++--------- 1 file changed, 42 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/9cab3416/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs index 6d91298..5248407 100644 --- a/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs +++ b/lang/cs/Org.Apache.REEF.Network/NetworkService/Codec/StreamingCodecFunctionCache.cs @@ -18,7 +18,7 @@ */ using System; -using System.Collections.Generic; +using System.Collections.Concurrent; using System.Reflection; using System.Threading; using System.Threading.Tasks; @@ -39,12 +39,13 @@ namespace Org.Apache.REEF.Network.NetworkService.Codec internal class StreamingCodecFunctionCache<T> { private static readonly Logger Logger = Logger.GetLogger(typeof (StreamingCodecFunctionCache<T>)); - private readonly Dictionary<Type, Func<IDataReader, T>> _readFuncCache; - private readonly Dictionary<Type, Func<IDataReader, CancellationToken, T>> _readAsyncFuncCache; - private readonly Dictionary<Type, Action<T, IDataWriter>> _writeFuncCache; - private readonly Dictionary<Type, Func<T, IDataWriter, CancellationToken, Task>> _writeAsyncFuncCache; + private readonly ConcurrentDictionary<Type, Func<IDataReader, T>> _readFuncCache; + private readonly ConcurrentDictionary<Type, Func<IDataReader, CancellationToken, T>> _readAsyncFuncCache; + private readonly ConcurrentDictionary<Type, Action<T, IDataWriter>> _writeFuncCache; + private readonly ConcurrentDictionary<Type, Func<T, IDataWriter, CancellationToken, Task>> _writeAsyncFuncCache; private readonly IInjector _injector; private readonly Type _streamingCodecType; + private readonly object _lock; /// <summary> /// Create new StreamingCodecFunctionCache. @@ -53,11 +54,12 @@ namespace Org.Apache.REEF.Network.NetworkService.Codec internal StreamingCodecFunctionCache(IInjector injector) { _injector = injector; - _readFuncCache = new Dictionary<Type, Func<IDataReader, T>>(); - _readAsyncFuncCache = new Dictionary<Type, Func<IDataReader, CancellationToken, T>>(); - _writeFuncCache = new Dictionary<Type, Action<T, IDataWriter>>(); - _writeAsyncFuncCache = new Dictionary<Type, Func<T, IDataWriter, CancellationToken, Task>>(); + _readFuncCache = new ConcurrentDictionary<Type, Func<IDataReader, T>>(); + _readAsyncFuncCache = new ConcurrentDictionary<Type, Func<IDataReader, CancellationToken, T>>(); + _writeFuncCache = new ConcurrentDictionary<Type, Action<T, IDataWriter>>(); + _writeAsyncFuncCache = new ConcurrentDictionary<Type, Func<T, IDataWriter, CancellationToken, Task>>(); _streamingCodecType = typeof(IStreamingCodec<>); + _lock = new object(); } /// <summary> @@ -140,32 +142,37 @@ namespace Org.Apache.REEF.Network.NetworkService.Codec Logger); } - Type codecType = _streamingCodecType.MakeGenericType(messageType); - var codec = _injector.GetInstance(codecType); - - MethodInfo readMethod = codec.GetType().GetMethod("Read"); - _readFuncCache[messageType] = (Func<IDataReader, T>) Delegate.CreateDelegate - (typeof (Func<IDataReader, T>), codec, readMethod); - - MethodInfo readAsyncMethod = codec.GetType().GetMethod("ReadAsync"); - MethodInfo genericHelper = GetType() - .GetMethod("ReadAsyncHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance); - MethodInfo constructedHelper = genericHelper.MakeGenericMethod(messageType); - _readAsyncFuncCache[messageType] = - (Func<IDataReader, CancellationToken, T>)constructedHelper.Invoke(this, new[] { readAsyncMethod, codec }); - - MethodInfo writeMethod = codec.GetType().GetMethod("Write"); - genericHelper = GetType().GetMethod("WriteHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance); - constructedHelper = genericHelper.MakeGenericMethod(messageType); - _writeFuncCache[messageType] = - (Action<T, IDataWriter>) constructedHelper.Invoke(this, new[] {writeMethod, codec}); - - MethodInfo writeAsyncMethod = codec.GetType().GetMethod("WriteAsync"); - genericHelper = GetType().GetMethod("WriteAsyncHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance); - constructedHelper = genericHelper.MakeGenericMethod(messageType); - _writeAsyncFuncCache[messageType] = - (Func<T, IDataWriter, CancellationToken, Task>) - constructedHelper.Invoke(this, new[] {writeAsyncMethod, codec}); + lock (_lock) + { + Type codecType = _streamingCodecType.MakeGenericType(messageType); + var codec = _injector.GetInstance(codecType); + + MethodInfo readMethod = codec.GetType().GetMethod("Read"); + _readFuncCache[messageType] = (Func<IDataReader, T>) Delegate.CreateDelegate + (typeof (Func<IDataReader, T>), codec, readMethod); + + MethodInfo readAsyncMethod = codec.GetType().GetMethod("ReadAsync"); + MethodInfo genericHelper = GetType() + .GetMethod("ReadAsyncHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance); + MethodInfo constructedHelper = genericHelper.MakeGenericMethod(messageType); + _readAsyncFuncCache[messageType] = + (Func<IDataReader, CancellationToken, T>) + constructedHelper.Invoke(this, new[] {readAsyncMethod, codec}); + + MethodInfo writeMethod = codec.GetType().GetMethod("Write"); + genericHelper = GetType().GetMethod("WriteHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance); + constructedHelper = genericHelper.MakeGenericMethod(messageType); + _writeFuncCache[messageType] = + (Action<T, IDataWriter>) constructedHelper.Invoke(this, new[] {writeMethod, codec}); + + MethodInfo writeAsyncMethod = codec.GetType().GetMethod("WriteAsync"); + genericHelper = GetType() + .GetMethod("WriteAsyncHelperFunc", BindingFlags.NonPublic | BindingFlags.Instance); + constructedHelper = genericHelper.MakeGenericMethod(messageType); + _writeAsyncFuncCache[messageType] = + (Func<T, IDataWriter, CancellationToken, Task>) + constructedHelper.Invoke(this, new[] {writeAsyncMethod, codec}); + } } private Action<T, IDataWriter> WriteHelperFunc<T1>(MethodInfo method, object codec) where T1 : class
