http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
index 4a4f93b..0472ce4 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs
@@ -22,6 +22,7 @@ namespace Apache.Ignite.Core.Impl
     using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
     using System.IO;
+    using System.Threading;
     using System.Threading.Tasks;
     using Apache.Ignite.Core.Impl.Binary;
     using Apache.Ignite.Core.Impl.Binary.IO;
@@ -289,6 +290,26 @@ namespace Apache.Ignite.Core.Impl
         }
 
         /// <summary>
+        /// Perform out operation.
+        /// </summary>
+        /// <param name="type">Operation type.</param>
+        /// <param name="action">Action to be performed on the stream.</param>
+        /// <returns></returns>
+        protected IUnmanagedTarget DoOutOpObject(int type, Action<BinaryWriter> action)
+        {
+            using (var stream = IgniteManager.Memory.Allocate().GetStream())
+            {
+                var writer = _marsh.StartMarshal(stream);
+
+                action(writer);
+
+                FinishMarshal(writer);
+
+                return UU.TargetInStreamOutObject(_target, type, stream.SynchronizeOutput());
+            }
+        }
+
+        /// <summary>
         /// Perform simple output operation accepting single argument.
         /// </summary>
         /// <param name="type">Operation type.</param>
@@ -633,6 +654,37 @@ namespace Apache.Ignite.Core.Impl
         /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
         /// <param name="convertFunc">The function to read future result from stream.</param>
         /// <returns>Created future.</returns>
+        protected Future<T> GetFuture<T>(Func<long, int, IUnmanagedTarget> listenAction, bool keepBinary = false,
+            Func<BinaryReader, T> convertFunc = null)
+        {
+            var futType = FutureType.Object;
+
+            var type = typeof(T);
+
+            if (type.IsPrimitive)
+                IgniteFutureTypeMap.TryGetValue(type, out futType);
+
+            var fut = convertFunc == null && futType != FutureType.Object
+                ? new Future<T>()
+                : new Future<T>(new FutureConverter<T>(_marsh, keepBinary, convertFunc));
+
+            var futHnd = _marsh.Ignite.HandleRegistry.Allocate(fut);
+
+            var futTarget = listenAction(futHnd, (int) futType);
+
+            fut.SetTarget(futTarget);
+
+            return fut;
+        }
+
+        /// <summary>
+        /// Creates a future and starts listening.
+        /// </summary>
+        /// <typeparam name="T">Future result type</typeparam>
+        /// <param name="listenAction">The listen action.</param>
+        /// <param name="keepBinary">Keep binary flag, only applicable to object futures. False by default.</param>
+        /// <param name="convertFunc">The function to read future result from stream.</param>
+        /// <returns>Created future.</returns>
         protected Future<T> GetFuture<T>(Action<long, int> listenAction, bool keepBinary = false,
             Func<BinaryReader, T> convertFunc = null)
         {
http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
index 860e703..5e54a4c 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs
@@ -113,6 +113,12 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
         [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureForOperation")]
         public static extern void TargetListenFutForOp(void* ctx, void* target, long futId, int typ, int opId);
 
+        [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureAndGet")]
+        public static extern void* TargetListenFutAndGet(void* ctx, void* target, long futId, int typ);
+
+        [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteTargetListenFutureForOperationAndGet")]
+        public static extern void* TargetListenFutForOpAndGet(void* ctx, void* target, long futId, int typ, int opId);
+
         [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAffinityPartitions")]
         public static extern int AffinityParts(void* ctx, void* target);
 
@@ -178,7 +184,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
         public static extern void ComputeWithTimeout(void* ctx, void* target, long timeout);
 
         [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteComputeExecuteNative")]
-        public static extern void ComputeExecuteNative(void* ctx, void* target, long taskPtr, long topVer);
+        public static extern void* ComputeExecuteNative(void* ctx, void* target, long taskPtr, long topVer);
 
         [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteContinuousQueryClose")]
         public static extern void ContinuousQryClose(void* ctx, void* target);
@@ -354,5 +360,13 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
 
         [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAtomicLongClose")]
         public static extern void AtomicLongClose(void* ctx, void* target);
+
+        [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableCancel")]
+        [return: MarshalAs(UnmanagedType.U1)]
+        public static extern bool ListenableCancel(void* ctx, void* target);
+
+        [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableIsCancelled")]
+        [return: MarshalAs(UnmanagedType.U1)]
+        public static extern bool ListenableIsCancelled(void* ctx, void* target);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/24a78f5d/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
index 56a184d..4c8f1dc 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs
@@ -305,6 +305,21 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
             JNI.TargetListenFutForOp(target.Context, target.Target, futId, typ, opId);
         }
 
+        internal static IUnmanagedTarget TargetListenFutureAndGet(IUnmanagedTarget target, long futId, int typ)
+        {
+            var res = JNI.TargetListenFutAndGet(target.Context, target.Target, futId, typ);
+
+            return target.ChangeTarget(res);
+        }
+
+        internal static IUnmanagedTarget TargetListenFutureForOperationAndGet(IUnmanagedTarget target, long futId,
+            int typ, int opId)
+        {
+            var res = JNI.TargetListenFutForOpAndGet(target.Context, target.Target, futId, typ, opId);
+
+            return target.ChangeTarget(res);
+        }
+
         #endregion
 
         #region NATIVE METHODS: AFFINITY
@@ -440,9 +455,11 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
             JNI.ComputeWithTimeout(target.Context, target.Target, timeout);
         }
 
-        internal static void ComputeExecuteNative(IUnmanagedTarget target, long taskPtr, long topVer)
+        internal static IUnmanagedTarget ComputeExecuteNative(IUnmanagedTarget target, long taskPtr, long topVer)
         {
-            JNI.ComputeExecuteNative(target.Context, target.Target, taskPtr, topVer);
+            void* res = JNI.ComputeExecuteNative(target.Context, target.Target, taskPtr, topVer);
+
+            return target.ChangeTarget(res);
         }
 
         #endregion
@@ -816,6 +833,16 @@ namespace Apache.Ignite.Core.Impl.Unmanaged
             JNI.AtomicLongClose(target.Context, target.Target);
         }
 
+        internal static bool ListenableCancel(IUnmanagedTarget target)
+        {
+            return JNI.ListenableCancel(target.Context, target.Target);
+        }
+
+        internal static bool ListenableIsCancelled(IUnmanagedTarget target)
+        {
+            return JNI.ListenableIsCancelled(target.Context, target.Target);
+        }
+
         #endregion
     }
 }
