IGNITE-5769 Abstract away .NET->Java calls This closes #2352
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89bba2fa Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89bba2fa Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89bba2fa Branch: refs/heads/ignite-5757 Commit: 89bba2fa2c423d5713c8412ba0069b869005694c Parents: 47fea40 Author: Pavel Tupitsyn <[email protected]> Authored: Fri Jul 28 10:06:16 2017 +0300 Committer: Pavel Tupitsyn <[email protected]> Committed: Fri Jul 28 10:06:16 2017 +0300 ---------------------------------------------------------------------- .../platform/PlatformTargetProxy.java | 11 + .../platform/PlatformTargetProxyImpl.java | 79 +- .../plugin/PlatformTestPluginTarget.java | 7 +- .../cpp/jni/include/ignite/jni/exports.h | 1 + .../platforms/cpp/jni/include/ignite/jni/java.h | 5 +- modules/platforms/cpp/jni/project/vs/module.def | 1 + modules/platforms/cpp/jni/src/exports.cpp | 4 + modules/platforms/cpp/jni/src/java.cpp | 19 +- .../Plugin/PluginTest.cs | 13 +- .../Apache.Ignite.Core.Tests/TestUtils.cs | 7 +- .../Apache.Ignite.Core.csproj | 5 +- .../dotnet/Apache.Ignite.Core/Ignition.cs | 9 +- .../Impl/Binary/BinaryProcessor.cs | 6 +- .../Impl/Binary/BinaryWriterExtensions.cs | 107 ++ .../Cache/Affinity/PlatformAffinityFunction.cs | 7 +- .../Impl/Cache/CacheAffinityImpl.cs | 18 +- .../Impl/Cache/CacheEnumerator.cs | 8 +- .../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 113 +- .../Impl/Cache/Query/AbstractQueryCursor.cs | 12 +- .../Continuous/ContinuousQueryHandleImpl.cs | 12 +- .../Impl/Cache/Query/FieldsQueryCursor.cs | 6 +- .../Impl/Cache/Query/QueryCursor.cs | 5 +- .../Impl/Cluster/ClusterGroupImpl.cs | 76 +- .../Impl/Common/DelegateTypeDescriptor.cs | 9 +- .../Impl/Common/Listenable.cs | 8 +- .../Impl/Compute/ComputeImpl.cs | 12 +- .../Impl/DataStructures/AtomicLong.cs | 9 +- .../Impl/DataStructures/AtomicReference.cs | 8 +- .../Impl/DataStructures/AtomicSequence.cs | 9 +- 
.../Impl/Datastream/DataStreamerImpl.cs | 8 +- .../Impl/Datastream/StreamReceiverHolder.cs | 13 +- .../Apache.Ignite.Core/Impl/Events/Events.cs | 11 +- .../Impl/IPlatformTargetInternal.cs | 102 ++ .../dotnet/Apache.Ignite.Core/Impl/Ignite.cs | 48 +- .../Impl/Messaging/Messaging.cs | 10 +- .../Impl/PlatformDisposableTargetAdapter.cs | 75 ++ .../Impl/PlatformJniTarget.cs | 536 +++++++++ .../Apache.Ignite.Core/Impl/PlatformTarget.cs | 1086 ------------------ .../Impl/PlatformTargetAdapter.cs | 534 +++++++++ .../Impl/Services/Services.cs | 19 +- .../Impl/Transactions/TransactionsImpl.cs | 29 +- .../Impl/Unmanaged/IgniteJniNativeMethods.cs | 3 + .../Impl/Unmanaged/UnmanagedCallbacks.cs | 10 +- .../Impl/Unmanaged/UnmanagedUtils.cs | 7 + .../Interop/IPlatformTarget.cs | 15 + 45 files changed, 1690 insertions(+), 1402 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java index 1ee57cb..29de311 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxy.java @@ -105,6 +105,17 @@ public interface PlatformTargetProxy { void inStreamAsync(int type, long memPtr) throws Exception; /** + * Asynchronous operation accepting memory stream and returning PlatformListenableTarget. + * Supports cancellable async operations. + * + * @param type Operation type. + * @param memPtr Memory pointer. + * @return Result. + * @throws Exception In case of failure. 
+ */ + Object inStreamOutObjectAsync(int type, long memPtr) throws Exception; + + /** * Returns the underlying target. * * @return Underlying target. http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java index 44044b1..b472275 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformTargetProxyImpl.java @@ -23,6 +23,8 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; +import org.apache.ignite.internal.processors.platform.utils.PlatformListenable; +import org.apache.ignite.internal.processors.platform.utils.PlatformListenableTarget; import org.apache.ignite.lang.IgniteFuture; /** @@ -109,37 +111,16 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy { /** {@inheritDoc} */ @Override public void inStreamAsync(int type, long memPtr) throws Exception { - try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { - BinaryRawReaderEx reader = platformCtx.reader(mem); - - long futId = reader.readLong(); - int futTyp = reader.readInt(); - - final PlatformAsyncResult res = target.processInStreamAsync(type, reader); - - if (res == null) - throw new IgniteException("PlatformTarget.processInStreamAsync should not return null."); - - IgniteFuture fut = res.future(); + 
inStreamOutListenableAsync(type, memPtr); + } - if (fut == null) - throw new IgniteException("PlatformAsyncResult.future() should not return null."); + /** {@inheritDoc} */ + @Override public Object inStreamOutObjectAsync(int type, long memPtr) throws Exception { + PlatformListenable listenable = inStreamOutListenableAsync(type, memPtr); - PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, new PlatformFutureUtils.Writer() { - /** {@inheritDoc} */ - @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) { - res.write(writer, obj); - } + PlatformListenableTarget target = new PlatformListenableTarget(listenable, platformCtx); - /** {@inheritDoc} */ - @Override public boolean canWrite(Object obj, Throwable err) { - return err == null; - } - }, target); - } - catch (Exception e) { - throw target.convertException(e); - } + return wrapProxy(target); } /** {@inheritDoc} */ @@ -234,4 +215,46 @@ public class PlatformTargetProxyImpl implements PlatformTargetProxy { private PlatformTarget unwrapProxy(Object obj) { return obj == null ? null : ((PlatformTargetProxyImpl)obj).target; } + + /** + * Performs asynchronous operation. + * + * @param type Type. + * @param memPtr Stream pointer. + * @return Listenable. + * @throws Exception On error. 
+ */ + private PlatformListenable inStreamOutListenableAsync(int type, long memPtr) throws Exception { + try (PlatformMemory mem = platformCtx.memory().get(memPtr)) { + BinaryRawReaderEx reader = platformCtx.reader(mem); + + long futId = reader.readLong(); + int futTyp = reader.readInt(); + + final PlatformAsyncResult res = target.processInStreamAsync(type, reader); + + if (res == null) + throw new IgniteException("PlatformTarget.processInStreamAsync should not return null."); + + IgniteFuture fut = res.future(); + + if (fut == null) + throw new IgniteException("PlatformAsyncResult.future() should not return null."); + + return PlatformFutureUtils.listen(platformCtx, fut, futId, futTyp, new PlatformFutureUtils.Writer() { + /** {@inheritDoc} */ + @Override public void write(BinaryRawWriterEx writer, Object obj, Throwable err) { + res.write(writer, obj); + } + + /** {@inheritDoc} */ + @Override public boolean canWrite(Object obj, Throwable err) { + return err == null; + } + }, target); + } + catch (Exception e) { + throw target.convertException(e); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java index 7e69425..8c1cbe9 100644 --- a/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java +++ b/modules/core/src/test/java/org/apache/ignite/platform/plugin/PlatformTestPluginTarget.java @@ -146,7 +146,12 @@ class PlatformTestPluginTarget implements PlatformTarget { case 1: { // Async upper case. 
final String val = reader.readString(); - final GridFutureAdapter<String> fa = new GridFutureAdapter<>(); + + final GridFutureAdapter<String> fa = new GridFutureAdapter<String>() { + @Override public boolean cancel() throws IgniteCheckedException { + return onCancelled(); + } + }; new Thread(new Runnable() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/include/ignite/jni/exports.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/include/ignite/jni/exports.h b/modules/platforms/cpp/jni/include/ignite/jni/exports.h index ea0c32a..0580d19 100644 --- a/modules/platforms/cpp/jni/include/ignite/jni/exports.h +++ b/modules/platforms/cpp/jni/include/ignite/jni/exports.h @@ -38,6 +38,7 @@ extern "C" { void IGNITE_CALL IgniteTargetOutStream(gcj::JniContext* ctx, void* obj, int opType, long long memPtr); void* IGNITE_CALL IgniteTargetOutObject(gcj::JniContext* ctx, void* obj, int opType); void IGNITE_CALL IgniteTargetInStreamAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr); + void* IGNITE_CALL IgniteTargetInStreamOutObjectAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr); void* IGNITE_CALL IgniteAcquire(gcj::JniContext* ctx, void* obj); void IGNITE_CALL IgniteRelease(void* obj); http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/include/ignite/jni/java.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h index c170a5b..c713e81 100644 --- a/modules/platforms/cpp/jni/include/ignite/jni/java.h +++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h @@ -173,9 +173,6 @@ namespace ignite jmethodID m_PlatformIgnition_stop; jmethodID m_PlatformIgnition_stopAll; - jclass c_PlatformProcessor; - jmethodID m_PlatformProcessor_releaseStart; - 
jclass c_PlatformTarget; jmethodID m_PlatformTarget_inLongOutLong; jmethodID m_PlatformTarget_inStreamOutLong; @@ -183,6 +180,7 @@ namespace ignite jmethodID m_PlatformTarget_outStream; jmethodID m_PlatformTarget_outObject; jmethodID m_PlatformTarget_inStreamAsync; + jmethodID m_PlatformTarget_inStreamOutObjectAsync; jmethodID m_PlatformTarget_inStreamOutStream; jmethodID m_PlatformTarget_inObjectStreamOutObjectStream; @@ -325,6 +323,7 @@ namespace ignite void TargetOutStream(jobject obj, int opType, long long memPtr, JniErrorInfo* errInfo = NULL); jobject TargetOutObject(jobject obj, int opType, JniErrorInfo* errInfo = NULL); void TargetInStreamAsync(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL); + jobject TargetInStreamOutObjectAsync(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL); jobject CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL); jobject CacheOutOpContinuousQuery(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL); http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/project/vs/module.def ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/project/vs/module.def b/modules/platforms/cpp/jni/project/vs/module.def index 53e7e42..1407f82 100644 --- a/modules/platforms/cpp/jni/project/vs/module.def +++ b/modules/platforms/cpp/jni/project/vs/module.def @@ -12,6 +12,7 @@ IgniteTargetInStreamOutStream @20 IgniteTargetInObjectStreamOutObjectStream @21 IgniteTargetInLongOutLong @24 IgniteTargetInStreamAsync @25 +IgniteTargetInStreamOutObjectAsync @26 IgniteAcquire @80 IgniteRelease @81 IgniteThrowToJava @82 http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/src/exports.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/src/exports.cpp 
b/modules/platforms/cpp/jni/src/exports.cpp index 9b7defd..aeb68ab 100644 --- a/modules/platforms/cpp/jni/src/exports.cpp +++ b/modules/platforms/cpp/jni/src/exports.cpp @@ -74,6 +74,10 @@ extern "C" { ctx->TargetInStreamAsync(static_cast<jobject>(obj), opType, memPtr); } + void* IGNITE_CALL IgniteTargetInStreamOutObjectAsync(gcj::JniContext* ctx, void* obj, int opType, long long memPtr) { + return ctx->TargetInStreamOutObjectAsync(static_cast<jobject>(obj), opType, memPtr); + } + void* IGNITE_CALL IgniteAcquire(gcj::JniContext* ctx, void* obj) { return ctx->Acquire(static_cast<jobject>(obj)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/cpp/jni/src/java.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp index 7eadec0..ac4ba63 100644 --- a/modules/platforms/cpp/jni/src/java.cpp +++ b/modules/platforms/cpp/jni/src/java.cpp @@ -221,9 +221,6 @@ namespace ignite const char* C_PLATFORM_NO_CALLBACK_EXCEPTION = "org/apache/ignite/internal/processors/platform/PlatformNoCallbackException"; - const char* C_PLATFORM_PROCESSOR = "org/apache/ignite/internal/processors/platform/PlatformProcessor"; - JniMethod M_PLATFORM_PROCESSOR_RELEASE_START = JniMethod("releaseStart", "()V", false); - const char* C_PLATFORM_TARGET = "org/apache/ignite/internal/processors/platform/PlatformTargetProxy"; JniMethod M_PLATFORM_TARGET_IN_LONG_OUT_LONG = JniMethod("inLongOutLong", "(IJ)J", false); JniMethod M_PLATFORM_TARGET_IN_STREAM_OUT_LONG = JniMethod("inStreamOutLong", "(IJ)J", false); @@ -233,6 +230,7 @@ namespace ignite JniMethod M_PLATFORM_TARGET_OUT_STREAM = JniMethod("outStream", "(IJ)V", false); JniMethod M_PLATFORM_TARGET_OUT_OBJECT = JniMethod("outObject", "(I)Ljava/lang/Object;", false); JniMethod M_PLATFORM_TARGET_IN_STREAM_ASYNC = JniMethod("inStreamAsync", "(IJ)V", false); + JniMethod M_PLATFORM_TARGET_IN_STREAM_OUT_OBJECT_ASYNC = 
JniMethod("inStreamOutObjectAsync", "(IJ)Ljava/lang/Object;", false); const char* C_PLATFORM_CALLBACK_UTILS = "org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils"; @@ -449,9 +447,6 @@ namespace ignite m_PlatformIgnition_stop = FindMethod(env, c_PlatformIgnition, M_PLATFORM_IGNITION_STOP); m_PlatformIgnition_stopAll = FindMethod(env, c_PlatformIgnition, M_PLATFORM_IGNITION_STOP_ALL); - c_PlatformProcessor = FindClass(env, C_PLATFORM_PROCESSOR); - m_PlatformProcessor_releaseStart = FindMethod(env, c_PlatformProcessor, M_PLATFORM_PROCESSOR_RELEASE_START); - c_PlatformTarget = FindClass(env, C_PLATFORM_TARGET); m_PlatformTarget_inLongOutLong = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_LONG_OUT_LONG); m_PlatformTarget_inStreamOutLong = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_OUT_LONG); @@ -461,6 +456,7 @@ namespace ignite m_PlatformTarget_inStreamOutStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_OUT_STREAM); m_PlatformTarget_inObjectStreamOutObjectStream = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_OBJECT_STREAM_OUT_OBJECT_STREAM); m_PlatformTarget_inStreamAsync = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_ASYNC); + m_PlatformTarget_inStreamOutObjectAsync = FindMethod(env, c_PlatformTarget, M_PLATFORM_TARGET_IN_STREAM_OUT_OBJECT_ASYNC); c_PlatformUtils = FindClass(env, C_PLATFORM_UTILS); m_PlatformUtils_reallocate = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_REALLOC); @@ -473,7 +469,6 @@ namespace ignite void JniMembers::Destroy(JNIEnv* env) { DeleteClass(env, c_IgniteException); DeleteClass(env, c_PlatformIgnition); - DeleteClass(env, c_PlatformProcessor); DeleteClass(env, c_PlatformTarget); DeleteClass(env, c_PlatformUtils); } @@ -894,6 +889,16 @@ namespace ignite ExceptionCheck(env, err); } + jobject JniContext::TargetInStreamOutObjectAsync(jobject obj, int opType, long long memPtr, JniErrorInfo* err) { + JNIEnv* env = Attach(); + + jobject res 
= env->CallObjectMethod(obj, jvm->GetMembers().m_PlatformTarget_inStreamOutObjectAsync, opType, memPtr); + + ExceptionCheck(env, err); + + return LocalToGlobal(env, res); + } + jobject JniContext::CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* err) { JNIEnv* env = Attach(); http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs index 00b1cca..1cb2fae 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Plugin/PluginTest.cs @@ -21,6 +21,8 @@ namespace Apache.Ignite.Core.Tests.Plugin using System.Collections.Generic; using System.IO; using System.Linq; + using System.Threading; + using System.Threading.Tasks; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Interop; @@ -142,13 +144,22 @@ namespace Apache.Ignite.Core.Tests.Plugin Assert.IsTrue(task.IsCompleted); Assert.AreEqual("FOO", asyncRes); + // Async operation with cancellation. + var cts = new CancellationTokenSource(); + task = target.DoOutOpAsync(1, w => w.WriteString("foo"), r => r.ReadString(), cts.Token); + Assert.IsFalse(task.IsCompleted); + cts.Cancel(); + Assert.IsTrue(task.IsCanceled); + var aex = Assert.Throws<AggregateException>(() => { asyncRes = task.Result; }); + Assert.IsInstanceOf<TaskCanceledException>(aex.GetBaseException()); + // Async operation with exception in entry point. Assert.Throws<TestIgnitePluginException>(() => target.DoOutOpAsync<object>(2, null, null)); // Async operation with exception in future. 
var errTask = target.DoOutOpAsync<object>(3, null, null); Assert.IsFalse(errTask.IsCompleted); - var aex = Assert.Throws<AggregateException>(() => errTask.Wait()); + aex = Assert.Throws<AggregateException>(() => errTask.Wait()); Assert.IsInstanceOf<IgniteException>(aex.InnerExceptions.Single()); // Throws custom mapped exception. http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs index 6e0a497..4b171b0 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtils.cs @@ -365,12 +365,11 @@ namespace Apache.Ignite.Core.Tests }; var proc = System.Diagnostics.Process.Start(procStart); - Assert.IsNotNull(proc); - Console.WriteLine(proc.StandardOutput.ReadToEnd()); - Console.WriteLine(proc.StandardError.ReadToEnd()); - Assert.IsTrue(proc.WaitForExit(15000)); + IgniteProcess.AttachProcessConsoleReader(proc); + + Assert.IsTrue(proc.WaitForExit(19000)); Assert.AreEqual(0, proc.ExitCode); } http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index 76132c3..c444ed0 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -99,7 +99,10 @@ <Compile Include="Cache\IMemoryMetrics.cs" /> <Compile Include="Common\ExceptionFactory.cs" /> <Compile Include="Configuration\Package-Info.cs" /> + <Compile 
Include="Impl\IPlatformTargetInternal.cs" /> <Compile Include="Impl\PersistentStore\PersistentStoreMetrics.cs" /> + <Compile Include="Impl\PlatformDisposableTargetAdapter.cs" /> + <Compile Include="Impl\PlatformJniTarget.cs" /> <Compile Include="PersistentStore\IPersistentStoreMetrics.cs" /> <Compile Include="PersistentStore\Package-Info.cs" /> <Compile Include="PersistentStore\PersistentStoreConfiguration.cs" /> @@ -385,7 +388,7 @@ <Compile Include="Impl\Ignite.cs" /> <Compile Include="Impl\IgniteManager.cs" /> <Compile Include="Impl\Log\JavaLogger.cs" /> - <Compile Include="Impl\PlatformTarget.cs" /> + <Compile Include="Impl\PlatformTargetAdapter.cs" /> <Compile Include="Impl\IgniteUtils.cs" /> <Compile Include="Impl\Handle\Handle.cs" /> <Compile Include="Impl\Handle\HandleRegistry.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs index 44ebef3..568eea7 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Ignition.cs @@ -239,7 +239,7 @@ namespace Apache.Ignite.Core // 3. Create startup object which will guide us through the rest of the process. _startup = new Startup(cfg, cbs); - IUnmanagedTarget interopProc = null; + PlatformJniTarget interopProc = null; try { @@ -249,7 +249,7 @@ namespace Apache.Ignite.Core // 5. At this point start routine is finished. We expect STARTUP object to have all necessary data. var node = _startup.Ignite; - interopProc = node.InteropProcessor; + interopProc = (PlatformJniTarget)node.InteropProcessor; var javaLogger = log as JavaLogger; if (javaLogger != null) @@ -279,7 +279,7 @@ namespace Apache.Ignite.Core // 2. Stop Ignite node if it was started. 
if (interopProc != null) - UU.IgnitionStop(interopProc.Context, gridName, true); + UU.IgnitionStop(interopProc.Target.Context, gridName, true); // 3. Throw error further (use startup error if exists because it is more precise). if (_startup.Error != null) @@ -466,7 +466,8 @@ namespace Apache.Ignite.Core if (Nodes.ContainsKey(new NodeKey(name))) throw new IgniteException("Ignite with the same name already started: " + name); - _startup.Ignite = new Ignite(_startup.Configuration, _startup.Name, interopProc, _startup.Marshaller, + _startup.Ignite = new Ignite(_startup.Configuration, _startup.Name, + new PlatformJniTarget(interopProc, _startup.Marshaller), _startup.Marshaller, _startup.LifecycleHandlers, _startup.Callbacks); } catch (Exception e) http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs index b8937c9..69056b3 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryProcessor.cs @@ -21,12 +21,11 @@ namespace Apache.Ignite.Core.Impl.Binary using System.Diagnostics; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Impl.Binary.Metadata; - using Apache.Ignite.Core.Impl.Unmanaged; /// <summary> /// Binary metadata processor, delegates to PlatformBinaryProcessor in Java. /// </summary> - internal class BinaryProcessor : PlatformTarget + internal class BinaryProcessor : PlatformTargetAdapter { /// <summary> /// Op codes. @@ -46,8 +45,7 @@ namespace Apache.Ignite.Core.Impl.Binary /// Initializes a new instance of the <see cref="BinaryProcessor"/> class. 
/// </summary> /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> - public BinaryProcessor(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh) + public BinaryProcessor(IPlatformTargetInternal target) : base(target) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs index 64bfa35..3dc8a96 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Binary/BinaryWriterExtensions.cs @@ -18,6 +18,8 @@ namespace Apache.Ignite.Core.Impl.Binary { using System; + using System.Collections.Generic; + using System.IO; using Apache.Ignite.Core.Binary; /// <summary> @@ -75,5 +77,110 @@ namespace Apache.Ignite.Core.Impl.Binary writer.WriteBoolean(false); } + /// <summary> + /// Write collection. + /// </summary> + /// <param name="writer">Writer.</param> + /// <param name="vals">Values.</param> + /// <param name="selector">A transform function to apply to each element.</param> + /// <returns>The same writer for chaining.</returns> + private static void WriteCollection<T1, T2>(this BinaryWriter writer, ICollection<T1> vals, + Func<T1, T2> selector) + { + writer.WriteInt(vals.Count); + + if (selector == null) + { + foreach (var val in vals) + writer.Write(val); + } + else + { + foreach (var val in vals) + writer.Write(selector(val)); + } + } + + /// <summary> + /// Write enumerable. 
+ /// </summary> + /// <param name="writer">Writer.</param> + /// <param name="vals">Values.</param> + /// <returns>The same writer for chaining.</returns> + public static void WriteEnumerable<T>(this BinaryWriter writer, IEnumerable<T> vals) + { + WriteEnumerable<T, T>(writer, vals, null); + } + + /// <summary> + /// Write enumerable. + /// </summary> + /// <param name="writer">Writer.</param> + /// <param name="vals">Values.</param> + /// <param name="selector">A transform function to apply to each element.</param> + /// <returns>The same writer for chaining.</returns> + public static void WriteEnumerable<T1, T2>(this BinaryWriter writer, IEnumerable<T1> vals, + Func<T1, T2> selector) + { + var col = vals as ICollection<T1>; + + if (col != null) + { + WriteCollection(writer, col, selector); + return; + } + + var stream = writer.Stream; + + var pos = stream.Position; + + stream.Seek(4, SeekOrigin.Current); + + var size = 0; + + if (selector == null) + { + foreach (var val in vals) + { + writer.Write(val); + + size++; + } + } + else + { + foreach (var val in vals) + { + writer.Write(selector(val)); + + size++; + } + } + + stream.WriteInt(pos, size); + } + + /// <summary> + /// Write dictionary. + /// </summary> + /// <param name="writer">Writer.</param> + /// <param name="vals">Values.</param> + public static void WriteDictionary<T1, T2>(this BinaryWriter writer, IEnumerable<KeyValuePair<T1, T2>> vals) + { + var pos = writer.Stream.Position; + writer.WriteInt(0); // Reserve count. 
+ + int cnt = 0; + + foreach (var pair in vals) + { + writer.Write(pair.Key); + writer.Write(pair.Value); + + cnt++; + } + + writer.Stream.WriteInt(pos, cnt); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs index d335804..08c31a6 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Affinity/PlatformAffinityFunction.cs @@ -21,13 +21,11 @@ namespace Apache.Ignite.Core.Impl.Cache.Affinity using System.Collections.Generic; using Apache.Ignite.Core.Cache.Affinity; using Apache.Ignite.Core.Cluster; - using Apache.Ignite.Core.Impl.Binary; - using Apache.Ignite.Core.Impl.Unmanaged; /// <summary> /// Affinity function that delegates to Java. /// </summary> - internal class PlatformAffinityFunction : PlatformTarget, IAffinityFunction + internal class PlatformAffinityFunction : PlatformTargetAdapter, IAffinityFunction { /** Opcodes. */ private enum Op @@ -41,8 +39,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Affinity /// Initializes a new instance of the <see cref="PlatformAffinityFunction"/> class. /// </summary> /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> - public PlatformAffinityFunction(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh) + public PlatformAffinityFunction(IPlatformTargetInternal target) : base(target) { // No-op. 
} http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs index f09a119..a2bba29 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheAffinityImpl.cs @@ -19,19 +19,16 @@ namespace Apache.Ignite.Core.Impl.Cache { using System; using System.Collections.Generic; - using System.Diagnostics; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Cluster; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Binary.IO; using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Unmanaged; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; /// <summary> /// Cache affinity implementation. /// </summary> - internal class CacheAffinityImpl : PlatformTarget, ICacheAffinity + internal class CacheAffinityImpl : PlatformTargetAdapter, ICacheAffinity { /** */ private const int OpAffinityKey = 1; @@ -88,17 +85,12 @@ namespace Apache.Ignite.Core.Impl.Cache /// Initializes a new instance of the <see cref="CacheAffinityImpl" /> class. 
/// </summary> /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> /// <param name="keepBinary">Keep binary flag.</param> - /// <param name="ignite">Grid.</param> - public CacheAffinityImpl(IUnmanagedTarget target, Marshaller marsh, bool keepBinary, - Ignite ignite) : base(target, marsh) + public CacheAffinityImpl(IPlatformTargetInternal target, bool keepBinary) : base(target) { _keepBinary = keepBinary; - Debug.Assert(ignite != null); - - _ignite = ignite; + _ignite = target.Marshaller.Ignite; } /** <inheritDoc /> */ @@ -182,7 +174,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutInOp(OpMapKeysToNodes, w => WriteEnumerable(w, keys), + return DoOutInOp(OpMapKeysToNodes, w => w.WriteEnumerable(keys), reader => ReadDictionary(reader, ReadNode, r => (IList<TK>) r.ReadCollectionAsList<TK>())); } @@ -214,7 +206,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(parts, "parts"); return DoOutInOp(OpMapPartitionsToNodes, - w => WriteEnumerable(w, parts), + w => w.WriteEnumerable(parts), reader => ReadDictionary(reader, r => r.ReadInt(), ReadNode)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs index e2b8350..2860bb6 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheEnumerator.cs @@ -21,14 +21,12 @@ namespace Apache.Ignite.Core.Impl.Cache using System.Collections; using System.Collections.Generic; using Apache.Ignite.Core.Cache; - using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Binary.IO; - using 
Apache.Ignite.Core.Impl.Unmanaged; /// <summary> /// Real cache enumerator communicating with Java. /// </summary> - internal class CacheEnumerator<TK, TV> : PlatformDisposableTarget, IEnumerator<ICacheEntry<TK, TV>> + internal class CacheEnumerator<TK, TV> : PlatformDisposableTargetAdapter, IEnumerator<ICacheEntry<TK, TV>> { /** Operation: next value. */ private const int OpNext = 1; @@ -43,10 +41,8 @@ namespace Apache.Ignite.Core.Impl.Cache /// Constructor. /// </summary> /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> /// <param name="keepBinary">Keep binary flag.</param> - public CacheEnumerator(IUnmanagedTarget target, Marshaller marsh, bool keepBinary) : - base(target, marsh) + public CacheEnumerator(IPlatformTargetInternal target, bool keepBinary) : base(target) { _keepBinary = keepBinary; } http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs index e6b2408..5789c8f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/CacheImpl.cs @@ -20,7 +20,6 @@ namespace Apache.Ignite.Core.Impl.Cache using System; using System.Collections; using System.Collections.Generic; - using System.Diagnostics; using System.Diagnostics.CodeAnalysis; using System.Threading.Tasks; using Apache.Ignite.Core.Binary; @@ -38,13 +37,12 @@ namespace Apache.Ignite.Core.Impl.Cache using Apache.Ignite.Core.Impl.Cluster; using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Transactions; - using Apache.Ignite.Core.Impl.Unmanaged; /// <summary> /// Native cache wrapper. 
/// </summary> [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] - internal class CacheImpl<TK, TV> : PlatformTarget, ICache<TK, TV>, ICacheInternal, ICacheLockInternal + internal class CacheImpl<TK, TV> : PlatformTargetAdapter, ICache<TK, TV>, ICacheInternal, ICacheLockInternal { /** Ignite instance. */ private readonly Ignite _ignite; @@ -64,31 +62,32 @@ namespace Apache.Ignite.Core.Impl.Cache /** Transaction manager. */ private readonly CacheTransactionManager _txManager; + /** Pre-allocated delegate. */ + private readonly Func<IBinaryStream, Exception> _readException; + /// <summary> /// Constructor. /// </summary> - /// <param name="grid">Grid.</param> /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> /// <param name="flagSkipStore">Skip store flag.</param> /// <param name="flagKeepBinary">Keep binary flag.</param> /// <param name="flagNoRetries">No-retries mode flag.</param> /// <param name="flagPartitionRecover">Partition recover mode flag.</param> - public CacheImpl(Ignite grid, IUnmanagedTarget target, Marshaller marsh, + public CacheImpl(IPlatformTargetInternal target, bool flagSkipStore, bool flagKeepBinary, bool flagNoRetries, bool flagPartitionRecover) - : base(target, marsh) + : base(target) { - Debug.Assert(grid != null); - - _ignite = grid; + _ignite = target.Marshaller.Ignite; _flagSkipStore = flagSkipStore; _flagKeepBinary = flagKeepBinary; _flagNoRetries = flagNoRetries; _flagPartitionRecover = flagPartitionRecover; _txManager = GetConfiguration().AtomicityMode == CacheAtomicityMode.Transactional - ? new CacheTransactionManager(grid.GetTransactions()) + ? 
new CacheTransactionManager(_ignite.GetTransactions()) : null; + + _readException = stream => ReadException(Marshaller.StartUnmarshal(stream)); } /** <inheritDoc /> */ @@ -172,7 +171,7 @@ namespace Apache.Ignite.Core.Impl.Cache if (_flagSkipStore) return this; - return new CacheImpl<TK, TV>(_ignite, DoOutOpObject((int) CacheOp.WithSkipStore), Marshaller, + return new CacheImpl<TK, TV>(DoOutOpObject((int) CacheOp.WithSkipStore), true, _flagKeepBinary, true, _flagPartitionRecover); } @@ -196,7 +195,7 @@ namespace Apache.Ignite.Core.Impl.Cache return result; } - return new CacheImpl<TK1, TV1>(_ignite, DoOutOpObject((int) CacheOp.WithKeepBinary), Marshaller, + return new CacheImpl<TK1, TV1>(DoOutOpObject((int) CacheOp.WithKeepBinary), _flagSkipStore, true, _flagNoRetries, _flagPartitionRecover); } @@ -207,7 +206,7 @@ namespace Apache.Ignite.Core.Impl.Cache var cache0 = DoOutOpObject((int)CacheOp.WithExpiryPolicy, w => ExpiryPolicySerializer.WritePolicy(w, plc)); - return new CacheImpl<TK, TV>(_ignite, cache0, Marshaller, _flagSkipStore, _flagKeepBinary, + return new CacheImpl<TK, TV>(cache0, _flagSkipStore, _flagKeepBinary, _flagNoRetries, _flagPartitionRecover); } @@ -220,7 +219,7 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public void LoadCache(ICacheEntryFilter<TK, TV> p, params object[] args) { - DoOutInOpX((int) CacheOp.LoadCache, writer => WriteLoadCacheData(writer, p, args), ReadException); + DoOutInOpX((int) CacheOp.LoadCache, writer => WriteLoadCacheData(writer, p, args), _readException); } /** <inheritDoc /> */ @@ -232,7 +231,7 @@ namespace Apache.Ignite.Core.Impl.Cache /** <inheritDoc /> */ public void LocalLoadCache(ICacheEntryFilter<TK, TV> p, params object[] args) { - DoOutInOpX((int) CacheOp.LocLoadCache, writer => WriteLoadCacheData(writer, p, args), ReadException); + DoOutInOpX((int) CacheOp.LocLoadCache, writer => WriteLoadCacheData(writer, p, args), _readException); } /** <inheritDoc /> */ @@ -281,7 +280,7 @@ namespace 
Apache.Ignite.Core.Impl.Cache return DoOutOpAsync(CacheOp.LoadAll, writer => { writer.WriteBoolean(replaceExistingValues); - WriteEnumerable(writer, keys); + writer.WriteEnumerable(keys); }); } @@ -306,7 +305,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutOp(CacheOp.ContainsKeys, writer => WriteEnumerable(writer, keys)); + return DoOutOp(CacheOp.ContainsKeys, writer => writer.WriteEnumerable(keys)); } /** <inheritDoc /> */ @@ -314,7 +313,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutOpAsync<bool>(CacheOp.ContainsKeysAsync, writer => WriteEnumerable(writer, keys)); + return DoOutOpAsync<bool>(CacheOp.ContainsKeysAsync, writer => writer.WriteEnumerable(keys)); } /** <inheritDoc /> */ @@ -342,7 +341,7 @@ namespace Apache.Ignite.Core.Impl.Cache w.WriteInt(EncodePeekModes(modes)); }, (s, r) => r == True ? new CacheResult<TV>(Unmarshal<TV>(s)) : new CacheResult<TV>(), - ReadException); + _readException); value = res.Success ? res.Value : default(TV); @@ -375,7 +374,7 @@ namespace Apache.Ignite.Core.Impl.Cache throw GetKeyNotFoundException(); return Unmarshal<TV>(stream); - }, ReadException); + }, _readException); } /** <inheritDoc /> */ @@ -418,9 +417,9 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(keys, "keys"); return DoOutInOpX((int) CacheOp.GetAll, - writer => WriteEnumerable(writer, keys), + writer => writer.WriteEnumerable(keys), (s, r) => r == True ? 
ReadGetAllDictionary(Marshaller.StartUnmarshal(s, _flagKeepBinary)) : null, - ReadException); + _readException); } /** <inheritDoc /> */ @@ -428,7 +427,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutOpAsync(CacheOp.GetAllAsync, w => WriteEnumerable(w, keys), r => ReadGetAllDictionary(r)); + return DoOutOpAsync(CacheOp.GetAllAsync, w => w.WriteEnumerable(keys), r => ReadGetAllDictionary(r)); } /** <inheritdoc /> */ @@ -631,7 +630,7 @@ namespace Apache.Ignite.Core.Impl.Cache StartTx(); - DoOutOp(CacheOp.PutAll, writer => WriteDictionary(writer, vals)); + DoOutOp(CacheOp.PutAll, writer => writer.WriteDictionary(vals)); } /** <inheritDoc /> */ @@ -641,7 +640,7 @@ namespace Apache.Ignite.Core.Impl.Cache StartTx(); - return DoOutOpAsync(CacheOp.PutAllAsync, writer => WriteDictionary(writer, vals)); + return DoOutOpAsync(CacheOp.PutAllAsync, writer => writer.WriteDictionary(vals)); } /** <inheritdoc /> */ @@ -649,7 +648,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp(CacheOp.LocEvict, writer => WriteEnumerable(writer, keys)); + DoOutOp(CacheOp.LocEvict, writer => writer.WriteEnumerable(keys)); } /** <inheritdoc /> */ @@ -685,7 +684,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp(CacheOp.ClearAll, writer => WriteEnumerable(writer, keys)); + DoOutOp(CacheOp.ClearAll, writer => writer.WriteEnumerable(keys)); } /** <inheritDoc /> */ @@ -693,7 +692,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutOpAsync(CacheOp.ClearAllAsync, writer => WriteEnumerable(writer, keys)); + return DoOutOpAsync(CacheOp.ClearAllAsync, writer => writer.WriteEnumerable(keys)); } /** <inheritdoc /> */ @@ -709,7 +708,7 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - DoOutOp(CacheOp.LocalClearAll, writer => WriteEnumerable(writer, keys)); + 
DoOutOp(CacheOp.LocalClearAll, writer => writer.WriteEnumerable(keys)); } /** <inheritdoc /> */ @@ -761,7 +760,7 @@ namespace Apache.Ignite.Core.Impl.Cache StartTx(); - DoOutOp(CacheOp.RemoveAll, writer => WriteEnumerable(writer, keys)); + DoOutOp(CacheOp.RemoveAll, writer => writer.WriteEnumerable(keys)); } /** <inheritDoc /> */ @@ -771,7 +770,7 @@ namespace Apache.Ignite.Core.Impl.Cache StartTx(); - return DoOutOpAsync(CacheOp.RemoveAllAsync, writer => WriteEnumerable(writer, keys)); + return DoOutOpAsync(CacheOp.RemoveAllAsync, writer => writer.WriteEnumerable(keys)); } /** <inheritDoc /> */ @@ -843,7 +842,7 @@ namespace Apache.Ignite.Core.Impl.Cache writer.Write(holder); }, (input, res) => res == True ? Unmarshal<TRes>(input) : default(TRes), - ReadException); + _readException); } /** <inheritDoc /> */ @@ -891,10 +890,12 @@ namespace Apache.Ignite.Core.Impl.Cache return DoOutInOpX((int) CacheOp.InvokeAll, writer => { - WriteEnumerable(writer, keys); + writer.WriteEnumerable(keys); writer.Write(holder); }, - (input, res) => res == True ? ReadInvokeAllResults<TRes>(Marshaller.StartUnmarshal(input, IsKeepBinary)): null, ReadException); + (input, res) => res == True + ? ReadInvokeAllResults<TRes>(Marshaller.StartUnmarshal(input, IsKeepBinary)) + : null, _readException); } /** <inheritDoc /> */ @@ -912,7 +913,7 @@ namespace Apache.Ignite.Core.Impl.Cache return DoOutOpAsync(CacheOp.InvokeAllAsync, writer => { - WriteEnumerable(writer, keys); + writer.WriteEnumerable(keys); writer.Write(holder); }, input => ReadInvokeAllResults<TRes>(input)); @@ -931,7 +932,7 @@ namespace Apache.Ignite.Core.Impl.Cache }, (input, res) => res == True ? 
readFunc(Marshaller.StartUnmarshal(input)) - : default(T), ReadException); + : default(T), _readException); } /** <inheritdoc /> */ @@ -940,7 +941,7 @@ namespace Apache.Ignite.Core.Impl.Cache IgniteArgumentCheck.NotNull(key, "key"); return DoOutInOpX((int) CacheOp.Lock, w => w.Write(key), - (stream, res) => new CacheLock(stream.ReadInt(), this), ReadException); + (stream, res) => new CacheLock(stream.ReadInt(), this), _readException); } /** <inheritdoc /> */ @@ -948,8 +949,8 @@ namespace Apache.Ignite.Core.Impl.Cache { IgniteArgumentCheck.NotNull(keys, "keys"); - return DoOutInOpX((int) CacheOp.LockAll, w => WriteEnumerable(w, keys), - (stream, res) => new CacheLock(stream.ReadInt(), this), ReadException); + return DoOutInOpX((int) CacheOp.LockAll, w => w.WriteEnumerable(keys), + (stream, res) => new CacheLock(stream.ReadInt(), this), _readException); } /** <inheritdoc /> */ @@ -1011,7 +1012,7 @@ namespace Apache.Ignite.Core.Impl.Cache if (_flagNoRetries) return this; - return new CacheImpl<TK, TV>(_ignite, DoOutOpObject((int) CacheOp.WithNoRetries), Marshaller, + return new CacheImpl<TK, TV>(DoOutOpObject((int) CacheOp.WithNoRetries), _flagSkipStore, _flagKeepBinary, true, _flagPartitionRecover); } @@ -1021,7 +1022,7 @@ namespace Apache.Ignite.Core.Impl.Cache if (_flagPartitionRecover) return this; - return new CacheImpl<TK, TV>(_ignite, DoOutOpObject((int) CacheOp.WithPartitionRecover), Marshaller, + return new CacheImpl<TK, TV>(DoOutOpObject((int) CacheOp.WithPartitionRecover), _flagSkipStore, _flagKeepBinary, _flagNoRetries, true); } @@ -1092,7 +1093,7 @@ namespace Apache.Ignite.Core.Impl.Cache writer.WriteString(qry.Schema); // Schema }); - return new FieldsQueryCursor<T>(cursor, Marshaller, _flagKeepBinary, readerFunc); + return new FieldsQueryCursor<T>(cursor, _flagKeepBinary, readerFunc); } /** <inheritDoc /> */ @@ -1102,7 +1103,7 @@ namespace Apache.Ignite.Core.Impl.Cache var cursor = DoOutOpObject((int) qry.OpId, writer => qry.Write(writer, 
IsKeepBinary)); - return new QueryCursor<TK, TV>(cursor, Marshaller, _flagKeepBinary); + return new QueryCursor<TK, TV>(cursor, _flagKeepBinary); } /** <inheritdoc /> */ @@ -1168,10 +1169,10 @@ namespace Apache.Ignite.Core.Impl.Cache { var target = DoOutOpObject((int) CacheOp.LocIterator, (IBinaryStream s) => s.WriteInt(peekModes)); - return new CacheEnumerator<TK, TV>(target, Marshaller, _flagKeepBinary); + return new CacheEnumerator<TK, TV>(target, _flagKeepBinary); } - return new CacheEnumerator<TK, TV>(DoOutOpObject((int) CacheOp.Iterator), Marshaller, _flagKeepBinary); + return new CacheEnumerator<TK, TV>(DoOutOpObject((int) CacheOp.Iterator), _flagKeepBinary); } #endregion @@ -1228,14 +1229,6 @@ namespace Apache.Ignite.Core.Impl.Cache } /// <summary> - /// Reads the exception. - /// </summary> - private Exception ReadException(IBinaryStream stream) - { - return ReadException(Marshaller.StartUnmarshal(stream)); - } - - /// <summary> /// Reads the exception, either in binary wrapper form, or as a pair of strings. /// </summary> /// <param name="reader">The stream.</param> @@ -1315,7 +1308,7 @@ namespace Apache.Ignite.Core.Impl.Cache return DoOutInOpX((int) op, w => { w.Write(x); - }, ReadException); + }, _readException); } /// <summary> @@ -1327,7 +1320,7 @@ namespace Apache.Ignite.Core.Impl.Cache { w.Write(x); w.Write(y); - }, ReadException); + }, _readException); } /// <summary> @@ -1340,7 +1333,7 @@ namespace Apache.Ignite.Core.Impl.Cache w.Write(x); w.Write(y); w.Write(z); - }, ReadException); + }, _readException); } /// <summary> @@ -1348,7 +1341,7 @@ namespace Apache.Ignite.Core.Impl.Cache /// </summary> private bool DoOutOp(CacheOp op, Action<BinaryWriter> write) { - return DoOutInOpX((int) op, write, ReadException); + return DoOutInOpX((int) op, write, _readException); } /// <summary> @@ -1359,7 +1352,7 @@ namespace Apache.Ignite.Core.Impl.Cache return DoOutInOpX((int)cacheOp, w => w.Write(x), (stream, res) => res == True ? 
new CacheResult<TV>(Unmarshal<TV>(stream)) : new CacheResult<TV>(), - ReadException); + _readException); } /// <summary> @@ -1374,7 +1367,7 @@ namespace Apache.Ignite.Core.Impl.Cache w.Write(y); }, (stream, res) => res == True ? new CacheResult<TV>(Unmarshal<TV>(stream)) : new CacheResult<TV>(), - ReadException); + _readException); } /** <inheritdoc /> */ http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs index 95c6a36..8e4985e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/AbstractQueryCursor.cs @@ -23,13 +23,11 @@ namespace Apache.Ignite.Core.Impl.Cache.Query using Apache.Ignite.Core.Cache.Query; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Binary.IO; - using Apache.Ignite.Core.Impl.Unmanaged; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; /// <summary> /// Abstract query cursor implementation. /// </summary> - internal abstract class AbstractQueryCursor<T> : PlatformDisposableTarget, IQueryCursor<T>, IEnumerator<T> + internal abstract class AbstractQueryCursor<T> : PlatformDisposableTargetAdapter, IQueryCursor<T>, IEnumerator<T> { /** */ private const int OpGetAll = 1; @@ -65,10 +63,8 @@ namespace Apache.Ignite.Core.Impl.Cache.Query /// Constructor. 
/// </summary> /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> /// <param name="keepBinary">Keep binary flag.</param> - protected AbstractQueryCursor(IUnmanagedTarget target, Marshaller marsh, bool keepBinary) : - base(target, marsh) + protected AbstractQueryCursor(IPlatformTargetInternal target, bool keepBinary) : base(target) { _keepBinary = keepBinary; } @@ -88,7 +84,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query throw new InvalidOperationException("Failed to get all entries because GetAll() " + "method has already been called."); - var res = DoInOp<IList<T>>(OpGetAll, ConvertGetAll); + var res = DoInOp(OpGetAll, ConvertGetAll); _getAllCalled = true; @@ -216,7 +212,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query /// </summary> private void RequestBatch() { - _batch = DoInOp<T[]>(OpGetBatch, ConvertGetBatch); + _batch = DoInOp(OpGetBatch, ConvertGetBatch); _batchPos = 0; } http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs index 6139d8b..ff5c434 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/Continuous/ContinuousQueryHandleImpl.cs @@ -28,8 +28,6 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous using Apache.Ignite.Core.Impl.Binary.IO; using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Resource; - using Apache.Ignite.Core.Impl.Unmanaged; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; using CQU = ContinuousQueryUtils; /// <summary> @@ -67,7 +65,7 @@ 
namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous private readonly long _hnd; /** Native query. */ - private readonly IUnmanagedTarget _nativeQry; + private readonly IPlatformTargetInternal _nativeQry; /** Initial query cursor. */ private volatile IQueryCursor<ICacheEntry<TK, TV>> _initialQueryCursor; @@ -84,7 +82,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous /// <param name="createTargetCb">The initialization callback.</param> /// <param name="initialQry">The initial query.</param> public ContinuousQueryHandleImpl(ContinuousQuery<TK, TV> qry, Marshaller marsh, bool keepBinary, - Func<Action<BinaryWriter>, IUnmanagedTarget> createTargetCb, QueryBase initialQry) + Func<Action<BinaryWriter>, IPlatformTargetInternal> createTargetCb, QueryBase initialQry) { _marsh = marsh; _keepBinary = keepBinary; @@ -138,10 +136,10 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous }); // 4. Initial query. - var nativeInitialQryCur = UU.TargetOutObject(_nativeQry, 0); + var nativeInitialQryCur = _nativeQry.OutObjectInternal(0); _initialQueryCursor = nativeInitialQryCur == null ? 
null - : new QueryCursor<TK, TV>(nativeInitialQryCur, _marsh, _keepBinary); + : new QueryCursor<TK, TV>(nativeInitialQryCur, _keepBinary); } catch (Exception) { @@ -225,7 +223,7 @@ namespace Apache.Ignite.Core.Impl.Cache.Query.Continuous try { - UU.TargetInLongOutLong(_nativeQry, 0, 0); + _nativeQry.InLongOutLong(0, 0); } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs index d928418..9d021dc 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/FieldsQueryCursor.cs @@ -22,7 +22,6 @@ namespace Apache.Ignite.Core.Impl.Cache.Query using System.Diagnostics.CodeAnalysis; using Apache.Ignite.Core.Binary; using Apache.Ignite.Core.Impl.Binary; - using Apache.Ignite.Core.Impl.Unmanaged; /// <summary> /// Cursor for entry-based queries. @@ -36,12 +35,11 @@ namespace Apache.Ignite.Core.Impl.Cache.Query /// Constructor. 
/// </summary> /// <param name="target">Target.</param> - /// <param name="marsh">Marshaler.</param> /// <param name="keepBinary">Keep poratble flag.</param> /// <param name="readerFunc">The reader function.</param> - public FieldsQueryCursor(IUnmanagedTarget target, Marshaller marsh, bool keepBinary, + public FieldsQueryCursor(IPlatformTargetInternal target, bool keepBinary, Func<IBinaryRawReader, int, T> readerFunc) - : base(target, marsh, keepBinary) + : base(target, keepBinary) { Debug.Assert(readerFunc != null); http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs index 5a46915..bc3cdb6 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cache/Query/QueryCursor.cs @@ -20,7 +20,6 @@ namespace Apache.Ignite.Core.Impl.Cache.Query using System.Diagnostics.CodeAnalysis; using Apache.Ignite.Core.Cache; using Apache.Ignite.Core.Impl.Binary; - using Apache.Ignite.Core.Impl.Unmanaged; /// <summary> /// Cursor for entry-based queries. @@ -31,10 +30,8 @@ namespace Apache.Ignite.Core.Impl.Cache.Query /// Constructor. /// </summary> /// <param name="target">Target.</param> - /// <param name="marsh">Marshaler.</param> /// <param name="keepBinary">Keep poratble flag.</param> - public QueryCursor(IUnmanagedTarget target, Marshaller marsh, - bool keepBinary) : base(target, marsh, keepBinary) + public QueryCursor(IPlatformTargetInternal target, bool keepBinary) : base(target, keepBinary) { // No-op. 
} http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs index 30afe57..678fb03 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Cluster/ClusterGroupImpl.cs @@ -37,16 +37,14 @@ namespace Apache.Ignite.Core.Impl.Cluster using Apache.Ignite.Core.Impl.Messaging; using Apache.Ignite.Core.Impl.PersistentStore; using Apache.Ignite.Core.Impl.Services; - using Apache.Ignite.Core.Impl.Unmanaged; using Apache.Ignite.Core.Messaging; using Apache.Ignite.Core.PersistentStore; using Apache.Ignite.Core.Services; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; /// <summary> /// Ignite projection implementation. /// </summary> - internal class ClusterGroupImpl : PlatformTarget, IClusterGroup + internal class ClusterGroupImpl : PlatformTargetAdapter, IClusterGroup { /** Attribute: platform. */ private const string AttrPlatform = "org.apache.ignite.platform"; @@ -175,13 +173,12 @@ namespace Apache.Ignite.Core.Impl.Cluster /// Constructor. 
/// </summary> /// <param name="target">Target.</param> - /// <param name="ignite">Grid.</param> /// <param name="pred">Predicate.</param> [SuppressMessage("Microsoft.Performance", "CA1805:DoNotInitializeUnnecessarily")] - public ClusterGroupImpl(IUnmanagedTarget target, Ignite ignite, Func<IClusterNode, bool> pred) - : base(target, ignite.Marshaller) + public ClusterGroupImpl(IPlatformTargetInternal target, Func<IClusterNode, bool> pred) + : base(target) { - _ignite = ignite; + _ignite = target.Marshaller.Ignite; _pred = pred; _comp = new Lazy<ICompute>(() => CreateCompute()); @@ -207,7 +204,7 @@ namespace Apache.Ignite.Core.Impl.Cluster /// </summary> private ICompute CreateCompute() { - return new Compute(new ComputeImpl(DoOutOpObject(OpGetCompute), Marshaller, this, false)); + return new Compute(new ComputeImpl(DoOutOpObject(OpGetCompute), this, false)); } /** <inheritDoc /> */ @@ -252,10 +249,7 @@ namespace Apache.Ignite.Core.Impl.Cluster { Debug.Assert(items != null); - IUnmanagedTarget prj = DoOutOpObject(OpForNodeIds, writer => - { - WriteEnumerable(writer, items, func); - }); + var prj = DoOutOpObject(OpForNodeIds, writer => writer.WriteEnumerable(items, func)); return GetClusterGroup(prj); } @@ -265,7 +259,7 @@ namespace Apache.Ignite.Core.Impl.Cluster { var newPred = _pred == null ? 
p : node => _pred(node) && p(node); - return new ClusterGroupImpl(Target, _ignite, newPred); + return new ClusterGroupImpl(Target, newPred); } /** <inheritDoc /> */ @@ -278,7 +272,7 @@ namespace Apache.Ignite.Core.Impl.Cluster writer.WriteString(name); writer.WriteString(val); }; - IUnmanagedTarget prj = DoOutOpObject(OpForAttribute, action); + var prj = DoOutOpObject(OpForAttribute, action); return GetClusterGroup(prj); } @@ -293,7 +287,7 @@ namespace Apache.Ignite.Core.Impl.Cluster /// </returns> private IClusterGroup ForCacheNodes(string name, int op) { - IUnmanagedTarget prj = DoOutOpObject(op, writer => + var prj = DoOutOpObject(op, writer => { writer.WriteString(name); }); @@ -336,7 +330,7 @@ namespace Apache.Ignite.Core.Impl.Cluster { IgniteArgumentCheck.NotNull(node, "node"); - IUnmanagedTarget prj = DoOutOpObject(OpForHost, writer => + var prj = DoOutOpObject(OpForHost, writer => { writer.WriteGuid(node.Id); }); @@ -404,15 +398,14 @@ namespace Apache.Ignite.Core.Impl.Cluster return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null; }); } - return DoOutInOp(OpMetricsFiltered, writer => - { - WriteEnumerable(writer, GetNodes().Select(node => node.Id)); - }, stream => - { - IBinaryRawReader reader = Marshaller.StartUnmarshal(stream, false); + return DoOutInOp(OpMetricsFiltered, + writer => writer.WriteEnumerable(GetNodes().Select(node => node.Id)), + stream => + { + IBinaryRawReader reader = Marshaller.StartUnmarshal(stream, false); - return reader.ReadBoolean() ? new ClusterMetricsImpl(reader) : null; - }); + return reader.ReadBoolean() ? 
new ClusterMetricsImpl(reader) : null; + }); } /** <inheritDoc /> */ @@ -426,7 +419,7 @@ namespace Apache.Ignite.Core.Impl.Cluster /// </summary> private IMessaging CreateMessaging() { - return new Messaging(DoOutOpObject(OpGetMessaging), Marshaller, this); + return new Messaging(DoOutOpObject(OpGetMessaging), this); } /** <inheritDoc /> */ @@ -440,7 +433,7 @@ namespace Apache.Ignite.Core.Impl.Cluster /// </summary> private IEvents CreateEvents() { - return new Events(DoOutOpObject(OpGetEvents), Marshaller, this); + return new Events(DoOutOpObject(OpGetEvents), this); } /** <inheritDoc /> */ @@ -454,7 +447,7 @@ namespace Apache.Ignite.Core.Impl.Cluster /// </summary> private IServices CreateServices() { - return new Services(DoOutOpObject(OpGetServices), Marshaller, this, false, false); + return new Services(DoOutOpObject(OpGetServices), this, false, false); } /// <summary> @@ -665,9 +658,9 @@ namespace Apache.Ignite.Core.Impl.Cluster /// </summary> /// <param name="prj">Native projection.</param> /// <returns>New cluster group.</returns> - private IClusterGroup GetClusterGroup(IUnmanagedTarget prj) + private IClusterGroup GetClusterGroup(IPlatformTargetInternal prj) { - return new ClusterGroupImpl(prj, _ignite, _pred); + return new ClusterGroupImpl(prj, _pred); } /// <summary> @@ -678,29 +671,30 @@ namespace Apache.Ignite.Core.Impl.Cluster { long oldTopVer = Interlocked.Read(ref _topVer); - List<IClusterNode> newNodes = null; - - DoOutInOp(OpNodes, writer => + var res = Target.InStreamOutStream(OpNodes, writer => { writer.WriteLong(oldTopVer); - }, input => + }, reader => { - BinaryReader reader = Marshaller.StartUnmarshal(input); - if (reader.ReadBoolean()) { // Topology has been updated. 
long newTopVer = reader.ReadLong(); + var newNodes = IgniteUtils.ReadNodes((BinaryReader) reader, _pred); - newNodes = IgniteUtils.ReadNodes(reader, _pred); - - UpdateTopology(newTopVer, newNodes); + return Tuple.Create(newTopVer, newNodes); } + + return null; }); - if (newNodes != null) - return newNodes; - + if (res != null) + { + UpdateTopology(res.Item1, res.Item2); + + return res.Item2; + } + // No topology changes. Debug.Assert(_nodes != null, "At least one topology update should have occurred."); http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs index 4cd0678..cc12caa 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/DelegateTypeDescriptor.cs @@ -28,7 +28,6 @@ namespace Apache.Ignite.Core.Impl.Common using Apache.Ignite.Core.Impl.Cache; using Apache.Ignite.Core.Impl.Cache.Query.Continuous; using Apache.Ignite.Core.Impl.Datastream; - using Apache.Ignite.Core.Impl.Unmanaged; using Apache.Ignite.Core.Messaging; /// <summary> @@ -66,7 +65,7 @@ namespace Apache.Ignite.Core.Impl.Common private readonly Action<object> _computeJobCancel; /** */ - private readonly Action<object, Ignite, IUnmanagedTarget, IBinaryStream, bool> _streamReceiver; + private readonly Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool> _streamReceiver; /** */ private readonly Func<object, object> _streamTransformerCtor; @@ -163,7 +162,7 @@ namespace Apache.Ignite.Core.Impl.Common /// </summary> /// <param name="type">Type.</param> /// <returns>Precompiled invocator delegate.</returns> - public static Action<object, Ignite, 
IUnmanagedTarget, IBinaryStream, bool> GetStreamReceiver(Type type) + public static Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool> GetStreamReceiver(Type type) { return Get(type)._streamReceiver; } @@ -313,12 +312,12 @@ namespace Apache.Ignite.Core.Impl.Common .MakeGenericMethod(iface.GetGenericArguments()); _streamReceiver = DelegateConverter - .CompileFunc<Action<object, Ignite, IUnmanagedTarget, IBinaryStream, bool>>( + .CompileFunc<Action<object, Ignite, IPlatformTargetInternal, IBinaryStream, bool>>( typeof (StreamReceiverHolder), method, new[] { - iface, typeof (Ignite), typeof (IUnmanagedTarget), typeof (IBinaryStream), + iface, typeof (Ignite), typeof (IPlatformTargetInternal), typeof (IBinaryStream), typeof (bool) }, new[] {true, false, false, false, false, false}); http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs index 6da98ab..8566d0b 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs @@ -17,13 +17,10 @@ namespace Apache.Ignite.Core.Impl.Common { - using Apache.Ignite.Core.Impl.Binary; - using Apache.Ignite.Core.Impl.Unmanaged; - /// <summary> /// Platform listenable. /// </summary> - internal class Listenable : PlatformTarget + internal class Listenable : PlatformTargetAdapter { /** */ private const int OpCancel = 1; @@ -32,8 +29,7 @@ namespace Apache.Ignite.Core.Impl.Common /// Initializes a new instance of the <see cref="Listenable"/> class. 
/// </summary> /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> - public Listenable(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh) + public Listenable(IPlatformTargetInternal target) : base(target) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs index cace7b2..06f9ad4 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs @@ -34,13 +34,12 @@ namespace Apache.Ignite.Core.Impl.Compute using Apache.Ignite.Core.Impl.Cluster; using Apache.Ignite.Core.Impl.Common; using Apache.Ignite.Core.Impl.Compute.Closure; - using Apache.Ignite.Core.Impl.Unmanaged; /// <summary> /// Compute implementation. /// </summary> [SuppressMessage("Microsoft.Design", "CA1001:TypesThatOwnDisposableFieldsShouldBeDisposable")] - internal class ComputeImpl : PlatformTarget + internal class ComputeImpl : PlatformTargetAdapter { /** */ private const int OpAffinity = 1; @@ -76,11 +75,10 @@ namespace Apache.Ignite.Core.Impl.Compute /// Constructor. 
/// </summary> /// <param name="target">Target.</param> - /// <param name="marsh">Marshaller.</param> /// <param name="prj">Projection.</param> /// <param name="keepBinary">Binary flag.</param> - public ComputeImpl(IUnmanagedTarget target, Marshaller marsh, ClusterGroupImpl prj, bool keepBinary) - : base(target, marsh) + public ComputeImpl(IPlatformTargetInternal target, ClusterGroupImpl prj, bool keepBinary) + : base(target) { _prj = prj; @@ -194,7 +192,7 @@ namespace Apache.Ignite.Core.Impl.Compute var future = holder.Future; - future.SetTarget(new Listenable(futTarget, Marshaller)); + future.SetTarget(new Listenable(futTarget)); return future; } @@ -551,7 +549,7 @@ namespace Apache.Ignite.Core.Impl.Compute writeAction(writer); }); - holder.Future.SetTarget(new Listenable(futTarget, Marshaller)); + holder.Future.SetTarget(new Listenable(futTarget)); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs index 0c4bf84..f797408 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicLong.cs @@ -19,16 +19,12 @@ namespace Apache.Ignite.Core.Impl.DataStructures { using System.Diagnostics; using Apache.Ignite.Core.DataStructures; - using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Binary.IO; - using Apache.Ignite.Core.Impl.Unmanaged; - - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; /// <summary> /// Atomic long wrapper. 
/// </summary> - internal sealed class AtomicLong : PlatformTarget, IAtomicLong + internal sealed class AtomicLong : PlatformTargetAdapter, IAtomicLong { /** */ private readonly string _name; @@ -50,9 +46,8 @@ namespace Apache.Ignite.Core.Impl.DataStructures /// Initializes a new instance of the <see cref="AtomicLong"/> class. /// </summary> /// <param name="target">The target.</param> - /// <param name="marsh">The marshaller.</param> /// <param name="name">The name.</param> - public AtomicLong(IUnmanagedTarget target, Marshaller marsh, string name) : base(target, marsh) + public AtomicLong(IPlatformTargetInternal target, string name) : base(target) { Debug.Assert(!string.IsNullOrEmpty(name)); http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs index 4ca4b24..76515a2 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs @@ -19,13 +19,11 @@ namespace Apache.Ignite.Core.Impl.DataStructures { using System.Diagnostics; using Apache.Ignite.Core.DataStructures; - using Apache.Ignite.Core.Impl.Binary; - using Apache.Ignite.Core.Impl.Unmanaged; /// <summary> /// Atomic reference. /// </summary> - internal class AtomicReference<T> : PlatformTarget, IAtomicReference<T> + internal class AtomicReference<T> : PlatformTargetAdapter, IAtomicReference<T> { /** Opcodes. 
*/ private enum Op @@ -41,8 +39,8 @@ namespace Apache.Ignite.Core.Impl.DataStructures private readonly string _name; /** <inheritDoc /> */ - public AtomicReference(IUnmanagedTarget target, Marshaller marsh, string name) - : base(target, marsh) + public AtomicReference(IPlatformTargetInternal target, string name) + : base(target) { Debug.Assert(!string.IsNullOrEmpty(name)); http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs index f7fc6b7..dd079ef 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs @@ -19,13 +19,11 @@ namespace Apache.Ignite.Core.Impl.DataStructures { using System.Diagnostics; using Apache.Ignite.Core.DataStructures; - using Apache.Ignite.Core.Impl.Binary; - using Apache.Ignite.Core.Impl.Unmanaged; /// <summary> /// Atomic long wrapper. /// </summary> - internal sealed class AtomicSequence: PlatformTarget, IAtomicSequence + internal sealed class AtomicSequence: PlatformTargetAdapter, IAtomicSequence { /** */ private readonly string _name; @@ -46,10 +44,9 @@ namespace Apache.Ignite.Core.Impl.DataStructures /// Initializes a new instance of the <see cref="Apache.Ignite.Core.Impl.DataStructures.AtomicLong"/> class. 
/// </summary> /// <param name="target">The target.</param> - /// <param name="marsh">The marshaller.</param> /// <param name="name">The name.</param> - public AtomicSequence(IUnmanagedTarget target, Marshaller marsh, string name) - : base(target, marsh) + public AtomicSequence(IPlatformTargetInternal target, string name) + : base(target) { Debug.Assert(!string.IsNullOrEmpty(name)); http://git-wip-us.apache.org/repos/asf/ignite/blob/89bba2fa/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs index 96e58d4..fb2df01 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Datastream/DataStreamerImpl.cs @@ -26,8 +26,6 @@ namespace Apache.Ignite.Core.Impl.Datastream using Apache.Ignite.Core.Datastream; using Apache.Ignite.Core.Impl.Binary; using Apache.Ignite.Core.Impl.Common; - using Apache.Ignite.Core.Impl.Unmanaged; - using UU = Apache.Ignite.Core.Impl.Unmanaged.UnmanagedUtils; /// <summary> /// Data streamer internal interface to get rid of generics. @@ -45,7 +43,7 @@ namespace Apache.Ignite.Core.Impl.Datastream /// <summary> /// Data streamer implementation. 
/// </summary> - internal class DataStreamerImpl<TK, TV> : PlatformDisposableTarget, IDataStreamer, IDataStreamer<TK, TV> + internal class DataStreamerImpl<TK, TV> : PlatformDisposableTargetAdapter, IDataStreamer, IDataStreamer<TK, TV> { #pragma warning disable 0420 @@ -141,8 +139,8 @@ namespace Apache.Ignite.Core.Impl.Datastream /// <param name="marsh">Marshaller.</param> /// <param name="cacheName">Cache name.</param> /// <param name="keepBinary">Binary flag.</param> - public DataStreamerImpl(IUnmanagedTarget target, Marshaller marsh, string cacheName, bool keepBinary) - : base(target, marsh) + public DataStreamerImpl(IPlatformTargetInternal target, Marshaller marsh, string cacheName, bool keepBinary) + : base(target) { _cacheName = cacheName; _keepBinary = keepBinary;
