Repository: ignite Updated Branches: refs/heads/master f621f7f76 -> 72ac53da2
http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs index cc205e8..09933be 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs @@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Globalization; - using System.IO; using System.Runtime.InteropServices; using System.Threading; using Apache.Ignite.Core.Cache.Affinity; @@ -77,7 +76,14 @@ namespace Apache.Ignite.Core.Impl.Unmanaged /** Keep references to created delegates. */ // ReSharper disable once CollectionNeverQueried.Local - private readonly List<Delegate> _delegates = new List<Delegate>(50); + private readonly List<Delegate> _delegates = new List<Delegate>(5); + + /** Handlers array. */ + private readonly InLongOutLongHandler[] _inLongOutLongHandlers = new InLongOutLongHandler[62]; + + /** Handlers array. */ + private readonly InLongLongLongObjectOutLongHandler[] _inLongLongLongObjectOutLongHandlers + = new InLongLongLongObjectOutLongHandler[62]; /** Initialized flag. */ private readonly ManualResetEventSlim _initEvent = new ManualResetEventSlim(false); @@ -107,90 +113,19 @@ namespace Apache.Ignite.Core.Impl.Unmanaged /** Operation: prepare .Net. */ private const int OpPrepareDotNet = 1; - private delegate long CacheStoreCreateCallbackDelegate(void* target, long memPtr); - private delegate int CacheStoreInvokeCallbackDelegate(void* target, long objPtr, long memPtr); - private delegate void CacheStoreDestroyCallbackDelegate(void* target, long objPtr); - private delegate long CacheStoreSessionCreateCallbackDelegate(void* target, long storePtr); - - private delegate long CacheEntryFilterCreateCallbackDelegate(void* target, long memPtr); - private delegate int CacheEntryFilterApplyCallbackDelegate(void* target, long objPtr, long memPtr); - private delegate void CacheEntryFilterDestroyCallbackDelegate(void* target, long objPtr); - - private delegate void CacheInvokeCallbackDelegate(void* target, long inMemPtr, long outMemPtr); - - private delegate void ComputeTaskMapCallbackDelegate(void* target, long taskPtr, long inMemPtr, long outMemPtr); - private delegate int ComputeTaskJobResultCallbackDelegate(void* target, long taskPtr, long jobPtr, long memPtr); - private delegate void ComputeTaskReduceCallbackDelegate(void* target, long taskPtr); - private delegate void ComputeTaskCompleteCallbackDelegate(void* target, long taskPtr, long memPtr); - private delegate int ComputeJobSerializeCallbackDelegate(void* target, long jobPtr, long memPtr); - private delegate long ComputeJobCreateCallbackDelegate(void* target, long memPtr); - private delegate void ComputeJobExecuteCallbackDelegate(void* target, long jobPtr, int cancel, long memPtr); - private delegate void ComputeJobCancelCallbackDelegate(void* target, long jobPtr); - private delegate void ComputeJobDestroyCallbackDelegate(void* target, long jobPtr); - - private delegate void ContinuousQueryListenerApplyCallbackDelegate(void* target, long lsnrPtr, long memPtr); - private delegate long ContinuousQueryFilterCreateCallbackDelegate(void* target, long memPtr); - private delegate int ContinuousQueryFilterApplyCallbackDelegate(void* target, long filterPtr, long memPtr); - private delegate void ContinuousQueryFilterReleaseCallbackDelegate(void* target, long filterPtr); - - private delegate void DataStreamerTopologyUpdateCallbackDelegate(void* target, long ldrPtr, long topVer, int topSize); - private delegate void DataStreamerStreamReceiverInvokeCallbackDelegate(void* target, long ptr, void* cache, long memPtr, byte keepPortable); - - private delegate void FutureByteResultCallbackDelegate(void* target, long futPtr, int res); - private delegate void FutureBoolResultCallbackDelegate(void* target, long futPtr, int res); - private delegate void FutureShortResultCallbackDelegate(void* target, long futPtr, int res); - private delegate void FutureCharResultCallbackDelegate(void* target, long futPtr, int res); - private delegate void FutureIntResultCallbackDelegate(void* target, long futPtr, int res); - private delegate void FutureFloatResultCallbackDelegate(void* target, long futPtr, float res); - private delegate void FutureLongResultCallbackDelegate(void* target, long futPtr, long res); - private delegate void FutureDoubleResultCallbackDelegate(void* target, long futPtr, double res); - private delegate void FutureObjectResultCallbackDelegate(void* target, long futPtr, long memPtr); - private delegate void FutureNullResultCallbackDelegate(void* target, long futPtr); - private delegate void FutureErrorCallbackDelegate(void* target, long futPtr, long memPtr); - - private delegate void LifecycleOnEventCallbackDelegate(void* target, long ptr, int evt); - - private delegate void MemoryReallocateCallbackDelegate(void* target, long memPtr, int cap); - - private delegate long MessagingFilterCreateCallbackDelegate(void* target, long memPtr); - private delegate int MessagingFilterApplyCallbackDelegate(void* target, long ptr, long memPtr); - private delegate void MessagingFilterDestroyCallbackDelegate(void* target, long ptr); - - private delegate long EventFilterCreateCallbackDelegate(void* target, long memPtr); - private delegate int EventFilterApplyCallbackDelegate(void* target, long ptr, long memPtr); - private delegate void EventFilterDestroyCallbackDelegate(void* target, long ptr); - - private delegate long ServiceInitCallbackDelegate(void* target, long memPtr); - private delegate void ServiceExecuteCallbackDelegate(void* target, long svcPtr, long memPtr); - private delegate void ServiceCancelCallbackDelegate(void* target, long svcPtr, long memPtr); - private delegate void ServiceInvokeMethodCallbackDelegate(void* target, long svcPtr, long inMemPtr, long outMemPtr); - - private delegate int ClusterNodeFilterApplyCallbackDelegate(void* target, long memPtr); - - private delegate void NodeInfoCallbackDelegate(void* target, long memPtr); - - private delegate void OnStartCallbackDelegate(void* target, void* proc, long memPtr); - private delegate void OnStopCallbackDelegate(void* target); - private delegate void ErrorCallbackDelegate(void* target, int errType, sbyte* errClsChars, int errClsCharsLen, sbyte* errMsgChars, int errMsgCharsLen, sbyte* stackTraceChars, int stackTraceCharsLen, void* errData, int errDataLen); - private delegate long ExtensionCallbackInLongOutLongDelegate(void* target, int typ, long arg1); - private delegate long ExtensionCallbackInLongLongOutLongDelegate(void* target, int typ, long arg1, long arg2); - - private delegate void OnClientDisconnectedDelegate(void* target); - private delegate void OnClientReconnectedDelegate(void* target, bool clusterRestarted); - private delegate void LoggerLogDelegate(void* target, int level, sbyte* messageChars, int messageCharsLen, sbyte* categoryChars, int categoryCharsLen, sbyte* errorInfoChars, int errorInfoCharsLen, long memPtr); private delegate bool LoggerIsLevelEnabledDelegate(void* target, int level); - private delegate long AffinityFunctionInitDelegate(void* target, long memPtr, void* baseFunc); - private delegate int AffinityFunctionPartitionDelegate(void* target, long ptr, long memPtr); - private delegate void AffinityFunctionAssignPartitionsDelegate(void* target, long ptr, long inMemPtr, long outMemPtr); - private delegate void AffinityFunctionRemoveNodeDelegate(void* target, long ptr, long memPtr); - private delegate void AffinityFunctionDestroyDelegate(void* target, long ptr); - private delegate void ConsoleWriteDelegate(sbyte* chars, int charsLen, bool isErr); + private delegate long InLongOutLongDelegate(void* target, int type, long val); + private delegate long InLongLongLongObjectOutLongDelegate(void* target, int type, long val1, long val2, long val3, void* arg); + + private delegate long InLongOutLongFunc(long val); + private delegate long InLongLongLongObjectOutLongFunc(long val1, long val2, long val3, void* arg); + /// <summary> /// Constructor. /// </summary> @@ -204,89 +139,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged { target = IntPtr.Zero.ToPointer(), // Target is not used in .Net as we rely on dynamic FP creation. - cacheStoreCreate = CreateFunctionPointer((CacheStoreCreateCallbackDelegate) CacheStoreCreate), - cacheStoreInvoke = CreateFunctionPointer((CacheStoreInvokeCallbackDelegate) CacheStoreInvoke), - cacheStoreDestroy = CreateFunctionPointer((CacheStoreDestroyCallbackDelegate) CacheStoreDestroy), - - cacheStoreSessionCreate = CreateFunctionPointer((CacheStoreSessionCreateCallbackDelegate) CacheStoreSessionCreate), - - cacheEntryFilterCreate = CreateFunctionPointer((CacheEntryFilterCreateCallbackDelegate)CacheEntryFilterCreate), - cacheEntryFilterApply = CreateFunctionPointer((CacheEntryFilterApplyCallbackDelegate)CacheEntryFilterApply), - cacheEntryFilterDestroy = CreateFunctionPointer((CacheEntryFilterDestroyCallbackDelegate)CacheEntryFilterDestroy), - - cacheInvoke = CreateFunctionPointer((CacheInvokeCallbackDelegate) CacheInvoke), - - computeTaskMap = CreateFunctionPointer((ComputeTaskMapCallbackDelegate) ComputeTaskMap), - computeTaskJobResult = - CreateFunctionPointer((ComputeTaskJobResultCallbackDelegate) ComputeTaskJobResult), - computeTaskReduce = CreateFunctionPointer((ComputeTaskReduceCallbackDelegate) ComputeTaskReduce), - computeTaskComplete = CreateFunctionPointer((ComputeTaskCompleteCallbackDelegate) ComputeTaskComplete), - computeJobSerialize = CreateFunctionPointer((ComputeJobSerializeCallbackDelegate) ComputeJobSerialize), - computeJobCreate = CreateFunctionPointer((ComputeJobCreateCallbackDelegate) ComputeJobCreate), - computeJobExecute = CreateFunctionPointer((ComputeJobExecuteCallbackDelegate) ComputeJobExecute), - computeJobCancel = CreateFunctionPointer((ComputeJobCancelCallbackDelegate) ComputeJobCancel), - computeJobDestroy = CreateFunctionPointer((ComputeJobDestroyCallbackDelegate) ComputeJobDestroy), - continuousQueryListenerApply = - CreateFunctionPointer((ContinuousQueryListenerApplyCallbackDelegate) ContinuousQueryListenerApply), - continuousQueryFilterCreate = - CreateFunctionPointer((ContinuousQueryFilterCreateCallbackDelegate) ContinuousQueryFilterCreate), - continuousQueryFilterApply = - CreateFunctionPointer((ContinuousQueryFilterApplyCallbackDelegate) ContinuousQueryFilterApply), - continuousQueryFilterRelease = - CreateFunctionPointer((ContinuousQueryFilterReleaseCallbackDelegate) ContinuousQueryFilterRelease), - dataStreamerTopologyUpdate = - CreateFunctionPointer((DataStreamerTopologyUpdateCallbackDelegate) DataStreamerTopologyUpdate), - dataStreamerStreamReceiverInvoke = - CreateFunctionPointer((DataStreamerStreamReceiverInvokeCallbackDelegate) DataStreamerStreamReceiverInvoke), - - futureByteResult = CreateFunctionPointer((FutureByteResultCallbackDelegate) FutureByteResult), - futureBoolResult = CreateFunctionPointer((FutureBoolResultCallbackDelegate) FutureBoolResult), - futureShortResult = CreateFunctionPointer((FutureShortResultCallbackDelegate) FutureShortResult), - futureCharResult = CreateFunctionPointer((FutureCharResultCallbackDelegate) FutureCharResult), - futureIntResult = CreateFunctionPointer((FutureIntResultCallbackDelegate) FutureIntResult), - futureFloatResult = CreateFunctionPointer((FutureFloatResultCallbackDelegate) FutureFloatResult), - futureLongResult = CreateFunctionPointer((FutureLongResultCallbackDelegate) FutureLongResult), - futureDoubleResult = CreateFunctionPointer((FutureDoubleResultCallbackDelegate) FutureDoubleResult), - futureObjectResult = CreateFunctionPointer((FutureObjectResultCallbackDelegate) FutureObjectResult), - futureNullResult = CreateFunctionPointer((FutureNullResultCallbackDelegate) FutureNullResult), - futureError = CreateFunctionPointer((FutureErrorCallbackDelegate) FutureError), - lifecycleOnEvent = CreateFunctionPointer((LifecycleOnEventCallbackDelegate) LifecycleOnEvent), - memoryReallocate = CreateFunctionPointer((MemoryReallocateCallbackDelegate) MemoryReallocate), - nodeInfo = CreateFunctionPointer((NodeInfoCallbackDelegate) NodeInfo), - - messagingFilterCreate = CreateFunctionPointer((MessagingFilterCreateCallbackDelegate)MessagingFilterCreate), - messagingFilterApply = CreateFunctionPointer((MessagingFilterApplyCallbackDelegate)MessagingFilterApply), - messagingFilterDestroy = CreateFunctionPointer((MessagingFilterDestroyCallbackDelegate)MessagingFilterDestroy), - - eventFilterCreate = CreateFunctionPointer((EventFilterCreateCallbackDelegate)EventFilterCreate), - eventFilterApply = CreateFunctionPointer((EventFilterApplyCallbackDelegate)EventFilterApply), - eventFilterDestroy = CreateFunctionPointer((EventFilterDestroyCallbackDelegate)EventFilterDestroy), - - serviceInit = CreateFunctionPointer((ServiceInitCallbackDelegate)ServiceInit), - serviceExecute = CreateFunctionPointer((ServiceExecuteCallbackDelegate)ServiceExecute), - serviceCancel = CreateFunctionPointer((ServiceCancelCallbackDelegate)ServiceCancel), - serviceInvokeMethod = CreateFunctionPointer((ServiceInvokeMethodCallbackDelegate)ServiceInvokeMethod), - - clusterNodeFilterApply = CreateFunctionPointer((ClusterNodeFilterApplyCallbackDelegate)ClusterNodeFilterApply), - - onStart = CreateFunctionPointer((OnStartCallbackDelegate)OnStart), - onStop = CreateFunctionPointer((OnStopCallbackDelegate)OnStop), error = CreateFunctionPointer((ErrorCallbackDelegate)Error), - extensionCbInLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongOutLongDelegate)ExtensionCallbackInLongOutLong), - extensionCbInLongLongOutLong = CreateFunctionPointer((ExtensionCallbackInLongLongOutLongDelegate)ExtensionCallbackInLongLongOutLong), - - onClientDisconnected = CreateFunctionPointer((OnClientDisconnectedDelegate)OnClientDisconnected), - ocClientReconnected = CreateFunctionPointer((OnClientReconnectedDelegate)OnClientReconnected), - - affinityFunctionInit = CreateFunctionPointer((AffinityFunctionInitDelegate)AffinityFunctionInit), - affinityFunctionPartition = CreateFunctionPointer((AffinityFunctionPartitionDelegate)AffinityFunctionPartition), - affinityFunctionAssignPartitions = CreateFunctionPointer((AffinityFunctionAssignPartitionsDelegate)AffinityFunctionAssignPartitions), - affinityFunctionRemoveNode = CreateFunctionPointer((AffinityFunctionRemoveNodeDelegate)AffinityFunctionRemoveNode), - affinityFunctionDestroy = CreateFunctionPointer((AffinityFunctionDestroyDelegate)AffinityFunctionDestroy), - loggerLog = CreateFunctionPointer((LoggerLogDelegate)LoggerLog), - loggerIsLevelEnabled = CreateFunctionPointer((LoggerIsLevelEnabledDelegate)LoggerIsLevelEnabled) + loggerIsLevelEnabled = CreateFunctionPointer((LoggerIsLevelEnabledDelegate)LoggerIsLevelEnabled), + + inLongOutLong = CreateFunctionPointer((InLongOutLongDelegate)InLongOutLong), + inLongLongObjectOutLong = CreateFunctionPointer((InLongLongLongObjectOutLongDelegate)InLongLongLongObjectOutLong) }; _cbsPtr = Marshal.AllocHGlobal(UU.HandlersSize()); @@ -294,6 +153,8 @@ namespace Apache.Ignite.Core.Impl.Unmanaged Marshal.StructureToPtr(cbs, _cbsPtr, false); _thisHnd = GCHandle.Alloc(this); + + InitHandlers(); } /// <summary> @@ -304,106 +165,257 @@ namespace Apache.Ignite.Core.Impl.Unmanaged get { return _handleRegistry; } } - #region IMPLEMENTATION: CACHE + #region HANDLERS + + /// <summary> + /// Initializes the handlers. + /// </summary> + private void InitHandlers() + { + AddHandler(UnmanagedCallbackOp.CacheStoreCreate, CacheStoreCreate, true); + AddHandler(UnmanagedCallbackOp.CacheStoreInvoke, CacheStoreInvoke); + AddHandler(UnmanagedCallbackOp.CacheStoreDestroy, CacheStoreDestroy); + AddHandler(UnmanagedCallbackOp.CacheStoreSessionCreate, CacheStoreSessionCreate); + AddHandler(UnmanagedCallbackOp.CacheEntryFilterCreate, CacheEntryFilterCreate); + AddHandler(UnmanagedCallbackOp.CacheEntryFilterApply, CacheEntryFilterApply); + AddHandler(UnmanagedCallbackOp.CacheEntryFilterDestroy, CacheEntryFilterDestroy); + AddHandler(UnmanagedCallbackOp.CacheInvoke, CacheInvoke); + AddHandler(UnmanagedCallbackOp.ComputeTaskMap, ComputeTaskMap); + AddHandler(UnmanagedCallbackOp.ComputeTaskJobResult, ComputeTaskJobResult); + AddHandler(UnmanagedCallbackOp.ComputeTaskReduce, ComputeTaskReduce); + AddHandler(UnmanagedCallbackOp.ComputeTaskComplete, ComputeTaskComplete); + AddHandler(UnmanagedCallbackOp.ComputeJobSerialize, ComputeJobSerialize); + AddHandler(UnmanagedCallbackOp.ComputeJobCreate, ComputeJobCreate); + AddHandler(UnmanagedCallbackOp.ComputeJobExecute, ComputeJobExecute); + AddHandler(UnmanagedCallbackOp.ComputeJobCancel, ComputeJobCancel); + AddHandler(UnmanagedCallbackOp.ComputeJobDestroy, ComputeJobDestroy); + AddHandler(UnmanagedCallbackOp.ContinuousQueryListenerApply, ContinuousQueryListenerApply); + AddHandler(UnmanagedCallbackOp.ContinuousQueryFilterCreate, ContinuousQueryFilterCreate); + AddHandler(UnmanagedCallbackOp.ContinuousQueryFilterApply, ContinuousQueryFilterApply); + AddHandler(UnmanagedCallbackOp.ContinuousQueryFilterRelease, ContinuousQueryFilterRelease); + AddHandler(UnmanagedCallbackOp.DataStreamerTopologyUpdate, DataStreamerTopologyUpdate); + AddHandler(UnmanagedCallbackOp.DataStreamerStreamReceiverInvoke, DataStreamerStreamReceiverInvoke); + AddHandler(UnmanagedCallbackOp.FutureByteResult, FutureByteResult); + AddHandler(UnmanagedCallbackOp.FutureBoolResult, FutureBoolResult); + AddHandler(UnmanagedCallbackOp.FutureShortResult, FutureShortResult); + AddHandler(UnmanagedCallbackOp.FutureCharResult, FutureCharResult); + AddHandler(UnmanagedCallbackOp.FutureIntResult, FutureIntResult); + AddHandler(UnmanagedCallbackOp.FutureFloatResult, FutureFloatResult); + AddHandler(UnmanagedCallbackOp.FutureLongResult, FutureLongResult); + AddHandler(UnmanagedCallbackOp.FutureDoubleResult, FutureDoubleResult); + AddHandler(UnmanagedCallbackOp.FutureObjectResult, FutureObjectResult); + AddHandler(UnmanagedCallbackOp.FutureNullResult, FutureNullResult); + AddHandler(UnmanagedCallbackOp.FutureError, FutureError); + AddHandler(UnmanagedCallbackOp.LifecycleOnEvent, LifecycleOnEvent, true); + AddHandler(UnmanagedCallbackOp.MemoryReallocate, MemoryReallocate, true); + AddHandler(UnmanagedCallbackOp.MessagingFilterCreate, MessagingFilterCreate); + AddHandler(UnmanagedCallbackOp.MessagingFilterApply, MessagingFilterApply); + AddHandler(UnmanagedCallbackOp.MessagingFilterDestroy, MessagingFilterDestroy); + AddHandler(UnmanagedCallbackOp.EventFilterCreate, EventFilterCreate); + AddHandler(UnmanagedCallbackOp.EventFilterApply, EventFilterApply); + AddHandler(UnmanagedCallbackOp.EventFilterDestroy, EventFilterDestroy); + AddHandler(UnmanagedCallbackOp.ServiceInit, ServiceInit); + AddHandler(UnmanagedCallbackOp.ServiceExecute, ServiceExecute); + AddHandler(UnmanagedCallbackOp.ServiceCancel, ServiceCancel); + AddHandler(UnmanagedCallbackOp.ServiceInvokeMethod, ServiceInvokeMethod); + AddHandler(UnmanagedCallbackOp.ClusterNodeFilterApply, ClusterNodeFilterApply); + AddHandler(UnmanagedCallbackOp.NodeInfo, NodeInfo); + AddHandler(UnmanagedCallbackOp.OnStart, OnStart, true); + AddHandler(UnmanagedCallbackOp.OnStop, OnStop, true); + AddHandler(UnmanagedCallbackOp.ExtensionInLongLongOutLong, ExtensionCallbackInLongLongOutLong, true); + AddHandler(UnmanagedCallbackOp.OnClientDisconnected, OnClientDisconnected); + AddHandler(UnmanagedCallbackOp.OnClientReconnected, OnClientReconnected); + AddHandler(UnmanagedCallbackOp.AffinityFunctionInit, AffinityFunctionInit); + AddHandler(UnmanagedCallbackOp.AffinityFunctionPartition, AffinityFunctionPartition); + AddHandler(UnmanagedCallbackOp.AffinityFunctionAssignPartitions, AffinityFunctionAssignPartitions); + AddHandler(UnmanagedCallbackOp.AffinityFunctionRemoveNode, AffinityFunctionRemoveNode); + AddHandler(UnmanagedCallbackOp.AffinityFunctionDestroy, AffinityFunctionDestroy); + AddHandler(UnmanagedCallbackOp.ComputeTaskLocalJobResult, ComputeTaskLocalJobResult); + AddHandler(UnmanagedCallbackOp.ComputeJobExecuteLocal, ComputeJobExecuteLocal); + } + + /// <summary> + /// Adds the handler. + /// </summary> + private void AddHandler(UnmanagedCallbackOp op, InLongOutLongFunc func, bool allowUninitialized = false) + { + _inLongOutLongHandlers[(int)op] = new InLongOutLongHandler(func, allowUninitialized); + } + + /// <summary> + /// Adds the handler. + /// </summary> + private void AddHandler(UnmanagedCallbackOp op, InLongLongLongObjectOutLongFunc func, + bool allowUninitialized = false) + { + _inLongLongLongObjectOutLongHandlers[(int)op] + = new InLongLongLongObjectOutLongHandler(func, allowUninitialized); + } + + #endregion + + #region IMPLEMENTATION: GENERAL PURPOSE + + [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] + private long InLongOutLong(void* target, int type, long val) + { + try + { + if (type < 0 || type > _inLongOutLongHandlers.Length) + throw GetInvalidOpError("InLongOutLong", type); + + var hnd = _inLongOutLongHandlers[type]; + + if (hnd.Handler == null) + throw GetInvalidOpError("InLongOutLong", type); - private long CacheStoreCreate(void* target, long memPtr) + if (!hnd.AllowUninitialized) + _initEvent.Wait(); + + return hnd.Handler(val); + } + catch (Exception e) + { + _log.Error(e, "Failure in Java callback"); + + UU.ThrowToJava(_ctx.NativeContext, e); + + return 0; + } + } + + [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] + private long InLongLongLongObjectOutLong(void* target, int type, long val1, long val2, long val3, void* arg) { - return SafeCall(() => + try { - var cacheStore = CacheStore.CreateInstance(memPtr, _handleRegistry); + if (type < 0 || type > _inLongLongLongObjectOutLongHandlers.Length) + throw GetInvalidOpError("InLongLongLongObjectOutLong", type); + + var hnd = _inLongLongLongObjectOutLongHandlers[type]; + + if (hnd.Handler == null) + throw GetInvalidOpError("InLongLongLongObjectOutLong", type); + + if (!hnd.AllowUninitialized) + _initEvent.Wait(); + + return hnd.Handler(val1, val2, val3, arg); + } + catch (Exception e) + { + _log.Error(e, "Failure in Java callback"); + + UU.ThrowToJava(_ctx.NativeContext, e); + + return 0; + } + } - if (_ignite != null) - cacheStore.Init(_ignite); - else + /// <summary> + /// Throws the invalid op error. + /// </summary> + private static Exception GetInvalidOpError(string method, int type) + { + return new InvalidOperationException( + string.Format("Invalid {0} callback code: {1}", method, (UnmanagedCallbackOp) type)); + } + + #endregion + + #region IMPLEMENTATION: CACHE + + private long CacheStoreCreate(long memPtr) + { + var cacheStore = CacheStore.CreateInstance(memPtr, _handleRegistry); + + if (_ignite != null) + cacheStore.Init(_ignite); + else + { + lock (_initActions) { - lock (_initActions) - { - if (_ignite != null) - cacheStore.Init(_ignite); - else - _initActions.Add(cacheStore.Init); - } + if (_ignite != null) + cacheStore.Init(_ignite); + else + _initActions.Add(cacheStore.Init); } + } - return cacheStore.Handle; - }, true); + return cacheStore.Handle; } [SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes")] [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")] - private int CacheStoreInvoke(void* target, long objPtr, long memPtr) + private long CacheStoreInvoke(long memPtr) { - return SafeCall(() => + using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - var t = _handleRegistry.Get<CacheStore>(objPtr, true); + try + { + var store = _handleRegistry.Get<CacheStore>(stream.ReadLong(), true); - using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) + return store.Invoke(stream, _ignite); + } + catch (Exception e) { - try - { - return t.Invoke(stream, _ignite); - } - catch (Exception e) - { - stream.Seek(0, SeekOrigin.Begin); + stream.Reset(); - _ignite.Marshaller.StartMarshal(stream).WriteObject(e); + _ignite.Marshaller.StartMarshal(stream).WriteObject(e); - return -1; - } + return -1; } - }); + } } - private void CacheStoreDestroy(void* target, long objPtr) + private long CacheStoreDestroy(long objPtr) { - SafeCall(() => _ignite.HandleRegistry.Release(objPtr)); + _ignite.HandleRegistry.Release(objPtr); + + return 0; } - private long CacheStoreSessionCreate(void* target, long storePtr) + private long CacheStoreSessionCreate(long val) { - return SafeCall(() => _ignite.HandleRegistry.Allocate(new CacheStoreSession())); + return _ignite.HandleRegistry.Allocate(new CacheStoreSession()); } - private long CacheEntryFilterCreate(void* target, long memPtr) + private long CacheEntryFilterCreate(long memPtr) { - return SafeCall(() => _handleRegistry.Allocate(CacheEntryFilterHolder.CreateInstance(memPtr, _ignite))); + return _handleRegistry.Allocate(CacheEntryFilterHolder.CreateInstance(memPtr, _ignite)); } - private int CacheEntryFilterApply(void* target, long objPtr, long memPtr) + private long CacheEntryFilterApply(long memPtr) { - return SafeCall(() => + using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - var t = _ignite.HandleRegistry.Get<CacheEntryFilterHolder>(objPtr); + var t = _ignite.HandleRegistry.Get<CacheEntryFilterHolder>(stream.ReadLong()); - using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - return t.Invoke(stream); - } - }); + return t.Invoke(stream); + } } - private void CacheEntryFilterDestroy(void* target, long objPtr) + private long CacheEntryFilterDestroy(long objPtr) { - SafeCall(() => _ignite.HandleRegistry.Release(objPtr)); + _ignite.HandleRegistry.Release(objPtr); + + return 0; } - private void CacheInvoke(void* target, long inMemPtr, long outMemPtr) + private long CacheInvoke(long memPtr) { - SafeCall(() => + using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (PlatformMemoryStream inStream = IgniteManager.Memory.Get(inMemPtr).GetStream()) - { - var result = ReadAndRunCacheEntryProcessor(inStream, _ignite); + var result = ReadAndRunCacheEntryProcessor(stream, _ignite); - using (PlatformMemoryStream outStream = IgniteManager.Memory.Get(outMemPtr).GetStream()) - { - result.Write(outStream, _ignite.Marshaller); + stream.Reset(); - outStream.SynchronizeOutput(); - } - } - }); + result.Write(stream, _ignite.Marshaller); + + stream.SynchronizeOutput(); + } + + return 0; } /// <summary> @@ -432,122 +444,110 @@ namespace Apache.Ignite.Core.Impl.Unmanaged #region IMPLEMENTATION: COMPUTE - private void ComputeTaskMap(void* target, long taskPtr, long inMemPtr, long outMemPtr) + private long ComputeTaskMap(long memPtr) { - SafeCall(() => + using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (PlatformMemoryStream inStream = IgniteManager.Memory.Get(inMemPtr).GetStream()) - { - using (PlatformMemoryStream outStream = IgniteManager.Memory.Get(outMemPtr).GetStream()) - { - Task(taskPtr).Map(inStream, outStream); - } - } - }); + Task(stream.ReadLong()).Map(stream); + + return 0; + } + } + + private long ComputeTaskLocalJobResult(long taskPtr, long jobPtr, long unused, void* arg) + { + return Task(taskPtr).JobResultLocal(Job(jobPtr)); } - private int ComputeTaskJobResult(void* target, long taskPtr, long jobPtr, long memPtr) + private long ComputeTaskJobResult(long memPtr) { - return SafeCall(() => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - var task = Task(taskPtr); + var task = Task(stream.ReadLong()); - if (memPtr == 0) - { - return task.JobResultLocal(Job(jobPtr)); - } + var job = Job(stream.ReadLong()); - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - return task.JobResultRemote(Job(jobPtr), stream); - } - }); + return task.JobResultRemote(job, stream); + } } - private void ComputeTaskReduce(void* target, long taskPtr) + private long ComputeTaskReduce(long taskPtr) { - SafeCall(() => - { - var task = _handleRegistry.Get<IComputeTaskHolder>(taskPtr, true); + _handleRegistry.Get<IComputeTaskHolder>(taskPtr, true).Reduce(); - task.Reduce(); - }); + return 0; } - private void ComputeTaskComplete(void* target, long taskPtr, long memPtr) + private long ComputeTaskComplete(long taskPtr, long memPtr, long unused, void* arg) { - SafeCall(() => - { - var task = _handleRegistry.Get<IComputeTaskHolder>(taskPtr, true); + var task = _handleRegistry.Get<IComputeTaskHolder>(taskPtr, true); - if (memPtr == 0) - task.Complete(taskPtr); - else + if (memPtr == 0) + task.Complete(taskPtr); + else + { + using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - task.CompleteWithError(taskPtr, stream); - } + task.CompleteWithError(taskPtr, stream); } - }); + } + + return 0; } - private int ComputeJobSerialize(void* target, long jobPtr, long memPtr) + private long ComputeJobSerialize(long jobPtr, long memPtr, long unused, void* arg) { - return SafeCall(() => + using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - return Job(jobPtr).Serialize(stream) ? 1 : 0; - } - }); + return Job(jobPtr).Serialize(stream) ? 1 : 0; + } } - private long ComputeJobCreate(void* target, long memPtr) + private long ComputeJobCreate(long memPtr) { - return SafeCall(() => + using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - ComputeJobHolder job = ComputeJobHolder.CreateJob(_ignite, stream); + ComputeJobHolder job = ComputeJobHolder.CreateJob(_ignite, stream); - return _handleRegistry.Allocate(job); - } - }); + return _handleRegistry.Allocate(job); + } } - private void ComputeJobExecute(void* target, long jobPtr, int cancel, long memPtr) + private long ComputeJobExecuteLocal(long jobPtr, long cancel, long unused, void* arg) { - SafeCall(() => - { - var job = Job(jobPtr); + Job(jobPtr).ExecuteLocal(cancel == 1); - if (memPtr == 0) - job.ExecuteLocal(cancel == 1); - else - { - using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - job.ExecuteRemote(stream, cancel == 1); - } - } - }); + return 0; } - private void ComputeJobCancel(void* target, long jobPtr) + private long ComputeJobExecute(long memPtr) { - SafeCall(() => + using (PlatformMemoryStream stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - Job(jobPtr).Cancel(); - }); + var job = Job(stream.ReadLong()); + + var cancel = stream.ReadBool(); + + stream.Reset(); + + job.ExecuteRemote(stream, cancel); + } + + return 0; } - private void ComputeJobDestroy(void* target, long jobPtr) + private long ComputeJobCancel(long jobPtr) { - SafeCall(() => - { - _handleRegistry.Release(jobPtr); - }); + Job(jobPtr).Cancel(); + + return 0; + } + + private long ComputeJobDestroy(long jobPtr) + { + _handleRegistry.Release(jobPtr); + + return 0; } /// <summary> @@ -574,219 +574,178 @@ namespace Apache.Ignite.Core.Impl.Unmanaged #region IMPLEMENTATION: CONTINUOUS QUERY - private void ContinuousQueryListenerApply(void* target, long lsnrPtr, long memPtr) + private long ContinuousQueryListenerApply(long memPtr) { - SafeCall(() => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - var hnd = _handleRegistry.Get<IContinuousQueryHandleImpl>(lsnrPtr); + var hnd = _handleRegistry.Get<IContinuousQueryHandleImpl>(stream.ReadLong()); - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - hnd.Apply(stream); - } - }); + hnd.Apply(stream); + + return 0; + } } [SuppressMessage("ReSharper", "PossibleNullReferenceException")] - private long ContinuousQueryFilterCreate(void* target, long memPtr) + private long ContinuousQueryFilterCreate(long memPtr) { - return SafeCall(() => + // 1. Unmarshal filter holder. + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - // 1. Unmarshal filter holder. - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - var reader = _ignite.Marshaller.StartUnmarshal(stream); + var reader = _ignite.Marshaller.StartUnmarshal(stream); - var filterHolder = reader.ReadObject<ContinuousQueryFilterHolder>(); + var filterHolder = reader.ReadObject<ContinuousQueryFilterHolder>(); - // 2. Create real filter from it's holder. - var filter = (IContinuousQueryFilter)DelegateTypeDescriptor.GetContinuousQueryFilterCtor( - filterHolder.Filter.GetType())(filterHolder.Filter, filterHolder.KeepBinary); + // 2. Create real filter from it's holder. + var filter = (IContinuousQueryFilter) DelegateTypeDescriptor.GetContinuousQueryFilterCtor( + filterHolder.Filter.GetType())(filterHolder.Filter, filterHolder.KeepBinary); - // 3. Inject grid. - filter.Inject(_ignite); + // 3. Inject grid. + filter.Inject(_ignite); - // 4. Allocate GC handle. - return filter.Allocate(); - } - }); + // 4. Allocate GC handle. + return filter.Allocate(); + } } - private int ContinuousQueryFilterApply(void* target, long filterPtr, long memPtr) + private long ContinuousQueryFilterApply(long memPtr) { - return SafeCall(() => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - var holder = _handleRegistry.Get<IContinuousQueryFilter>(filterPtr); + var holder = _handleRegistry.Get<IContinuousQueryFilter>(stream.ReadLong()); - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - return holder.Evaluate(stream) ? 1 : 0; - } - }); + return holder.Evaluate(stream) ? 1 : 0; + } } - private void ContinuousQueryFilterRelease(void* target, long filterPtr) + private long ContinuousQueryFilterRelease(long filterPtr) { - SafeCall(() => - { - var holder = _handleRegistry.Get<IContinuousQueryFilter>(filterPtr); + var holder = _handleRegistry.Get<IContinuousQueryFilter>(filterPtr); - holder.Release(); - }); + holder.Release(); + + return 0; } #endregion #region IMPLEMENTATION: DATA STREAMER - private void DataStreamerTopologyUpdate(void* target, long ldrPtr, long topVer, int topSize) + private long DataStreamerTopologyUpdate(long ldrPtr, long topVer, long topSize, void* unused) { - SafeCall(() => - { - var ldrRef = _handleRegistry.Get<WeakReference>(ldrPtr); + var ldrRef = _handleRegistry.Get<WeakReference>(ldrPtr); - if (ldrRef == null) - return; + if (ldrRef == null) + return 0; - var ldr = ldrRef.Target as IDataStreamer; + var ldr = ldrRef.Target as IDataStreamer; - if (ldr != null) - ldr.TopologyChange(topVer, topSize); - else - _handleRegistry.Release(ldrPtr, true); - }); + if (ldr != null) + ldr.TopologyChange(topVer, (int) topSize); + else + _handleRegistry.Release(ldrPtr, true); + + return 0; } [SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects before losing scope")] - private void DataStreamerStreamReceiverInvoke(void* target, long rcvPtr, void* cache, long memPtr, - byte keepBinary) + private long DataStreamerStreamReceiverInvoke(long memPtr, long unused, long unused1, void* cache) { - SafeCall(() => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - var reader = _ignite.Marshaller.StartUnmarshal(stream, BinaryMode.ForceBinary); + var rcvPtr = stream.ReadLong(); - var binaryReceiver = reader.ReadObject<BinaryObject>(); + var keepBinary = stream.ReadBool(); - var receiver = _handleRegistry.Get<StreamReceiverHolder>(rcvPtr) ?? - binaryReceiver.Deserialize<StreamReceiverHolder>(); + var reader = _ignite.Marshaller.StartUnmarshal(stream, BinaryMode.ForceBinary); - if (receiver != null) - receiver.Receive(_ignite, new UnmanagedNonReleaseableTarget(_ctx, cache), stream, - keepBinary != 0); - } - }); + var binaryReceiver = reader.ReadObject<BinaryObject>(); + + var receiver = _handleRegistry.Get<StreamReceiverHolder>(rcvPtr) ?? + binaryReceiver.Deserialize<StreamReceiverHolder>(); + + if (receiver != null) + receiver.Receive(_ignite, new UnmanagedNonReleaseableTarget(_ctx, cache), stream, keepBinary); + + return 0; + } } #endregion #region IMPLEMENTATION: FUTURES - private void FutureByteResult(void* target, long futPtr, int res) + private long FutureByteResult(long futPtr, long res, long unused, void* arg) { - SafeCall(() => - { - ProcessFuture<byte>(futPtr, fut => { fut.OnResult((byte)res); }); - }); + return ProcessFuture<byte>(futPtr, fut => { fut.OnResult((byte) res); }); } - private void FutureBoolResult(void* target, long futPtr, int res) + private long FutureBoolResult(long futPtr, long res, long unused, void* arg) { - SafeCall(() => - { - ProcessFuture<bool>(futPtr, fut => { fut.OnResult(res == 1); }); - }); + return ProcessFuture<bool>(futPtr, fut => { fut.OnResult(res == 1); }); } - private void FutureShortResult(void* target, long futPtr, int res) + private long FutureShortResult(long futPtr, long res, long unused, void* arg) { - SafeCall(() => - { - ProcessFuture<short>(futPtr, fut => { fut.OnResult((short)res); }); - }); + return ProcessFuture<short>(futPtr, fut => { fut.OnResult((short)res); }); } - private void FutureCharResult(void* target, long futPtr, int res) + private long FutureCharResult(long futPtr, long res, long unused, void* arg) { - SafeCall(() => - { - ProcessFuture<char>(futPtr, fut => { fut.OnResult((char)res); }); - }); + return ProcessFuture<char>(futPtr, fut => { fut.OnResult((char)res); }); } - private void FutureIntResult(void* target, long futPtr, int res) + private long FutureIntResult(long futPtr, long res, long unused, void* arg) { - SafeCall(() => - { - ProcessFuture<int>(futPtr, fut => { fut.OnResult(res); }); - }); + return ProcessFuture<int>(futPtr, fut => { fut.OnResult((int) res); }); } - private void FutureFloatResult(void* target, long futPtr, float res) + private long FutureFloatResult(long futPtr, long res, long unused, void* arg) { - SafeCall(() => - { - ProcessFuture<float>(futPtr, fut => { fut.OnResult(res); }); - }); + return ProcessFuture<float>(futPtr, fut => { fut.OnResult(BinaryUtils.IntToFloatBits((int) res)); }); } - private void FutureLongResult(void* target, long futPtr, long res) + private long FutureLongResult(long futPtr, long res, long unused, void* arg) { - SafeCall(() => - { - ProcessFuture<long>(futPtr, fut => { fut.OnResult(res); }); - }); + return ProcessFuture<long>(futPtr, fut => { fut.OnResult(res); }); } - private void FutureDoubleResult(void* target, long futPtr, double res) + private long FutureDoubleResult(long futPtr, long res, long unused, void* arg) { - SafeCall(() => - { - ProcessFuture<double>(futPtr, fut => { fut.OnResult(res); }); - }); + return ProcessFuture<double>(futPtr, fut => { fut.OnResult(BinaryUtils.LongToDoubleBits(res)); }); } - private void FutureObjectResult(void* target, long futPtr, long memPtr) + private long FutureObjectResult(long futPtr, long memPtr, long unused, void* arg) { - SafeCall(() => + return ProcessFuture(futPtr, fut => { - ProcessFuture(futPtr, fut => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - fut.OnResult(stream); - } - }); + fut.OnResult(stream); + } }); } - private void FutureNullResult(void* target, long futPtr) + private long FutureNullResult(long futPtr) { - SafeCall(() => - { - ProcessFuture(futPtr, fut => { fut.OnNullResult(); }); - }); + return ProcessFuture(futPtr, fut => { fut.OnNullResult(); }); } - private void FutureError(void* target, long futPtr, long memPtr) + private long FutureError(long futPtr, long memPtr, long unused, void* arg) { - SafeCall(() => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - var reader = _ignite.Marshaller.StartUnmarshal(stream); + var reader = _ignite.Marshaller.StartUnmarshal(stream); - string errCls = reader.ReadString(); - string errMsg = reader.ReadString(); - string stackTrace = reader.ReadString(); - Exception inner = reader.ReadBoolean() ? reader.ReadObject<Exception>() : null; + string errCls = reader.ReadString(); + string errMsg = reader.ReadString(); + string stackTrace = reader.ReadString(); + Exception inner = reader.ReadBoolean() ? reader.ReadObject<Exception>() : null; - Exception err = ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace, reader, inner); + Exception err = ExceptionUtils.GetException(_ignite, errCls, errMsg, stackTrace, reader, inner); - ProcessFuture(futPtr, fut => { fut.OnError(err); }); - } - }); + return ProcessFuture(futPtr, fut => { fut.OnError(err); }); + } } /// <summary> @@ -794,11 +753,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged /// </summary> /// <param name="futPtr">Future pointer.</param> /// <param name="action">Action.</param> - private void ProcessFuture(long futPtr, Action<IFutureInternal> action) + private long ProcessFuture(long futPtr, Action<IFutureInternal> action) { try { action(_handleRegistry.Get<IFutureInternal>(futPtr, true)); + + return 0; } finally { @@ -811,11 +772,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged /// </summary> /// <param name="futPtr">Future pointer.</param> /// <param name="action">Action.</param> - private void ProcessFuture<T>(long futPtr, Action<Future<T>> action) + private long ProcessFuture<T>(long futPtr, Action<Future<T>> action) { try { action(_handleRegistry.Get<Future<T>>(futPtr, true)); + + return 0; } finally { @@ -827,257 +790,230 @@ namespace Apache.Ignite.Core.Impl.Unmanaged #region IMPLEMENTATION: LIFECYCLE - private void LifecycleOnEvent(void* target, long ptr, int evt) + private long LifecycleOnEvent(long ptr, long evt, long unused, void* arg) { - SafeCall(() => - { - var bean = _handleRegistry.Get<LifecycleBeanHolder>(ptr); + var bean = _handleRegistry.Get<LifecycleBeanHolder>(ptr); - bean.OnLifecycleEvent((LifecycleEventType)evt); - }, true); + bean.OnLifecycleEvent((LifecycleEventType) evt); + + return 0; } #endregion #region IMPLEMENTATION: MESSAGING - private long MessagingFilterCreate(void* target, long memPtr) + private long MessagingFilterCreate(long memPtr) { - return SafeCall(() => - { - MessageListenerHolder holder = MessageListenerHolder.CreateRemote(_ignite, memPtr); + MessageListenerHolder holder = MessageListenerHolder.CreateRemote(_ignite, memPtr); - return _ignite.HandleRegistry.AllocateSafe(holder); - }); + return _ignite.HandleRegistry.AllocateSafe(holder); } - private int MessagingFilterApply(void* target, long ptr, long memPtr) + private long MessagingFilterApply(long ptr, long memPtr, long unused, void* arg) { - return SafeCall(() => - { - var holder = _ignite.HandleRegistry.Get<MessageListenerHolder>(ptr, false); + var holder = _ignite.HandleRegistry.Get<MessageListenerHolder>(ptr, false); - if (holder == null) - return 0; + if (holder == null) + return 0; - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - return holder.Invoke(stream); - } - }); + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) + { + return holder.Invoke(stream); + } } - private void MessagingFilterDestroy(void* target, long ptr) + private long MessagingFilterDestroy(long ptr) { - SafeCall(() => - { - _ignite.HandleRegistry.Release(ptr); - }); + _ignite.HandleRegistry.Release(ptr); + + return 0; } #endregion #region IMPLEMENTATION: EXTENSIONS - private long ExtensionCallbackInLongOutLong(void* target, int op, long arg1) + private long ExtensionCallbackInLongLongOutLong(long op, long arg1, long arg2, void* arg) { - throw new InvalidOperationException("Unsupported operation type: " + op); - } - - private long ExtensionCallbackInLongLongOutLong(void* target, int op, long arg1, long arg2) - { - return SafeCall(() => + switch (op) { - switch (op) - { - case OpPrepareDotNet: - using (var inStream = IgniteManager.Memory.Get(arg1).GetStream()) - using (var outStream = IgniteManager.Memory.Get(arg2).GetStream()) - { - Ignition.OnPrepare(inStream, outStream, _handleRegistry, _log); + case OpPrepareDotNet: + using (var inStream = IgniteManager.Memory.Get(arg1).GetStream()) + using (var outStream = IgniteManager.Memory.Get(arg2).GetStream()) + { + Ignition.OnPrepare(inStream, outStream, _handleRegistry, _log); - return 0; - } + return 0; + } - default: - throw new InvalidOperationException("Unsupported operation type: " + op); - } - }, op == OpPrepareDotNet); + default: + throw new InvalidOperationException("Unsupported operation type: " + op); + } } #endregion #region IMPLEMENTATION: EVENTS - private long EventFilterCreate(void* target, long memPtr) + private long EventFilterCreate(long memPtr) { - return SafeCall(() => _handleRegistry.Allocate(RemoteListenEventFilter.CreateInstance(memPtr, _ignite))); + return _handleRegistry.Allocate(RemoteListenEventFilter.CreateInstance(memPtr, _ignite)); } - private int EventFilterApply(void* target, long ptr, long memPtr) + private long EventFilterApply(long ptr, long memPtr, long unused, void* arg) { - return SafeCall(() => - { - var holder = _ignite.HandleRegistry.Get<IInteropCallback>(ptr, false); + var holder = _ignite.HandleRegistry.Get<IInteropCallback>(ptr, false); - if (holder == null) - return 0; + if (holder == null) + return 0; - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - return holder.Invoke(stream); - } - }); + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) + { + return holder.Invoke(stream); + } } - private void EventFilterDestroy(void* target, long ptr) + private long EventFilterDestroy(long ptr) { - SafeCall(() => - { - _ignite.HandleRegistry.Release(ptr); - }); + _ignite.HandleRegistry.Release(ptr); + + return 0; } #endregion #region IMPLEMENTATION: SERVICES - private long ServiceInit(void* target, long memPtr) + private long ServiceInit(long memPtr) { - return SafeCall(() => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - var reader = _ignite.Marshaller.StartUnmarshal(stream); + var reader = _ignite.Marshaller.StartUnmarshal(stream); - bool srvKeepBinary = reader.ReadBoolean(); - var svc = reader.ReadObject<IService>(); + bool srvKeepBinary = reader.ReadBoolean(); + var svc = reader.ReadObject<IService>(); - ResourceProcessor.Inject(svc, _ignite); + ResourceProcessor.Inject(svc, _ignite); - svc.Init(new ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary))); + svc.Init(new ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary))); - return _handleRegistry.Allocate(svc); - } - }); + return _handleRegistry.Allocate(svc); + } } - private void ServiceExecute(void* target, long svcPtr, long memPtr) + private long ServiceExecute(long memPtr) { - SafeCall(() => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - var svc = _handleRegistry.Get<IService>(svcPtr); + var svc = _handleRegistry.Get<IService>(stream.ReadLong()); // Ignite does not guarantee that Cancel is called after Execute exits // So missing handle is a valid situation if (svc == null) - return; + return 0; - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - var reader = _ignite.Marshaller.StartUnmarshal(stream); + var reader = _ignite.Marshaller.StartUnmarshal(stream); - bool srvKeepBinary = reader.ReadBoolean(); + bool srvKeepBinary = reader.ReadBoolean(); - svc.Execute(new ServiceContext( - _ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary))); - } - }); + svc.Execute(new ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary))); + + return 0; + } } - private void ServiceCancel(void* target, long svcPtr, long memPtr) + private long ServiceCancel(long memPtr) { - SafeCall(() => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - var svc = _handleRegistry.Get<IService>(svcPtr, true); + long svcPtr = stream.ReadLong(); try { - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - var reader = _ignite.Marshaller.StartUnmarshal(stream); + var svc = _handleRegistry.Get<IService>(svcPtr, true); - bool srvKeepBinary = reader.ReadBoolean(); + var reader = _ignite.Marshaller.StartUnmarshal(stream); - svc.Cancel(new ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary))); - } + bool srvKeepBinary = reader.ReadBoolean(); + + svc.Cancel(new ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary))); + + return 0; } finally { _ignite.HandleRegistry.Release(svcPtr); } - }); + } } - private void ServiceInvokeMethod(void* target, long svcPtr, long inMemPtr, long outMemPtr) + private long ServiceInvokeMethod(long memPtr) { - SafeCall(() => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (var inStream = IgniteManager.Memory.Get(inMemPtr).GetStream()) - using (var outStream = IgniteManager.Memory.Get(outMemPtr).GetStream()) - { - var svc = _handleRegistry.Get<IService>(svcPtr, true); + var svc = _handleRegistry.Get<IService>(stream.ReadLong(), true); - string mthdName; - object[] mthdArgs; + string mthdName; + object[] mthdArgs; - ServiceProxySerializer.ReadProxyMethod(inStream, _ignite.Marshaller, out mthdName, out mthdArgs); + ServiceProxySerializer.ReadProxyMethod(stream, _ignite.Marshaller, out mthdName, out mthdArgs); - var result = ServiceProxyInvoker.InvokeServiceMethod(svc, mthdName, mthdArgs); + var result = ServiceProxyInvoker.InvokeServiceMethod(svc, mthdName, mthdArgs); - ServiceProxySerializer.WriteInvocationResult(outStream, _ignite.Marshaller, result.Key, result.Value); + stream.Reset(); - outStream.SynchronizeOutput(); - } - }); + ServiceProxySerializer.WriteInvocationResult(stream, _ignite.Marshaller, result.Key, result.Value); + + stream.SynchronizeOutput(); + + return 0; + } } - private int ClusterNodeFilterApply(void* target, long memPtr) + private long ClusterNodeFilterApply(long memPtr) { - return SafeCall(() => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - var reader = _ignite.Marshaller.StartUnmarshal(stream); + var reader = _ignite.Marshaller.StartUnmarshal(stream); - var filter = reader.ReadObject<IClusterNodeFilter>(); + var filter = reader.ReadObject<IClusterNodeFilter>(); - return filter.Invoke(_ignite.GetNode(reader.ReadGuid())) ? 1 : 0; - } - }); + return filter.Invoke(_ignite.GetNode(reader.ReadGuid())) ? 1 : 0; + } } #endregion #region IMPLEMENTATION: MISCELLANEOUS - private void NodeInfo(void* target, long memPtr) + private long NodeInfo(long memPtr) { - SafeCall(() => _ignite.UpdateNodeInfo(memPtr)); + _ignite.UpdateNodeInfo(memPtr); + + return 0; } - private void MemoryReallocate(void* target, long memPtr, int cap) + private long MemoryReallocate(long memPtr, long cap, long unused, void* arg) { - SafeCall(() => - { - IgniteManager.Memory.Get(memPtr).Reallocate(cap); - }, true); + IgniteManager.Memory.Get(memPtr).Reallocate((int)cap); + + return 0; } - private void OnStart(void* target, void* proc, long memPtr) + private long OnStart(long memPtr, long unused, long unused1, void* proc) { - SafeCall(() => + var proc0 = UU.Acquire(_ctx, proc); + + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - var proc0 = UnmanagedUtils.Acquire(_ctx, proc); + Ignition.OnStart(proc0, stream); + } - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - Ignition.OnStart(proc0, stream); - } - }, true); + return 0; } - private void OnStop(void* target) + private long OnStop(long unused) { Marshal.FreeHGlobal(_cbsPtr); @@ -1092,6 +1028,8 @@ namespace Apache.Ignite.Core.Impl.Unmanaged if (ignite != null) ignite.AfterNodeStop(); + + return 0; } private void Error(void* target, int errType, sbyte* errClsChars, int errClsCharsLen, sbyte* errMsgChars, @@ -1122,20 +1060,18 @@ namespace Apache.Ignite.Core.Impl.Unmanaged } } - private void OnClientDisconnected(void* target) + private long OnClientDisconnected(long unused) { - SafeCall(() => - { - _ignite.OnClientDisconnected(); - }); + _ignite.OnClientDisconnected(); + + return 0; } - private void OnClientReconnected(void* target, bool clusterRestarted) + private long OnClientReconnected(long clusterRestarted) { - SafeCall(() => - { - _ignite.OnClientReconnected(clusterRestarted); - }); + _ignite.OnClientReconnected(clusterRestarted != 0); + + return 0; } private void LoggerLog(void* target, int level, sbyte* messageChars, int messageCharsLen, sbyte* categoryChars, @@ -1194,82 +1130,79 @@ namespace Apache.Ignite.Core.Impl.Unmanaged #region AffinityFunction - private long AffinityFunctionInit(void* target, long memPtr, void* baseFunc) + private long AffinityFunctionInit(long memPtr, long unused, long unused1, void* baseFunc) { - return SafeCall(() => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - var reader = _ignite.Marshaller.StartUnmarshal(stream); + var reader = _ignite.Marshaller.StartUnmarshal(stream); - var func = reader.ReadObjectEx<IAffinityFunction>(); + var func = reader.ReadObjectEx<IAffinityFunction>(); - ResourceProcessor.Inject(func, _ignite); + ResourceProcessor.Inject(func, _ignite); - var affBase = func as AffinityFunctionBase; + var affBase = func as AffinityFunctionBase; - if (affBase != null) - affBase.SetBaseFunction(new PlatformAffinityFunction( - _ignite.InteropProcessor.ChangeTarget(baseFunc), _ignite.Marshaller)); + if (affBase != null) + { + var baseFunc0 = UU.Acquire(_ctx, baseFunc); - return _handleRegistry.Allocate(func); + affBase.SetBaseFunction(new PlatformAffinityFunction(baseFunc0, _ignite.Marshaller)); } - }); + + return _handleRegistry.Allocate(func); + } } - private int AffinityFunctionPartition(void* target, long ptr, long memPtr) + private long AffinityFunctionPartition(long memPtr) { - return SafeCall(() => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - var key = _ignite.Marshaller.Unmarshal<object>(stream); + var ptr = stream.ReadLong(); - return _handleRegistry.Get<IAffinityFunction>(ptr, true).GetPartition(key); - } - }); + var key = _ignite.Marshaller.Unmarshal<object>(stream); + + return _handleRegistry.Get<IAffinityFunction>(ptr, true).GetPartition(key); + } } - private void AffinityFunctionAssignPartitions(void* target, long ptr, long inMemPtr, long outMemPtr) + private long AffinityFunctionAssignPartitions(long memPtr) { - SafeCall(() => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (var inStream = IgniteManager.Memory.Get(inMemPtr).GetStream()) - { - var ctx = new AffinityFunctionContext(_ignite.Marshaller.StartUnmarshal(inStream)); - var func = _handleRegistry.Get<IAffinityFunction>(ptr, true); - var parts = func.AssignPartitions(ctx); + var ptr = stream.ReadLong(); + var ctx = new AffinityFunctionContext(_ignite.Marshaller.StartUnmarshal(stream)); + var func = _handleRegistry.Get<IAffinityFunction>(ptr, true); + var parts = func.AssignPartitions(ctx); - if (parts == null) - throw new IgniteException(func.GetType() + ".AssignPartitions() returned invalid result: null"); + if (parts == null) + throw new IgniteException(func.GetType() + ".AssignPartitions() returned invalid result: null"); - using (var outStream = IgniteManager.Memory.Get(outMemPtr).GetStream()) - { - AffinityFunctionSerializer.WritePartitions(parts, outStream, _ignite.Marshaller); - } - } - }); + stream.Reset(); + + AffinityFunctionSerializer.WritePartitions(parts, stream, _ignite.Marshaller); + + return 0; + } } - private void AffinityFunctionRemoveNode(void* target, long ptr, long memPtr) + private long AffinityFunctionRemoveNode(long memPtr) { - SafeCall(() => + using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) { - using (var stream = IgniteManager.Memory.Get(memPtr).GetStream()) - { - var nodeId = _ignite.Marshaller.Unmarshal<Guid>(stream); + var ptr = stream.ReadLong(); + var nodeId = _ignite.Marshaller.Unmarshal<Guid>(stream); - _handleRegistry.Get<IAffinityFunction>(ptr, true).RemoveNode(nodeId); - } - }); + _handleRegistry.Get<IAffinityFunction>(ptr, true).RemoveNode(nodeId); + + return 0; + } } - private void AffinityFunctionDestroy(void* target, long ptr) + private long AffinityFunctionDestroy(long ptr) { - SafeCall(() => - { - _handleRegistry.Release(ptr); - }); + _handleRegistry.Release(ptr); + + return 0; } #endregion @@ -1315,7 +1248,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged } #endregion - + /// <summary> /// Callbacks pointer. /// </summary> @@ -1398,5 +1331,47 @@ namespace Apache.Ignite.Core.Impl.Unmanaged { get { return ConsoleWritePtr; } } + + /// <summary> + /// InLongOutLong handler struct. + /// </summary> + private struct InLongOutLongHandler + { + /// <summary> The handler func. </summary> + public readonly InLongOutLongFunc Handler; + + /// <summary> Allow uninitialized flag. </summary> + public readonly bool AllowUninitialized; + + /// <summary> + /// Initializes a new instance of the <see cref="InLongOutLongHandler"/> struct. + /// </summary> + public InLongOutLongHandler(InLongOutLongFunc handler, bool allowUninitialized) + { + Handler = handler; + AllowUninitialized = allowUninitialized; + } + } + + /// <summary> + /// InLongLongLongObjectOutLong handler struct. + /// </summary> + private struct InLongLongLongObjectOutLongHandler + { + /// <summary> The handler func. </summary> + public readonly InLongLongLongObjectOutLongFunc Handler; + + /// <summary> Allow uninitialized flag. </summary> + public readonly bool AllowUninitialized; + + /// <summary> + /// Initializes a new instance of the <see cref="InLongLongLongObjectOutLongHandler"/> struct. + /// </summary> + public InLongLongLongObjectOutLongHandler(InLongLongLongObjectOutLongFunc handler, bool allowUninitialized) + { + Handler = handler; + AllowUninitialized = allowUninitialized; + } + } } } \ No newline at end of file
