http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
----------------------------------------------------------------------
diff --git
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
index cc205e8..09933be 100644
---
a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
+++
b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedCallbacks.cs
@@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
- using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using Apache.Ignite.Core.Cache.Affinity;
@@ -77,7 +76,14 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
/** Keep references to created delegates. */
// ReSharper disable once CollectionNeverQueried.Local
- private readonly List<Delegate> _delegates = new List<Delegate>(50);
+ private readonly List<Delegate> _delegates = new List<Delegate>(5);
+
+ /** Handlers array. */
+ private readonly InLongOutLongHandler[] _inLongOutLongHandlers = new
InLongOutLongHandler[62];
+
+ /** Handlers array. */
+ private readonly InLongLongLongObjectOutLongHandler[]
_inLongLongLongObjectOutLongHandlers
+ = new InLongLongLongObjectOutLongHandler[62];
/** Initialized flag. */
private readonly ManualResetEventSlim _initEvent = new
ManualResetEventSlim(false);
@@ -107,90 +113,19 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
/** Operation: prepare .Net. */
private const int OpPrepareDotNet = 1;
- private delegate long CacheStoreCreateCallbackDelegate(void* target,
long memPtr);
- private delegate int CacheStoreInvokeCallbackDelegate(void* target,
long objPtr, long memPtr);
- private delegate void CacheStoreDestroyCallbackDelegate(void* target,
long objPtr);
- private delegate long CacheStoreSessionCreateCallbackDelegate(void*
target, long storePtr);
-
- private delegate long CacheEntryFilterCreateCallbackDelegate(void*
target, long memPtr);
- private delegate int CacheEntryFilterApplyCallbackDelegate(void*
target, long objPtr, long memPtr);
- private delegate void CacheEntryFilterDestroyCallbackDelegate(void*
target, long objPtr);
-
- private delegate void CacheInvokeCallbackDelegate(void* target, long
inMemPtr, long outMemPtr);
-
- private delegate void ComputeTaskMapCallbackDelegate(void* target,
long taskPtr, long inMemPtr, long outMemPtr);
- private delegate int ComputeTaskJobResultCallbackDelegate(void*
target, long taskPtr, long jobPtr, long memPtr);
- private delegate void ComputeTaskReduceCallbackDelegate(void* target,
long taskPtr);
- private delegate void ComputeTaskCompleteCallbackDelegate(void*
target, long taskPtr, long memPtr);
- private delegate int ComputeJobSerializeCallbackDelegate(void* target,
long jobPtr, long memPtr);
- private delegate long ComputeJobCreateCallbackDelegate(void* target,
long memPtr);
- private delegate void ComputeJobExecuteCallbackDelegate(void* target,
long jobPtr, int cancel, long memPtr);
- private delegate void ComputeJobCancelCallbackDelegate(void* target,
long jobPtr);
- private delegate void ComputeJobDestroyCallbackDelegate(void* target,
long jobPtr);
-
- private delegate void
ContinuousQueryListenerApplyCallbackDelegate(void* target, long lsnrPtr, long
memPtr);
- private delegate long
ContinuousQueryFilterCreateCallbackDelegate(void* target, long memPtr);
- private delegate int ContinuousQueryFilterApplyCallbackDelegate(void*
target, long filterPtr, long memPtr);
- private delegate void
ContinuousQueryFilterReleaseCallbackDelegate(void* target, long filterPtr);
-
- private delegate void DataStreamerTopologyUpdateCallbackDelegate(void*
target, long ldrPtr, long topVer, int topSize);
- private delegate void
DataStreamerStreamReceiverInvokeCallbackDelegate(void* target, long ptr, void*
cache, long memPtr, byte keepPortable);
-
- private delegate void FutureByteResultCallbackDelegate(void* target,
long futPtr, int res);
- private delegate void FutureBoolResultCallbackDelegate(void* target,
long futPtr, int res);
- private delegate void FutureShortResultCallbackDelegate(void* target,
long futPtr, int res);
- private delegate void FutureCharResultCallbackDelegate(void* target,
long futPtr, int res);
- private delegate void FutureIntResultCallbackDelegate(void* target,
long futPtr, int res);
- private delegate void FutureFloatResultCallbackDelegate(void* target,
long futPtr, float res);
- private delegate void FutureLongResultCallbackDelegate(void* target,
long futPtr, long res);
- private delegate void FutureDoubleResultCallbackDelegate(void* target,
long futPtr, double res);
- private delegate void FutureObjectResultCallbackDelegate(void* target,
long futPtr, long memPtr);
- private delegate void FutureNullResultCallbackDelegate(void* target,
long futPtr);
- private delegate void FutureErrorCallbackDelegate(void* target, long
futPtr, long memPtr);
-
- private delegate void LifecycleOnEventCallbackDelegate(void* target,
long ptr, int evt);
-
- private delegate void MemoryReallocateCallbackDelegate(void* target,
long memPtr, int cap);
-
- private delegate long MessagingFilterCreateCallbackDelegate(void*
target, long memPtr);
- private delegate int MessagingFilterApplyCallbackDelegate(void*
target, long ptr, long memPtr);
- private delegate void MessagingFilterDestroyCallbackDelegate(void*
target, long ptr);
-
- private delegate long EventFilterCreateCallbackDelegate(void* target,
long memPtr);
- private delegate int EventFilterApplyCallbackDelegate(void* target,
long ptr, long memPtr);
- private delegate void EventFilterDestroyCallbackDelegate(void* target,
long ptr);
-
- private delegate long ServiceInitCallbackDelegate(void* target, long
memPtr);
- private delegate void ServiceExecuteCallbackDelegate(void* target,
long svcPtr, long memPtr);
- private delegate void ServiceCancelCallbackDelegate(void* target, long
svcPtr, long memPtr);
- private delegate void ServiceInvokeMethodCallbackDelegate(void*
target, long svcPtr, long inMemPtr, long outMemPtr);
-
- private delegate int ClusterNodeFilterApplyCallbackDelegate(void*
target, long memPtr);
-
- private delegate void NodeInfoCallbackDelegate(void* target, long
memPtr);
-
- private delegate void OnStartCallbackDelegate(void* target, void*
proc, long memPtr);
- private delegate void OnStopCallbackDelegate(void* target);
-
private delegate void ErrorCallbackDelegate(void* target, int errType,
sbyte* errClsChars, int errClsCharsLen, sbyte* errMsgChars, int errMsgCharsLen,
sbyte* stackTraceChars, int stackTraceCharsLen, void* errData, int errDataLen);
- private delegate long ExtensionCallbackInLongOutLongDelegate(void*
target, int typ, long arg1);
- private delegate long ExtensionCallbackInLongLongOutLongDelegate(void*
target, int typ, long arg1, long arg2);
-
- private delegate void OnClientDisconnectedDelegate(void* target);
- private delegate void OnClientReconnectedDelegate(void* target, bool
clusterRestarted);
-
private delegate void LoggerLogDelegate(void* target, int level,
sbyte* messageChars, int messageCharsLen, sbyte* categoryChars, int
categoryCharsLen, sbyte* errorInfoChars, int errorInfoCharsLen, long memPtr);
private delegate bool LoggerIsLevelEnabledDelegate(void* target, int
level);
- private delegate long AffinityFunctionInitDelegate(void* target, long
memPtr, void* baseFunc);
- private delegate int AffinityFunctionPartitionDelegate(void* target,
long ptr, long memPtr);
- private delegate void AffinityFunctionAssignPartitionsDelegate(void*
target, long ptr, long inMemPtr, long outMemPtr);
- private delegate void AffinityFunctionRemoveNodeDelegate(void* target,
long ptr, long memPtr);
- private delegate void AffinityFunctionDestroyDelegate(void* target,
long ptr);
-
private delegate void ConsoleWriteDelegate(sbyte* chars, int charsLen,
bool isErr);
+ private delegate long InLongOutLongDelegate(void* target, int type,
long val);
+ private delegate long InLongLongLongObjectOutLongDelegate(void*
target, int type, long val1, long val2, long val3, void* arg);
+
+ private delegate long InLongOutLongFunc(long val);
+ private delegate long InLongLongLongObjectOutLongFunc(long val1, long
val2, long val3, void* arg);
+
/// <summary>
/// Constructor.
/// </summary>
@@ -204,89 +139,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
{
target = IntPtr.Zero.ToPointer(), // Target is not used in
.Net as we rely on dynamic FP creation.
- cacheStoreCreate =
CreateFunctionPointer((CacheStoreCreateCallbackDelegate) CacheStoreCreate),
- cacheStoreInvoke =
CreateFunctionPointer((CacheStoreInvokeCallbackDelegate) CacheStoreInvoke),
- cacheStoreDestroy =
CreateFunctionPointer((CacheStoreDestroyCallbackDelegate) CacheStoreDestroy),
-
- cacheStoreSessionCreate =
CreateFunctionPointer((CacheStoreSessionCreateCallbackDelegate)
CacheStoreSessionCreate),
-
- cacheEntryFilterCreate =
CreateFunctionPointer((CacheEntryFilterCreateCallbackDelegate)CacheEntryFilterCreate),
- cacheEntryFilterApply =
CreateFunctionPointer((CacheEntryFilterApplyCallbackDelegate)CacheEntryFilterApply),
- cacheEntryFilterDestroy =
CreateFunctionPointer((CacheEntryFilterDestroyCallbackDelegate)CacheEntryFilterDestroy),
-
- cacheInvoke =
CreateFunctionPointer((CacheInvokeCallbackDelegate) CacheInvoke),
-
- computeTaskMap =
CreateFunctionPointer((ComputeTaskMapCallbackDelegate) ComputeTaskMap),
- computeTaskJobResult =
-
CreateFunctionPointer((ComputeTaskJobResultCallbackDelegate)
ComputeTaskJobResult),
- computeTaskReduce =
CreateFunctionPointer((ComputeTaskReduceCallbackDelegate) ComputeTaskReduce),
- computeTaskComplete =
CreateFunctionPointer((ComputeTaskCompleteCallbackDelegate)
ComputeTaskComplete),
- computeJobSerialize =
CreateFunctionPointer((ComputeJobSerializeCallbackDelegate)
ComputeJobSerialize),
- computeJobCreate =
CreateFunctionPointer((ComputeJobCreateCallbackDelegate) ComputeJobCreate),
- computeJobExecute =
CreateFunctionPointer((ComputeJobExecuteCallbackDelegate) ComputeJobExecute),
- computeJobCancel =
CreateFunctionPointer((ComputeJobCancelCallbackDelegate) ComputeJobCancel),
- computeJobDestroy =
CreateFunctionPointer((ComputeJobDestroyCallbackDelegate) ComputeJobDestroy),
- continuousQueryListenerApply =
-
CreateFunctionPointer((ContinuousQueryListenerApplyCallbackDelegate)
ContinuousQueryListenerApply),
- continuousQueryFilterCreate =
-
CreateFunctionPointer((ContinuousQueryFilterCreateCallbackDelegate)
ContinuousQueryFilterCreate),
- continuousQueryFilterApply =
-
CreateFunctionPointer((ContinuousQueryFilterApplyCallbackDelegate)
ContinuousQueryFilterApply),
- continuousQueryFilterRelease =
-
CreateFunctionPointer((ContinuousQueryFilterReleaseCallbackDelegate)
ContinuousQueryFilterRelease),
- dataStreamerTopologyUpdate =
-
CreateFunctionPointer((DataStreamerTopologyUpdateCallbackDelegate)
DataStreamerTopologyUpdate),
- dataStreamerStreamReceiverInvoke =
-
CreateFunctionPointer((DataStreamerStreamReceiverInvokeCallbackDelegate)
DataStreamerStreamReceiverInvoke),
-
- futureByteResult =
CreateFunctionPointer((FutureByteResultCallbackDelegate) FutureByteResult),
- futureBoolResult =
CreateFunctionPointer((FutureBoolResultCallbackDelegate) FutureBoolResult),
- futureShortResult =
CreateFunctionPointer((FutureShortResultCallbackDelegate) FutureShortResult),
- futureCharResult =
CreateFunctionPointer((FutureCharResultCallbackDelegate) FutureCharResult),
- futureIntResult =
CreateFunctionPointer((FutureIntResultCallbackDelegate) FutureIntResult),
- futureFloatResult =
CreateFunctionPointer((FutureFloatResultCallbackDelegate) FutureFloatResult),
- futureLongResult =
CreateFunctionPointer((FutureLongResultCallbackDelegate) FutureLongResult),
- futureDoubleResult =
CreateFunctionPointer((FutureDoubleResultCallbackDelegate) FutureDoubleResult),
- futureObjectResult =
CreateFunctionPointer((FutureObjectResultCallbackDelegate) FutureObjectResult),
- futureNullResult =
CreateFunctionPointer((FutureNullResultCallbackDelegate) FutureNullResult),
- futureError =
CreateFunctionPointer((FutureErrorCallbackDelegate) FutureError),
- lifecycleOnEvent =
CreateFunctionPointer((LifecycleOnEventCallbackDelegate) LifecycleOnEvent),
- memoryReallocate =
CreateFunctionPointer((MemoryReallocateCallbackDelegate) MemoryReallocate),
- nodeInfo = CreateFunctionPointer((NodeInfoCallbackDelegate)
NodeInfo),
-
- messagingFilterCreate =
CreateFunctionPointer((MessagingFilterCreateCallbackDelegate)MessagingFilterCreate),
- messagingFilterApply =
CreateFunctionPointer((MessagingFilterApplyCallbackDelegate)MessagingFilterApply),
- messagingFilterDestroy =
CreateFunctionPointer((MessagingFilterDestroyCallbackDelegate)MessagingFilterDestroy),
-
- eventFilterCreate =
CreateFunctionPointer((EventFilterCreateCallbackDelegate)EventFilterCreate),
- eventFilterApply =
CreateFunctionPointer((EventFilterApplyCallbackDelegate)EventFilterApply),
- eventFilterDestroy =
CreateFunctionPointer((EventFilterDestroyCallbackDelegate)EventFilterDestroy),
-
- serviceInit =
CreateFunctionPointer((ServiceInitCallbackDelegate)ServiceInit),
- serviceExecute =
CreateFunctionPointer((ServiceExecuteCallbackDelegate)ServiceExecute),
- serviceCancel =
CreateFunctionPointer((ServiceCancelCallbackDelegate)ServiceCancel),
- serviceInvokeMethod =
CreateFunctionPointer((ServiceInvokeMethodCallbackDelegate)ServiceInvokeMethod),
-
- clusterNodeFilterApply =
CreateFunctionPointer((ClusterNodeFilterApplyCallbackDelegate)ClusterNodeFilterApply),
-
- onStart =
CreateFunctionPointer((OnStartCallbackDelegate)OnStart),
- onStop = CreateFunctionPointer((OnStopCallbackDelegate)OnStop),
error = CreateFunctionPointer((ErrorCallbackDelegate)Error),
- extensionCbInLongOutLong =
CreateFunctionPointer((ExtensionCallbackInLongOutLongDelegate)ExtensionCallbackInLongOutLong),
- extensionCbInLongLongOutLong =
CreateFunctionPointer((ExtensionCallbackInLongLongOutLongDelegate)ExtensionCallbackInLongLongOutLong),
-
- onClientDisconnected =
CreateFunctionPointer((OnClientDisconnectedDelegate)OnClientDisconnected),
- ocClientReconnected =
CreateFunctionPointer((OnClientReconnectedDelegate)OnClientReconnected),
-
- affinityFunctionInit =
CreateFunctionPointer((AffinityFunctionInitDelegate)AffinityFunctionInit),
- affinityFunctionPartition =
CreateFunctionPointer((AffinityFunctionPartitionDelegate)AffinityFunctionPartition),
- affinityFunctionAssignPartitions =
CreateFunctionPointer((AffinityFunctionAssignPartitionsDelegate)AffinityFunctionAssignPartitions),
- affinityFunctionRemoveNode =
CreateFunctionPointer((AffinityFunctionRemoveNodeDelegate)AffinityFunctionRemoveNode),
- affinityFunctionDestroy =
CreateFunctionPointer((AffinityFunctionDestroyDelegate)AffinityFunctionDestroy),
-
loggerLog =
CreateFunctionPointer((LoggerLogDelegate)LoggerLog),
- loggerIsLevelEnabled =
CreateFunctionPointer((LoggerIsLevelEnabledDelegate)LoggerIsLevelEnabled)
+ loggerIsLevelEnabled =
CreateFunctionPointer((LoggerIsLevelEnabledDelegate)LoggerIsLevelEnabled),
+
+ inLongOutLong =
CreateFunctionPointer((InLongOutLongDelegate)InLongOutLong),
+ inLongLongObjectOutLong =
CreateFunctionPointer((InLongLongLongObjectOutLongDelegate)InLongLongLongObjectOutLong)
};
_cbsPtr = Marshal.AllocHGlobal(UU.HandlersSize());
@@ -294,6 +153,8 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
Marshal.StructureToPtr(cbs, _cbsPtr, false);
_thisHnd = GCHandle.Alloc(this);
+
+ InitHandlers();
}
/// <summary>
@@ -304,106 +165,257 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
get { return _handleRegistry; }
}
- #region IMPLEMENTATION: CACHE
+ #region HANDLERS
+
+ /// <summary>
+ /// Initializes the handlers.
+ /// </summary>
+ private void InitHandlers()
+ {
+ AddHandler(UnmanagedCallbackOp.CacheStoreCreate, CacheStoreCreate,
true);
+ AddHandler(UnmanagedCallbackOp.CacheStoreInvoke, CacheStoreInvoke);
+ AddHandler(UnmanagedCallbackOp.CacheStoreDestroy,
CacheStoreDestroy);
+ AddHandler(UnmanagedCallbackOp.CacheStoreSessionCreate,
CacheStoreSessionCreate);
+ AddHandler(UnmanagedCallbackOp.CacheEntryFilterCreate,
CacheEntryFilterCreate);
+ AddHandler(UnmanagedCallbackOp.CacheEntryFilterApply,
CacheEntryFilterApply);
+ AddHandler(UnmanagedCallbackOp.CacheEntryFilterDestroy,
CacheEntryFilterDestroy);
+ AddHandler(UnmanagedCallbackOp.CacheInvoke, CacheInvoke);
+ AddHandler(UnmanagedCallbackOp.ComputeTaskMap, ComputeTaskMap);
+ AddHandler(UnmanagedCallbackOp.ComputeTaskJobResult,
ComputeTaskJobResult);
+ AddHandler(UnmanagedCallbackOp.ComputeTaskReduce,
ComputeTaskReduce);
+ AddHandler(UnmanagedCallbackOp.ComputeTaskComplete,
ComputeTaskComplete);
+ AddHandler(UnmanagedCallbackOp.ComputeJobSerialize,
ComputeJobSerialize);
+ AddHandler(UnmanagedCallbackOp.ComputeJobCreate, ComputeJobCreate);
+ AddHandler(UnmanagedCallbackOp.ComputeJobExecute,
ComputeJobExecute);
+ AddHandler(UnmanagedCallbackOp.ComputeJobCancel, ComputeJobCancel);
+ AddHandler(UnmanagedCallbackOp.ComputeJobDestroy,
ComputeJobDestroy);
+ AddHandler(UnmanagedCallbackOp.ContinuousQueryListenerApply,
ContinuousQueryListenerApply);
+ AddHandler(UnmanagedCallbackOp.ContinuousQueryFilterCreate,
ContinuousQueryFilterCreate);
+ AddHandler(UnmanagedCallbackOp.ContinuousQueryFilterApply,
ContinuousQueryFilterApply);
+ AddHandler(UnmanagedCallbackOp.ContinuousQueryFilterRelease,
ContinuousQueryFilterRelease);
+ AddHandler(UnmanagedCallbackOp.DataStreamerTopologyUpdate,
DataStreamerTopologyUpdate);
+ AddHandler(UnmanagedCallbackOp.DataStreamerStreamReceiverInvoke,
DataStreamerStreamReceiverInvoke);
+ AddHandler(UnmanagedCallbackOp.FutureByteResult, FutureByteResult);
+ AddHandler(UnmanagedCallbackOp.FutureBoolResult, FutureBoolResult);
+ AddHandler(UnmanagedCallbackOp.FutureShortResult,
FutureShortResult);
+ AddHandler(UnmanagedCallbackOp.FutureCharResult, FutureCharResult);
+ AddHandler(UnmanagedCallbackOp.FutureIntResult, FutureIntResult);
+ AddHandler(UnmanagedCallbackOp.FutureFloatResult,
FutureFloatResult);
+ AddHandler(UnmanagedCallbackOp.FutureLongResult, FutureLongResult);
+ AddHandler(UnmanagedCallbackOp.FutureDoubleResult,
FutureDoubleResult);
+ AddHandler(UnmanagedCallbackOp.FutureObjectResult,
FutureObjectResult);
+ AddHandler(UnmanagedCallbackOp.FutureNullResult, FutureNullResult);
+ AddHandler(UnmanagedCallbackOp.FutureError, FutureError);
+ AddHandler(UnmanagedCallbackOp.LifecycleOnEvent, LifecycleOnEvent,
true);
+ AddHandler(UnmanagedCallbackOp.MemoryReallocate, MemoryReallocate,
true);
+ AddHandler(UnmanagedCallbackOp.MessagingFilterCreate,
MessagingFilterCreate);
+ AddHandler(UnmanagedCallbackOp.MessagingFilterApply,
MessagingFilterApply);
+ AddHandler(UnmanagedCallbackOp.MessagingFilterDestroy,
MessagingFilterDestroy);
+ AddHandler(UnmanagedCallbackOp.EventFilterCreate,
EventFilterCreate);
+ AddHandler(UnmanagedCallbackOp.EventFilterApply, EventFilterApply);
+ AddHandler(UnmanagedCallbackOp.EventFilterDestroy,
EventFilterDestroy);
+ AddHandler(UnmanagedCallbackOp.ServiceInit, ServiceInit);
+ AddHandler(UnmanagedCallbackOp.ServiceExecute, ServiceExecute);
+ AddHandler(UnmanagedCallbackOp.ServiceCancel, ServiceCancel);
+ AddHandler(UnmanagedCallbackOp.ServiceInvokeMethod,
ServiceInvokeMethod);
+ AddHandler(UnmanagedCallbackOp.ClusterNodeFilterApply,
ClusterNodeFilterApply);
+ AddHandler(UnmanagedCallbackOp.NodeInfo, NodeInfo);
+ AddHandler(UnmanagedCallbackOp.OnStart, OnStart, true);
+ AddHandler(UnmanagedCallbackOp.OnStop, OnStop, true);
+ AddHandler(UnmanagedCallbackOp.ExtensionInLongLongOutLong,
ExtensionCallbackInLongLongOutLong, true);
+ AddHandler(UnmanagedCallbackOp.OnClientDisconnected,
OnClientDisconnected);
+ AddHandler(UnmanagedCallbackOp.OnClientReconnected,
OnClientReconnected);
+ AddHandler(UnmanagedCallbackOp.AffinityFunctionInit,
AffinityFunctionInit);
+ AddHandler(UnmanagedCallbackOp.AffinityFunctionPartition,
AffinityFunctionPartition);
+ AddHandler(UnmanagedCallbackOp.AffinityFunctionAssignPartitions,
AffinityFunctionAssignPartitions);
+ AddHandler(UnmanagedCallbackOp.AffinityFunctionRemoveNode,
AffinityFunctionRemoveNode);
+ AddHandler(UnmanagedCallbackOp.AffinityFunctionDestroy,
AffinityFunctionDestroy);
+ AddHandler(UnmanagedCallbackOp.ComputeTaskLocalJobResult,
ComputeTaskLocalJobResult);
+ AddHandler(UnmanagedCallbackOp.ComputeJobExecuteLocal,
ComputeJobExecuteLocal);
+ }
+
+ /// <summary>
+ /// Adds the handler.
+ /// </summary>
+ private void AddHandler(UnmanagedCallbackOp op, InLongOutLongFunc
func, bool allowUninitialized = false)
+ {
+ _inLongOutLongHandlers[(int)op] = new InLongOutLongHandler(func,
allowUninitialized);
+ }
+
+ /// <summary>
+ /// Adds the handler.
+ /// </summary>
+ private void AddHandler(UnmanagedCallbackOp op,
InLongLongLongObjectOutLongFunc func,
+ bool allowUninitialized = false)
+ {
+ _inLongLongLongObjectOutLongHandlers[(int)op]
+ = new InLongLongLongObjectOutLongHandler(func,
allowUninitialized);
+ }
+
+ #endregion
+
+ #region IMPLEMENTATION: GENERAL PURPOSE
+
+ [SuppressMessage("Microsoft.Design",
"CA1031:DoNotCatchGeneralExceptionTypes")]
+ private long InLongOutLong(void* target, int type, long val)
+ {
+ try
+ {
+ if (type < 0 || type > _inLongOutLongHandlers.Length)
+ throw GetInvalidOpError("InLongOutLong", type);
+
+ var hnd = _inLongOutLongHandlers[type];
+
+ if (hnd.Handler == null)
+ throw GetInvalidOpError("InLongOutLong", type);
- private long CacheStoreCreate(void* target, long memPtr)
+ if (!hnd.AllowUninitialized)
+ _initEvent.Wait();
+
+ return hnd.Handler(val);
+ }
+ catch (Exception e)
+ {
+ _log.Error(e, "Failure in Java callback");
+
+ UU.ThrowToJava(_ctx.NativeContext, e);
+
+ return 0;
+ }
+ }
+
+ [SuppressMessage("Microsoft.Design",
"CA1031:DoNotCatchGeneralExceptionTypes")]
+ private long InLongLongLongObjectOutLong(void* target, int type, long
val1, long val2, long val3, void* arg)
{
- return SafeCall(() =>
+ try
{
- var cacheStore = CacheStore.CreateInstance(memPtr,
_handleRegistry);
+ if (type < 0 || type >
_inLongLongLongObjectOutLongHandlers.Length)
+ throw GetInvalidOpError("InLongLongLongObjectOutLong",
type);
+
+ var hnd = _inLongLongLongObjectOutLongHandlers[type];
+
+ if (hnd.Handler == null)
+ throw GetInvalidOpError("InLongLongLongObjectOutLong",
type);
+
+ if (!hnd.AllowUninitialized)
+ _initEvent.Wait();
+
+ return hnd.Handler(val1, val2, val3, arg);
+ }
+ catch (Exception e)
+ {
+ _log.Error(e, "Failure in Java callback");
+
+ UU.ThrowToJava(_ctx.NativeContext, e);
+
+ return 0;
+ }
+ }
- if (_ignite != null)
- cacheStore.Init(_ignite);
- else
+ /// <summary>
+ /// Throws the invalid op error.
+ /// </summary>
+ private static Exception GetInvalidOpError(string method, int type)
+ {
+ return new InvalidOperationException(
+ string.Format("Invalid {0} callback code: {1}", method,
(UnmanagedCallbackOp) type));
+ }
+
+ #endregion
+
+ #region IMPLEMENTATION: CACHE
+
+ private long CacheStoreCreate(long memPtr)
+ {
+ var cacheStore = CacheStore.CreateInstance(memPtr,
_handleRegistry);
+
+ if (_ignite != null)
+ cacheStore.Init(_ignite);
+ else
+ {
+ lock (_initActions)
{
- lock (_initActions)
- {
- if (_ignite != null)
- cacheStore.Init(_ignite);
- else
- _initActions.Add(cacheStore.Init);
- }
+ if (_ignite != null)
+ cacheStore.Init(_ignite);
+ else
+ _initActions.Add(cacheStore.Init);
}
+ }
- return cacheStore.Handle;
- }, true);
+ return cacheStore.Handle;
}
[SuppressMessage("Microsoft.Design",
"CA1031:DoNotCatchGeneralExceptionTypes")]
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects
before losing scope")]
- private int CacheStoreInvoke(void* target, long objPtr, long memPtr)
+ private long CacheStoreInvoke(long memPtr)
{
- return SafeCall(() =>
+ using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
{
- var t = _handleRegistry.Get<CacheStore>(objPtr, true);
+ try
+ {
+ var store =
_handleRegistry.Get<CacheStore>(stream.ReadLong(), true);
- using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
+ return store.Invoke(stream, _ignite);
+ }
+ catch (Exception e)
{
- try
- {
- return t.Invoke(stream, _ignite);
- }
- catch (Exception e)
- {
- stream.Seek(0, SeekOrigin.Begin);
+ stream.Reset();
- _ignite.Marshaller.StartMarshal(stream).WriteObject(e);
+ _ignite.Marshaller.StartMarshal(stream).WriteObject(e);
- return -1;
- }
+ return -1;
}
- });
+ }
}
- private void CacheStoreDestroy(void* target, long objPtr)
+ private long CacheStoreDestroy(long objPtr)
{
- SafeCall(() => _ignite.HandleRegistry.Release(objPtr));
+ _ignite.HandleRegistry.Release(objPtr);
+
+ return 0;
}
- private long CacheStoreSessionCreate(void* target, long storePtr)
+ private long CacheStoreSessionCreate(long val)
{
- return SafeCall(() => _ignite.HandleRegistry.Allocate(new
CacheStoreSession()));
+ return _ignite.HandleRegistry.Allocate(new CacheStoreSession());
}
- private long CacheEntryFilterCreate(void* target, long memPtr)
+ private long CacheEntryFilterCreate(long memPtr)
{
- return SafeCall(() =>
_handleRegistry.Allocate(CacheEntryFilterHolder.CreateInstance(memPtr,
_ignite)));
+ return
_handleRegistry.Allocate(CacheEntryFilterHolder.CreateInstance(memPtr,
_ignite));
}
- private int CacheEntryFilterApply(void* target, long objPtr, long
memPtr)
+ private long CacheEntryFilterApply(long memPtr)
{
- return SafeCall(() =>
+ using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
{
- var t =
_ignite.HandleRegistry.Get<CacheEntryFilterHolder>(objPtr);
+ var t =
_ignite.HandleRegistry.Get<CacheEntryFilterHolder>(stream.ReadLong());
- using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- return t.Invoke(stream);
- }
- });
+ return t.Invoke(stream);
+ }
}
- private void CacheEntryFilterDestroy(void* target, long objPtr)
+ private long CacheEntryFilterDestroy(long objPtr)
{
- SafeCall(() => _ignite.HandleRegistry.Release(objPtr));
+ _ignite.HandleRegistry.Release(objPtr);
+
+ return 0;
}
- private void CacheInvoke(void* target, long inMemPtr, long outMemPtr)
+ private long CacheInvoke(long memPtr)
{
- SafeCall(() =>
+ using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (PlatformMemoryStream inStream =
IgniteManager.Memory.Get(inMemPtr).GetStream())
- {
- var result = ReadAndRunCacheEntryProcessor(inStream,
_ignite);
+ var result = ReadAndRunCacheEntryProcessor(stream, _ignite);
- using (PlatformMemoryStream outStream =
IgniteManager.Memory.Get(outMemPtr).GetStream())
- {
- result.Write(outStream, _ignite.Marshaller);
+ stream.Reset();
- outStream.SynchronizeOutput();
- }
- }
- });
+ result.Write(stream, _ignite.Marshaller);
+
+ stream.SynchronizeOutput();
+ }
+
+ return 0;
}
/// <summary>
@@ -432,122 +444,110 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
#region IMPLEMENTATION: COMPUTE
- private void ComputeTaskMap(void* target, long taskPtr, long inMemPtr,
long outMemPtr)
+ private long ComputeTaskMap(long memPtr)
{
- SafeCall(() =>
+ using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (PlatformMemoryStream inStream =
IgniteManager.Memory.Get(inMemPtr).GetStream())
- {
- using (PlatformMemoryStream outStream =
IgniteManager.Memory.Get(outMemPtr).GetStream())
- {
- Task(taskPtr).Map(inStream, outStream);
- }
- }
- });
+ Task(stream.ReadLong()).Map(stream);
+
+ return 0;
+ }
+ }
+
+ private long ComputeTaskLocalJobResult(long taskPtr, long jobPtr, long
unused, void* arg)
+ {
+ return Task(taskPtr).JobResultLocal(Job(jobPtr));
}
- private int ComputeTaskJobResult(void* target, long taskPtr, long
jobPtr, long memPtr)
+ private long ComputeTaskJobResult(long memPtr)
{
- return SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- var task = Task(taskPtr);
+ var task = Task(stream.ReadLong());
- if (memPtr == 0)
- {
- return task.JobResultLocal(Job(jobPtr));
- }
+ var job = Job(stream.ReadLong());
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- return task.JobResultRemote(Job(jobPtr), stream);
- }
- });
+ return task.JobResultRemote(job, stream);
+ }
}
- private void ComputeTaskReduce(void* target, long taskPtr)
+ private long ComputeTaskReduce(long taskPtr)
{
- SafeCall(() =>
- {
- var task = _handleRegistry.Get<IComputeTaskHolder>(taskPtr,
true);
+ _handleRegistry.Get<IComputeTaskHolder>(taskPtr, true).Reduce();
- task.Reduce();
- });
+ return 0;
}
- private void ComputeTaskComplete(void* target, long taskPtr, long
memPtr)
+ private long ComputeTaskComplete(long taskPtr, long memPtr, long
unused, void* arg)
{
- SafeCall(() =>
- {
- var task = _handleRegistry.Get<IComputeTaskHolder>(taskPtr,
true);
+ var task = _handleRegistry.Get<IComputeTaskHolder>(taskPtr, true);
- if (memPtr == 0)
- task.Complete(taskPtr);
- else
+ if (memPtr == 0)
+ task.Complete(taskPtr);
+ else
+ {
+ using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- task.CompleteWithError(taskPtr, stream);
- }
+ task.CompleteWithError(taskPtr, stream);
}
- });
+ }
+
+ return 0;
}
- private int ComputeJobSerialize(void* target, long jobPtr, long memPtr)
+ private long ComputeJobSerialize(long jobPtr, long memPtr, long
unused, void* arg)
{
- return SafeCall(() =>
+ using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- return Job(jobPtr).Serialize(stream) ? 1 : 0;
- }
- });
+ return Job(jobPtr).Serialize(stream) ? 1 : 0;
+ }
}
- private long ComputeJobCreate(void* target, long memPtr)
+ private long ComputeJobCreate(long memPtr)
{
- return SafeCall(() =>
+ using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- ComputeJobHolder job = ComputeJobHolder.CreateJob(_ignite,
stream);
+ ComputeJobHolder job = ComputeJobHolder.CreateJob(_ignite,
stream);
- return _handleRegistry.Allocate(job);
- }
- });
+ return _handleRegistry.Allocate(job);
+ }
}
- private void ComputeJobExecute(void* target, long jobPtr, int cancel,
long memPtr)
+ private long ComputeJobExecuteLocal(long jobPtr, long cancel, long
unused, void* arg)
{
- SafeCall(() =>
- {
- var job = Job(jobPtr);
+ Job(jobPtr).ExecuteLocal(cancel == 1);
- if (memPtr == 0)
- job.ExecuteLocal(cancel == 1);
- else
- {
- using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- job.ExecuteRemote(stream, cancel == 1);
- }
- }
- });
+ return 0;
}
- private void ComputeJobCancel(void* target, long jobPtr)
+ private long ComputeJobExecute(long memPtr)
{
- SafeCall(() =>
+ using (PlatformMemoryStream stream =
IgniteManager.Memory.Get(memPtr).GetStream())
{
- Job(jobPtr).Cancel();
- });
+ var job = Job(stream.ReadLong());
+
+ var cancel = stream.ReadBool();
+
+ stream.Reset();
+
+ job.ExecuteRemote(stream, cancel);
+ }
+
+ return 0;
}
- private void ComputeJobDestroy(void* target, long jobPtr)
+ private long ComputeJobCancel(long jobPtr)
{
- SafeCall(() =>
- {
- _handleRegistry.Release(jobPtr);
- });
+ Job(jobPtr).Cancel();
+
+ return 0;
+ }
+
+ private long ComputeJobDestroy(long jobPtr)
+ {
+ _handleRegistry.Release(jobPtr);
+
+ return 0;
}
/// <summary>
@@ -574,219 +574,178 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
#region IMPLEMENTATION: CONTINUOUS QUERY
- private void ContinuousQueryListenerApply(void* target, long lsnrPtr,
long memPtr)
+ private long ContinuousQueryListenerApply(long memPtr)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- var hnd =
_handleRegistry.Get<IContinuousQueryHandleImpl>(lsnrPtr);
+ var hnd =
_handleRegistry.Get<IContinuousQueryHandleImpl>(stream.ReadLong());
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- hnd.Apply(stream);
- }
- });
+ hnd.Apply(stream);
+
+ return 0;
+ }
}
[SuppressMessage("ReSharper", "PossibleNullReferenceException")]
- private long ContinuousQueryFilterCreate(void* target, long memPtr)
+ private long ContinuousQueryFilterCreate(long memPtr)
{
- return SafeCall(() =>
+ // 1. Unmarshal filter holder.
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- // 1. Unmarshal filter holder.
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream);
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
- var filterHolder =
reader.ReadObject<ContinuousQueryFilterHolder>();
+ var filterHolder =
reader.ReadObject<ContinuousQueryFilterHolder>();
- // 2. Create real filter from it's holder.
- var filter =
(IContinuousQueryFilter)DelegateTypeDescriptor.GetContinuousQueryFilterCtor(
- filterHolder.Filter.GetType())(filterHolder.Filter,
filterHolder.KeepBinary);
+ // 2. Create real filter from it's holder.
+ var filter = (IContinuousQueryFilter)
DelegateTypeDescriptor.GetContinuousQueryFilterCtor(
+ filterHolder.Filter.GetType())(filterHolder.Filter,
filterHolder.KeepBinary);
- // 3. Inject grid.
- filter.Inject(_ignite);
+ // 3. Inject grid.
+ filter.Inject(_ignite);
- // 4. Allocate GC handle.
- return filter.Allocate();
- }
- });
+ // 4. Allocate GC handle.
+ return filter.Allocate();
+ }
}
- private int ContinuousQueryFilterApply(void* target, long filterPtr,
long memPtr)
+ private long ContinuousQueryFilterApply(long memPtr)
{
- return SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- var holder =
_handleRegistry.Get<IContinuousQueryFilter>(filterPtr);
+ var holder =
_handleRegistry.Get<IContinuousQueryFilter>(stream.ReadLong());
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- return holder.Evaluate(stream) ? 1 : 0;
- }
- });
+ return holder.Evaluate(stream) ? 1 : 0;
+ }
}
- private void ContinuousQueryFilterRelease(void* target, long filterPtr)
+ private long ContinuousQueryFilterRelease(long filterPtr)
{
- SafeCall(() =>
- {
- var holder =
_handleRegistry.Get<IContinuousQueryFilter>(filterPtr);
+ var holder =
_handleRegistry.Get<IContinuousQueryFilter>(filterPtr);
- holder.Release();
- });
+ holder.Release();
+
+ return 0;
}
#endregion
#region IMPLEMENTATION: DATA STREAMER
- private void DataStreamerTopologyUpdate(void* target, long ldrPtr,
long topVer, int topSize)
+ private long DataStreamerTopologyUpdate(long ldrPtr, long topVer, long
topSize, void* unused)
{
- SafeCall(() =>
- {
- var ldrRef = _handleRegistry.Get<WeakReference>(ldrPtr);
+ var ldrRef = _handleRegistry.Get<WeakReference>(ldrPtr);
- if (ldrRef == null)
- return;
+ if (ldrRef == null)
+ return 0;
- var ldr = ldrRef.Target as IDataStreamer;
+ var ldr = ldrRef.Target as IDataStreamer;
- if (ldr != null)
- ldr.TopologyChange(topVer, topSize);
- else
- _handleRegistry.Release(ldrPtr, true);
- });
+ if (ldr != null)
+ ldr.TopologyChange(topVer, (int) topSize);
+ else
+ _handleRegistry.Release(ldrPtr, true);
+
+ return 0;
}
[SuppressMessage("Microsoft.Reliability", "CA2000:Dispose objects
before losing scope")]
- private void DataStreamerStreamReceiverInvoke(void* target, long
rcvPtr, void* cache, long memPtr,
- byte keepBinary)
+ private long DataStreamerStreamReceiverInvoke(long memPtr, long
unused, long unused1, void* cache)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream,
BinaryMode.ForceBinary);
+ var rcvPtr = stream.ReadLong();
- var binaryReceiver = reader.ReadObject<BinaryObject>();
+ var keepBinary = stream.ReadBool();
- var receiver =
_handleRegistry.Get<StreamReceiverHolder>(rcvPtr) ??
-
binaryReceiver.Deserialize<StreamReceiverHolder>();
+ var reader = _ignite.Marshaller.StartUnmarshal(stream,
BinaryMode.ForceBinary);
- if (receiver != null)
- receiver.Receive(_ignite, new
UnmanagedNonReleaseableTarget(_ctx, cache), stream,
- keepBinary != 0);
- }
- });
+ var binaryReceiver = reader.ReadObject<BinaryObject>();
+
+ var receiver =
_handleRegistry.Get<StreamReceiverHolder>(rcvPtr) ??
+
binaryReceiver.Deserialize<StreamReceiverHolder>();
+
+ if (receiver != null)
+ receiver.Receive(_ignite, new
UnmanagedNonReleaseableTarget(_ctx, cache), stream, keepBinary);
+
+ return 0;
+ }
}
#endregion
#region IMPLEMENTATION: FUTURES
- private void FutureByteResult(void* target, long futPtr, int res)
+ private long FutureByteResult(long futPtr, long res, long unused,
void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<byte>(futPtr, fut => { fut.OnResult((byte)res);
});
- });
+ return ProcessFuture<byte>(futPtr, fut => { fut.OnResult((byte)
res); });
}
- private void FutureBoolResult(void* target, long futPtr, int res)
+ private long FutureBoolResult(long futPtr, long res, long unused,
void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<bool>(futPtr, fut => { fut.OnResult(res == 1);
});
- });
+ return ProcessFuture<bool>(futPtr, fut => { fut.OnResult(res ==
1); });
}
- private void FutureShortResult(void* target, long futPtr, int res)
+ private long FutureShortResult(long futPtr, long res, long unused,
void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<short>(futPtr, fut => {
fut.OnResult((short)res); });
- });
+ return ProcessFuture<short>(futPtr, fut => {
fut.OnResult((short)res); });
}
- private void FutureCharResult(void* target, long futPtr, int res)
+ private long FutureCharResult(long futPtr, long res, long unused,
void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<char>(futPtr, fut => { fut.OnResult((char)res);
});
- });
+ return ProcessFuture<char>(futPtr, fut => {
fut.OnResult((char)res); });
}
- private void FutureIntResult(void* target, long futPtr, int res)
+ private long FutureIntResult(long futPtr, long res, long unused, void*
arg)
{
- SafeCall(() =>
- {
- ProcessFuture<int>(futPtr, fut => { fut.OnResult(res); });
- });
+ return ProcessFuture<int>(futPtr, fut => { fut.OnResult((int)
res); });
}
- private void FutureFloatResult(void* target, long futPtr, float res)
+ private long FutureFloatResult(long futPtr, long res, long unused,
void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<float>(futPtr, fut => { fut.OnResult(res); });
- });
+ return ProcessFuture<float>(futPtr, fut => {
fut.OnResult(BinaryUtils.IntToFloatBits((int) res)); });
}
- private void FutureLongResult(void* target, long futPtr, long res)
+ private long FutureLongResult(long futPtr, long res, long unused,
void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<long>(futPtr, fut => { fut.OnResult(res); });
- });
+ return ProcessFuture<long>(futPtr, fut => { fut.OnResult(res); });
}
- private void FutureDoubleResult(void* target, long futPtr, double res)
+ private long FutureDoubleResult(long futPtr, long res, long unused,
void* arg)
{
- SafeCall(() =>
- {
- ProcessFuture<double>(futPtr, fut => { fut.OnResult(res); });
- });
+ return ProcessFuture<double>(futPtr, fut => {
fut.OnResult(BinaryUtils.LongToDoubleBits(res)); });
}
- private void FutureObjectResult(void* target, long futPtr, long memPtr)
+ private long FutureObjectResult(long futPtr, long memPtr, long unused,
void* arg)
{
- SafeCall(() =>
+ return ProcessFuture(futPtr, fut =>
{
- ProcessFuture(futPtr, fut =>
+ using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- fut.OnResult(stream);
- }
- });
+ fut.OnResult(stream);
+ }
});
}
- private void FutureNullResult(void* target, long futPtr)
+ private long FutureNullResult(long futPtr)
{
- SafeCall(() =>
- {
- ProcessFuture(futPtr, fut => { fut.OnNullResult(); });
- });
+ return ProcessFuture(futPtr, fut => { fut.OnNullResult(); });
}
- private void FutureError(void* target, long futPtr, long memPtr)
+ private long FutureError(long futPtr, long memPtr, long unused, void*
arg)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream);
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
- string errCls = reader.ReadString();
- string errMsg = reader.ReadString();
- string stackTrace = reader.ReadString();
- Exception inner = reader.ReadBoolean() ?
reader.ReadObject<Exception>() : null;
+ string errCls = reader.ReadString();
+ string errMsg = reader.ReadString();
+ string stackTrace = reader.ReadString();
+ Exception inner = reader.ReadBoolean() ?
reader.ReadObject<Exception>() : null;
- Exception err = ExceptionUtils.GetException(_ignite,
errCls, errMsg, stackTrace, reader, inner);
+ Exception err = ExceptionUtils.GetException(_ignite, errCls,
errMsg, stackTrace, reader, inner);
- ProcessFuture(futPtr, fut => { fut.OnError(err); });
- }
- });
+ return ProcessFuture(futPtr, fut => { fut.OnError(err); });
+ }
}
/// <summary>
@@ -794,11 +753,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
/// </summary>
/// <param name="futPtr">Future pointer.</param>
/// <param name="action">Action.</param>
- private void ProcessFuture(long futPtr, Action<IFutureInternal> action)
+ private long ProcessFuture(long futPtr, Action<IFutureInternal> action)
{
try
{
action(_handleRegistry.Get<IFutureInternal>(futPtr, true));
+
+ return 0;
}
finally
{
@@ -811,11 +772,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
/// </summary>
/// <param name="futPtr">Future pointer.</param>
/// <param name="action">Action.</param>
- private void ProcessFuture<T>(long futPtr, Action<Future<T>> action)
+ private long ProcessFuture<T>(long futPtr, Action<Future<T>> action)
{
try
{
action(_handleRegistry.Get<Future<T>>(futPtr, true));
+
+ return 0;
}
finally
{
@@ -827,257 +790,230 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
#region IMPLEMENTATION: LIFECYCLE
- private void LifecycleOnEvent(void* target, long ptr, int evt)
+ private long LifecycleOnEvent(long ptr, long evt, long unused, void*
arg)
{
- SafeCall(() =>
- {
- var bean = _handleRegistry.Get<LifecycleBeanHolder>(ptr);
+ var bean = _handleRegistry.Get<LifecycleBeanHolder>(ptr);
- bean.OnLifecycleEvent((LifecycleEventType)evt);
- }, true);
+ bean.OnLifecycleEvent((LifecycleEventType) evt);
+
+ return 0;
}
#endregion
#region IMPLEMENTATION: MESSAGING
- private long MessagingFilterCreate(void* target, long memPtr)
+ private long MessagingFilterCreate(long memPtr)
{
- return SafeCall(() =>
- {
- MessageListenerHolder holder =
MessageListenerHolder.CreateRemote(_ignite, memPtr);
+ MessageListenerHolder holder =
MessageListenerHolder.CreateRemote(_ignite, memPtr);
- return _ignite.HandleRegistry.AllocateSafe(holder);
- });
+ return _ignite.HandleRegistry.AllocateSafe(holder);
}
- private int MessagingFilterApply(void* target, long ptr, long memPtr)
+ private long MessagingFilterApply(long ptr, long memPtr, long unused,
void* arg)
{
- return SafeCall(() =>
- {
- var holder =
_ignite.HandleRegistry.Get<MessageListenerHolder>(ptr, false);
+ var holder =
_ignite.HandleRegistry.Get<MessageListenerHolder>(ptr, false);
- if (holder == null)
- return 0;
+ if (holder == null)
+ return 0;
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- return holder.Invoke(stream);
- }
- });
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+ {
+ return holder.Invoke(stream);
+ }
}
- private void MessagingFilterDestroy(void* target, long ptr)
+ private long MessagingFilterDestroy(long ptr)
{
- SafeCall(() =>
- {
- _ignite.HandleRegistry.Release(ptr);
- });
+ _ignite.HandleRegistry.Release(ptr);
+
+ return 0;
}
#endregion
#region IMPLEMENTATION: EXTENSIONS
- private long ExtensionCallbackInLongOutLong(void* target, int op, long
arg1)
+ private long ExtensionCallbackInLongLongOutLong(long op, long arg1,
long arg2, void* arg)
{
- throw new InvalidOperationException("Unsupported operation type: "
+ op);
- }
-
- private long ExtensionCallbackInLongLongOutLong(void* target, int op,
long arg1, long arg2)
- {
- return SafeCall(() =>
+ switch (op)
{
- switch (op)
- {
- case OpPrepareDotNet:
- using (var inStream =
IgniteManager.Memory.Get(arg1).GetStream())
- using (var outStream =
IgniteManager.Memory.Get(arg2).GetStream())
- {
- Ignition.OnPrepare(inStream, outStream,
_handleRegistry, _log);
+ case OpPrepareDotNet:
+ using (var inStream =
IgniteManager.Memory.Get(arg1).GetStream())
+ using (var outStream =
IgniteManager.Memory.Get(arg2).GetStream())
+ {
+ Ignition.OnPrepare(inStream, outStream,
_handleRegistry, _log);
- return 0;
- }
+ return 0;
+ }
- default:
- throw new InvalidOperationException("Unsupported
operation type: " + op);
- }
- }, op == OpPrepareDotNet);
+ default:
+ throw new InvalidOperationException("Unsupported operation
type: " + op);
+ }
}
#endregion
#region IMPLEMENTATION: EVENTS
- private long EventFilterCreate(void* target, long memPtr)
+ private long EventFilterCreate(long memPtr)
{
- return SafeCall(() =>
_handleRegistry.Allocate(RemoteListenEventFilter.CreateInstance(memPtr,
_ignite)));
+ return
_handleRegistry.Allocate(RemoteListenEventFilter.CreateInstance(memPtr,
_ignite));
}
- private int EventFilterApply(void* target, long ptr, long memPtr)
+ private long EventFilterApply(long ptr, long memPtr, long unused,
void* arg)
{
- return SafeCall(() =>
- {
- var holder = _ignite.HandleRegistry.Get<IInteropCallback>(ptr,
false);
+ var holder = _ignite.HandleRegistry.Get<IInteropCallback>(ptr,
false);
- if (holder == null)
- return 0;
+ if (holder == null)
+ return 0;
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- return holder.Invoke(stream);
- }
- });
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
+ {
+ return holder.Invoke(stream);
+ }
}
- private void EventFilterDestroy(void* target, long ptr)
+ private long EventFilterDestroy(long ptr)
{
- SafeCall(() =>
- {
- _ignite.HandleRegistry.Release(ptr);
- });
+ _ignite.HandleRegistry.Release(ptr);
+
+ return 0;
}
#endregion
#region IMPLEMENTATION: SERVICES
- private long ServiceInit(void* target, long memPtr)
+ private long ServiceInit(long memPtr)
{
- return SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream);
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
- bool srvKeepBinary = reader.ReadBoolean();
- var svc = reader.ReadObject<IService>();
+ bool srvKeepBinary = reader.ReadBoolean();
+ var svc = reader.ReadObject<IService>();
- ResourceProcessor.Inject(svc, _ignite);
+ ResourceProcessor.Inject(svc, _ignite);
- svc.Init(new
ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary)));
+ svc.Init(new
ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary)));
- return _handleRegistry.Allocate(svc);
- }
- });
+ return _handleRegistry.Allocate(svc);
+ }
}
- private void ServiceExecute(void* target, long svcPtr, long memPtr)
+ private long ServiceExecute(long memPtr)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- var svc = _handleRegistry.Get<IService>(svcPtr);
+ var svc = _handleRegistry.Get<IService>(stream.ReadLong());
// Ignite does not guarantee that Cancel is called after
Execute exits
// So missing handle is a valid situation
if (svc == null)
- return;
+ return 0;
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream);
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
- bool srvKeepBinary = reader.ReadBoolean();
+ bool srvKeepBinary = reader.ReadBoolean();
- svc.Execute(new ServiceContext(
- _ignite.Marshaller.StartUnmarshal(stream,
srvKeepBinary)));
- }
- });
+ svc.Execute(new
ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary)));
+
+ return 0;
+ }
}
- private void ServiceCancel(void* target, long svcPtr, long memPtr)
+ private long ServiceCancel(long memPtr)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- var svc = _handleRegistry.Get<IService>(svcPtr, true);
+ long svcPtr = stream.ReadLong();
try
{
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream);
+ var svc = _handleRegistry.Get<IService>(svcPtr, true);
- bool srvKeepBinary = reader.ReadBoolean();
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
- svc.Cancel(new
ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary)));
- }
+ bool srvKeepBinary = reader.ReadBoolean();
+
+ svc.Cancel(new
ServiceContext(_ignite.Marshaller.StartUnmarshal(stream, srvKeepBinary)));
+
+ return 0;
}
finally
{
_ignite.HandleRegistry.Release(svcPtr);
}
- });
+ }
}
- private void ServiceInvokeMethod(void* target, long svcPtr, long
inMemPtr, long outMemPtr)
+ private long ServiceInvokeMethod(long memPtr)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var inStream =
IgniteManager.Memory.Get(inMemPtr).GetStream())
- using (var outStream =
IgniteManager.Memory.Get(outMemPtr).GetStream())
- {
- var svc = _handleRegistry.Get<IService>(svcPtr, true);
+ var svc = _handleRegistry.Get<IService>(stream.ReadLong(),
true);
- string mthdName;
- object[] mthdArgs;
+ string mthdName;
+ object[] mthdArgs;
- ServiceProxySerializer.ReadProxyMethod(inStream,
_ignite.Marshaller, out mthdName, out mthdArgs);
+ ServiceProxySerializer.ReadProxyMethod(stream,
_ignite.Marshaller, out mthdName, out mthdArgs);
- var result = ServiceProxyInvoker.InvokeServiceMethod(svc,
mthdName, mthdArgs);
+ var result = ServiceProxyInvoker.InvokeServiceMethod(svc,
mthdName, mthdArgs);
- ServiceProxySerializer.WriteInvocationResult(outStream,
_ignite.Marshaller, result.Key, result.Value);
+ stream.Reset();
- outStream.SynchronizeOutput();
- }
- });
+ ServiceProxySerializer.WriteInvocationResult(stream,
_ignite.Marshaller, result.Key, result.Value);
+
+ stream.SynchronizeOutput();
+
+ return 0;
+ }
}
- private int ClusterNodeFilterApply(void* target, long memPtr)
+ private long ClusterNodeFilterApply(long memPtr)
{
- return SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream);
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
- var filter = reader.ReadObject<IClusterNodeFilter>();
+ var filter = reader.ReadObject<IClusterNodeFilter>();
- return filter.Invoke(_ignite.GetNode(reader.ReadGuid())) ?
1 : 0;
- }
- });
+ return filter.Invoke(_ignite.GetNode(reader.ReadGuid())) ? 1 :
0;
+ }
}
#endregion
#region IMPLEMENTATION: MISCELLANEOUS
- private void NodeInfo(void* target, long memPtr)
+ private long NodeInfo(long memPtr)
{
- SafeCall(() => _ignite.UpdateNodeInfo(memPtr));
+ _ignite.UpdateNodeInfo(memPtr);
+
+ return 0;
}
- private void MemoryReallocate(void* target, long memPtr, int cap)
+ private long MemoryReallocate(long memPtr, long cap, long unused,
void* arg)
{
- SafeCall(() =>
- {
- IgniteManager.Memory.Get(memPtr).Reallocate(cap);
- }, true);
+ IgniteManager.Memory.Get(memPtr).Reallocate((int)cap);
+
+ return 0;
}
- private void OnStart(void* target, void* proc, long memPtr)
+ private long OnStart(long memPtr, long unused, long unused1, void*
proc)
{
- SafeCall(() =>
+ var proc0 = UU.Acquire(_ctx, proc);
+
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- var proc0 = UnmanagedUtils.Acquire(_ctx, proc);
+ Ignition.OnStart(proc0, stream);
+ }
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- Ignition.OnStart(proc0, stream);
- }
- }, true);
+ return 0;
}
- private void OnStop(void* target)
+ private long OnStop(long unused)
{
Marshal.FreeHGlobal(_cbsPtr);
@@ -1092,6 +1028,8 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
if (ignite != null)
ignite.AfterNodeStop();
+
+ return 0;
}
private void Error(void* target, int errType, sbyte* errClsChars, int
errClsCharsLen, sbyte* errMsgChars,
@@ -1122,20 +1060,18 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
}
}
- private void OnClientDisconnected(void* target)
+ private long OnClientDisconnected(long unused)
{
- SafeCall(() =>
- {
- _ignite.OnClientDisconnected();
- });
+ _ignite.OnClientDisconnected();
+
+ return 0;
}
- private void OnClientReconnected(void* target, bool clusterRestarted)
+ private long OnClientReconnected(long clusterRestarted)
{
- SafeCall(() =>
- {
- _ignite.OnClientReconnected(clusterRestarted);
- });
+ _ignite.OnClientReconnected(clusterRestarted != 0);
+
+ return 0;
}
private void LoggerLog(void* target, int level, sbyte* messageChars,
int messageCharsLen, sbyte* categoryChars,
@@ -1194,82 +1130,79 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
#region AffinityFunction
- private long AffinityFunctionInit(void* target, long memPtr, void*
baseFunc)
+ private long AffinityFunctionInit(long memPtr, long unused, long
unused1, void* baseFunc)
{
- return SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var reader = _ignite.Marshaller.StartUnmarshal(stream);
+ var reader = _ignite.Marshaller.StartUnmarshal(stream);
- var func = reader.ReadObjectEx<IAffinityFunction>();
+ var func = reader.ReadObjectEx<IAffinityFunction>();
- ResourceProcessor.Inject(func, _ignite);
+ ResourceProcessor.Inject(func, _ignite);
- var affBase = func as AffinityFunctionBase;
+ var affBase = func as AffinityFunctionBase;
- if (affBase != null)
- affBase.SetBaseFunction(new PlatformAffinityFunction(
- _ignite.InteropProcessor.ChangeTarget(baseFunc),
_ignite.Marshaller));
+ if (affBase != null)
+ {
+ var baseFunc0 = UU.Acquire(_ctx, baseFunc);
- return _handleRegistry.Allocate(func);
+ affBase.SetBaseFunction(new
PlatformAffinityFunction(baseFunc0, _ignite.Marshaller));
}
- });
+
+ return _handleRegistry.Allocate(func);
+ }
}
- private int AffinityFunctionPartition(void* target, long ptr, long
memPtr)
+ private long AffinityFunctionPartition(long memPtr)
{
- return SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var key = _ignite.Marshaller.Unmarshal<object>(stream);
+ var ptr = stream.ReadLong();
- return _handleRegistry.Get<IAffinityFunction>(ptr,
true).GetPartition(key);
- }
- });
+ var key = _ignite.Marshaller.Unmarshal<object>(stream);
+
+ return _handleRegistry.Get<IAffinityFunction>(ptr,
true).GetPartition(key);
+ }
}
- private void AffinityFunctionAssignPartitions(void* target, long ptr,
long inMemPtr, long outMemPtr)
+ private long AffinityFunctionAssignPartitions(long memPtr)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var inStream =
IgniteManager.Memory.Get(inMemPtr).GetStream())
- {
- var ctx = new
AffinityFunctionContext(_ignite.Marshaller.StartUnmarshal(inStream));
- var func = _handleRegistry.Get<IAffinityFunction>(ptr,
true);
- var parts = func.AssignPartitions(ctx);
+ var ptr = stream.ReadLong();
+ var ctx = new
AffinityFunctionContext(_ignite.Marshaller.StartUnmarshal(stream));
+ var func = _handleRegistry.Get<IAffinityFunction>(ptr, true);
+ var parts = func.AssignPartitions(ctx);
- if (parts == null)
- throw new IgniteException(func.GetType() +
".AssignPartitions() returned invalid result: null");
+ if (parts == null)
+ throw new IgniteException(func.GetType() +
".AssignPartitions() returned invalid result: null");
- using (var outStream =
IgniteManager.Memory.Get(outMemPtr).GetStream())
- {
- AffinityFunctionSerializer.WritePartitions(parts,
outStream, _ignite.Marshaller);
- }
- }
- });
+ stream.Reset();
+
+ AffinityFunctionSerializer.WritePartitions(parts, stream,
_ignite.Marshaller);
+
+ return 0;
+ }
}
- private void AffinityFunctionRemoveNode(void* target, long ptr, long
memPtr)
+ private long AffinityFunctionRemoveNode(long memPtr)
{
- SafeCall(() =>
+ using (var stream = IgniteManager.Memory.Get(memPtr).GetStream())
{
- using (var stream =
IgniteManager.Memory.Get(memPtr).GetStream())
- {
- var nodeId = _ignite.Marshaller.Unmarshal<Guid>(stream);
+ var ptr = stream.ReadLong();
+ var nodeId = _ignite.Marshaller.Unmarshal<Guid>(stream);
- _handleRegistry.Get<IAffinityFunction>(ptr,
true).RemoveNode(nodeId);
- }
- });
+ _handleRegistry.Get<IAffinityFunction>(ptr,
true).RemoveNode(nodeId);
+
+ return 0;
+ }
}
- private void AffinityFunctionDestroy(void* target, long ptr)
+ private long AffinityFunctionDestroy(long ptr)
{
- SafeCall(() =>
- {
- _handleRegistry.Release(ptr);
- });
+ _handleRegistry.Release(ptr);
+
+ return 0;
}
#endregion
@@ -1315,7 +1248,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
}
#endregion
-
+
/// <summary>
/// Callbacks pointer.
/// </summary>
@@ -1398,5 +1331,47 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
{
get { return ConsoleWritePtr; }
}
+
+ /// <summary>
+ /// InLongOutLong handler struct.
+ /// </summary>
+ private struct InLongOutLongHandler
+ {
+ /// <summary> The handler func. </summary>
+ public readonly InLongOutLongFunc Handler;
+
+ /// <summary> Allow uninitialized flag. </summary>
+ public readonly bool AllowUninitialized;
+
+ /// <summary>
+ /// Initializes a new instance of the <see
cref="InLongOutLongHandler"/> struct.
+ /// </summary>
+ public InLongOutLongHandler(InLongOutLongFunc handler, bool
allowUninitialized)
+ {
+ Handler = handler;
+ AllowUninitialized = allowUninitialized;
+ }
+ }
+
+ /// <summary>
+ /// InLongLongLongObjectOutLong handler struct.
+ /// </summary>
+ private struct InLongLongLongObjectOutLongHandler
+ {
+ /// <summary> The handler func. </summary>
+ public readonly InLongLongLongObjectOutLongFunc Handler;
+
+ /// <summary> Allow uninitialized flag. </summary>
+ public readonly bool AllowUninitialized;
+
+ /// <summary>
+ /// Initializes a new instance of the <see
cref="InLongLongLongObjectOutLongHandler"/> struct.
+ /// </summary>
+ public
InLongLongLongObjectOutLongHandler(InLongLongLongObjectOutLongFunc handler,
bool allowUninitialized)
+ {
+ Handler = handler;
+ AllowUninitialized = allowUninitialized;
+ }
+ }
}
}
\ No newline at end of file