IGNITE-4033 Streamline platform callback interface This closes #1261
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/72ac53da Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/72ac53da Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/72ac53da Branch: refs/heads/ignite-4371 Commit: 72ac53da2e6f8311c2d816763a6765724b79491a Parents: f621f7f Author: Pavel Tupitsyn <[email protected]> Authored: Mon Dec 12 17:44:48 2016 +0300 Committer: Pavel Tupitsyn <[email protected]> Committed: Mon Dec 12 17:44:48 2016 +0300 ---------------------------------------------------------------------- .../cache/PlatformCacheEntryFilterImpl.java | 4 +- .../cache/PlatformCacheEntryProcessorImpl.java | 43 +- .../affinity/PlatformAffinityFunction.java | 52 +- .../callback/PlatformCallbackGateway.java | 266 ++-- .../platform/callback/PlatformCallbackOp.java | 206 +++ .../callback/PlatformCallbackUtils.java | 544 +------- .../platform/compute/PlatformAbstractJob.java | 2 +- .../platform/compute/PlatformAbstractTask.java | 15 +- .../platform/compute/PlatformClosureJob.java | 12 +- .../platform/compute/PlatformFullJob.java | 15 +- .../platform/compute/PlatformFullTask.java | 18 +- .../PlatformStreamReceiverImpl.java | 3 + .../dotnet/PlatformDotNetCacheStore.java | 6 +- .../services/PlatformAbstractService.java | 25 +- .../platform/utils/PlatformFutureUtils.java | 4 +- .../platform/utils/PlatformUtils.java | 8 +- .../cpp/core/src/impl/ignite_environment.cpp | 74 +- .../platforms/cpp/jni/include/ignite/jni/java.h | 85 +- modules/platforms/cpp/jni/project/vs/module.def | 4 +- modules/platforms/cpp/jni/src/java.cpp | 339 +---- .../Services/ServicesTest.cs | 3 +- .../Apache.Ignite.Core.csproj | 1 + .../Impl/Binary/BinaryUtils.cs | 16 + .../Impl/Binary/Io/BinaryStreamBase.cs | 4 +- .../Impl/Compute/ComputeTaskHolder.cs | 14 +- .../Impl/Unmanaged/UnmanagedCallbackHandlers.cs | 79 +- .../Impl/Unmanaged/UnmanagedCallbackOp.cs | 86 ++ .../Impl/Unmanaged/UnmanagedCallbacks.cs | 1229 +++++++++--------- 28 files changed, 1293 insertions(+), 1864 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java index 4c86d6d..3c55b76 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java @@ -60,12 +60,14 @@ public class PlatformCacheEntryFilterImpl extends PlatformAbstractPredicate impl BinaryRawWriterEx writer = ctx.writer(out); + writer.writeLong(ptr); + writer.writeObject(k); writer.writeObject(v); out.synchronize(); - return ctx.gateway().cacheEntryFilterApply(ptr, mem.pointer()) != 0; + return ctx.gateway().cacheEntryFilterApply(mem.pointer()) != 0; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java index 3e8ad61..31dd267 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java @@ -17,15 +17,9 @@ package org.apache.ignite.internal.processors.platform.cache; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.binary.BinaryRawWriter; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformContext; @@ -36,6 +30,13 @@ import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStrea import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.util.typedef.internal.U; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; + /** * Platform cache entry processor. Delegates processing to native platform. */ @@ -65,7 +66,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces private transient long ptr; /** - * {@link java.io.Externalizable} support. + * {@link Externalizable} support. */ public PlatformCacheEntryProcessorImpl() { // No-op. @@ -86,7 +87,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces @Override public Object process(MutableEntry entry, Object... args) throws EntryProcessorException { try { - IgniteKernal ignite = (IgniteKernal)entry.unwrap(Ignite.class); + Ignite ignite = (Ignite)entry.unwrap(Ignite.class); PlatformProcessor interopProc; @@ -112,12 +113,10 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces * @param ctx Context. * @param entry Entry. * @return Processing result. - * @throws org.apache.ignite.IgniteCheckedException */ - private Object execute0(PlatformContext ctx, MutableEntry entry) - throws IgniteCheckedException { - try (PlatformMemory outMem = ctx.memory().allocate()) { - PlatformOutputStream out = outMem.output(); + private Object execute0(PlatformContext ctx, MutableEntry entry) { + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); BinaryRawWriterEx writer = ctx.writer(out); @@ -125,17 +124,15 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces out.synchronize(); - try (PlatformMemory inMem = ctx.memory().allocate()) { - PlatformInputStream in = inMem.input(); + ctx.gateway().cacheInvoke(mem.pointer()); - ctx.gateway().cacheInvoke(outMem.pointer(), inMem.pointer()); + PlatformInputStream in = mem.input(); - in.synchronize(); + in.synchronize(); - BinaryRawReaderEx reader = ctx.reader(in); + BinaryRawReaderEx reader = ctx.reader(in); - return readResultAndUpdateEntry(ctx, entry, reader); - } + return readResultAndUpdateEntry(ctx, entry, reader); } } @@ -145,7 +142,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces * @param entry Entry to process. * @param writer Writer. */ - private void writeEntryAndProcessor(MutableEntry entry, BinaryRawWriterEx writer) { + private void writeEntryAndProcessor(MutableEntry entry, BinaryRawWriter writer) { writer.writeObject(entry.getKey()); writer.writeObject(entry.getValue()); @@ -167,7 +164,7 @@ public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProces * @param entry Mutable entry to update. * @param reader Reader. * @return Entry processing result - * @throws javax.cache.processor.EntryProcessorException If processing has failed in user code. + * @throws EntryProcessorException If processing has failed in user code. */ @SuppressWarnings("unchecked") private Object readResultAndUpdateEntry(PlatformContext ctx, MutableEntry entry, BinaryRawReaderEx reader) { http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java index 2d3cada..4206d40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java @@ -166,11 +166,12 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl PlatformOutputStream out = mem.output(); BinaryRawWriterEx writer = ctx.writer(out); + writer.writeLong(ptr); writer.writeObject(key); out.synchronize(); - return ctx.gateway().affinityFunctionPartition(ptr, mem.pointer()); + return ctx.gateway().affinityFunctionPartition(mem.pointer()); } } @@ -186,34 +187,34 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl assert ptr != 0; assert affCtx != null; - try (PlatformMemory outMem = ctx.memory().allocate()) { - try (PlatformMemory inMem = ctx.memory().allocate()) { - PlatformOutputStream out = outMem.output(); - BinaryRawWriterEx writer = ctx.writer(out); + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + BinaryRawWriterEx writer = ctx.writer(out); + + writer.writeLong(ptr); - // Write previous assignment - PlatformAffinityUtils.writeAffinityFunctionContext(affCtx, writer, ctx); + // Write previous assignment + PlatformAffinityUtils.writeAffinityFunctionContext(affCtx, writer, ctx); - out.synchronize(); + out.synchronize(); + + // Call platform + // We can not restore original AffinityFunctionContext after the call to platform, + // due to DiscoveryEvent (when node leaves, we can't get it by id anymore). + // Secondly, AffinityFunctionContext can't be changed by the user. + if (baseTarget != null) + baseTarget.setCurrentAffinityFunctionContext(affCtx); - // Call platform - // We can not restore original AffinityFunctionContext after the call to platform, - // due to DiscoveryEvent (when node leaves, we can't get it by id anymore). - // Secondly, AffinityFunctionContext can't be changed by the user. + try { + ctx.gateway().affinityFunctionAssignPartitions(mem.pointer()); + } + finally { if (baseTarget != null) - baseTarget.setCurrentAffinityFunctionContext(affCtx); - - try { - ctx.gateway().affinityFunctionAssignPartitions(ptr, outMem.pointer(), inMem.pointer()); - } - finally { - if (baseTarget != null) - baseTarget.setCurrentAffinityFunctionContext(null); - } - - // Read result - return PlatformAffinityUtils.readPartitionAssignment(ctx.reader(inMem), ctx); + baseTarget.setCurrentAffinityFunctionContext(null); } + + // Read result + return PlatformAffinityUtils.readPartitionAssignment(ctx.reader(mem), ctx); } } @@ -234,11 +235,12 @@ public class PlatformAffinityFunction implements AffinityFunction, Externalizabl PlatformOutputStream out = mem.output(); BinaryRawWriterEx writer = ctx.writer(out); + writer.writeLong(ptr); writer.writeUuid(nodeId); out.synchronize(); - ctx.gateway().affinityFunctionRemoveNode(ptr, mem.pointer()); + ctx.gateway().affinityFunctionRemoveNode(mem.pointer()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java index c77f501..a9268fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java @@ -61,7 +61,7 @@ public class PlatformCallbackGateway { enter(); try { - return PlatformCallbackUtils.cacheStoreCreate(envPtr, memPtr); + return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheStoreCreate, memPtr); } finally { leave(); @@ -69,15 +69,14 @@ public class PlatformCallbackGateway { } /** - * @param objPtr Object pointer. * @param memPtr Memory pointer. * @return Result. */ - public int cacheStoreInvoke(long objPtr, long memPtr) { + public int cacheStoreInvoke(long memPtr) { enter(); try { - return PlatformCallbackUtils.cacheStoreInvoke(envPtr, objPtr, memPtr); + return (int)PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheStoreInvoke, memPtr); } finally { leave(); @@ -92,7 +91,7 @@ public class PlatformCallbackGateway { return; // no need to destroy stores on grid stop try { - PlatformCallbackUtils.cacheStoreDestroy(envPtr, objPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheStoreDestroy, objPtr); } finally { leave(); @@ -102,14 +101,13 @@ public class PlatformCallbackGateway { /** * Creates cache store session. * - * @param storePtr Store instance pointer. * @return Session instance pointer. */ - public long cacheStoreSessionCreate(long storePtr) { + public long cacheStoreSessionCreate() { enter(); try { - return PlatformCallbackUtils.cacheStoreSessionCreate(envPtr, storePtr); + return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheStoreSessionCreate, 0); } finally { leave(); @@ -126,7 +124,7 @@ public class PlatformCallbackGateway { enter(); try { - return PlatformCallbackUtils.cacheEntryFilterCreate(envPtr, memPtr); + return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheEntryFilterCreate, memPtr); } finally { leave(); @@ -134,15 +132,14 @@ public class PlatformCallbackGateway { } /** - * @param ptr Pointer. * @param memPtr Memory pointer. * @return Result. */ - public int cacheEntryFilterApply(long ptr, long memPtr) { + public int cacheEntryFilterApply(long memPtr) { enter(); try { - return PlatformCallbackUtils.cacheEntryFilterApply(envPtr, ptr, memPtr); + return (int)PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheEntryFilterApply, memPtr); } finally { leave(); @@ -156,7 +153,7 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.cacheEntryFilterDestroy(envPtr, ptr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheEntryFilterDestroy, ptr); } finally { leave(); @@ -166,14 +163,13 @@ public class PlatformCallbackGateway { /** * Invoke cache entry processor. * - * @param outMemPtr Output memory pointer. - * @param inMemPtr Input memory pointer. + * @param memPtr Memory pointer. */ - public void cacheInvoke(long outMemPtr, long inMemPtr) { + public void cacheInvoke(long memPtr) { enter(); try { - PlatformCallbackUtils.cacheInvoke(envPtr, outMemPtr, inMemPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.CacheInvoke, memPtr); } finally { leave(); @@ -183,15 +179,13 @@ public class PlatformCallbackGateway { /** * Perform native task map. Do not throw exceptions, serializing them to the output stream instead. * - * @param taskPtr Task pointer. - * @param outMemPtr Output memory pointer (exists if topology changed, otherwise {@code 0}). - * @param inMemPtr Input memory pointer. + * @param memPtr Memory pointer. */ - public void computeTaskMap(long taskPtr, long outMemPtr, long inMemPtr) { + public void computeTaskMap(long memPtr) { enter(); try { - PlatformCallbackUtils.computeTaskMap(envPtr, taskPtr, outMemPtr, inMemPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeTaskMap, memPtr); } finally { leave(); @@ -203,14 +197,31 @@ public class PlatformCallbackGateway { * * @param taskPtr Task pointer. * @param jobPtr Job pointer. - * @param memPtr Memory pointer (always zero for local job execution). * @return Job result enum ordinal. */ - public int computeTaskJobResult(long taskPtr, long jobPtr, long memPtr) { + public int computeTaskLocalJobResult(long taskPtr, long jobPtr) { + enter(); + + try { + return (int)PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.ComputeTaskLocalJobResult, taskPtr, jobPtr, 0, null); + } + finally { + leave(); + } + } + + /** + * Perform native task job result notification. + * + * @param memPtr Memory pointer. + * @return Job result enum ordinal. + */ + public int computeTaskJobResult(long memPtr) { enter(); try { - return PlatformCallbackUtils.computeTaskJobResult(envPtr, taskPtr, jobPtr, memPtr); + return (int)PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeTaskJobResult, memPtr); } finally { leave(); @@ -226,7 +237,7 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.computeTaskReduce(envPtr, taskPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeTaskReduce, taskPtr); } finally { leave(); @@ -243,7 +254,8 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.computeTaskComplete(envPtr, taskPtr, memPtr); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.ComputeTaskComplete, taskPtr, memPtr, 0, null); } finally { leave(); @@ -261,7 +273,8 @@ public class PlatformCallbackGateway { enter(); try { - return PlatformCallbackUtils.computeJobSerialize(envPtr, jobPtr, memPtr); + return (int)PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.ComputeJobSerialize, jobPtr, memPtr, 0, null); } finally { leave(); @@ -278,7 +291,7 @@ public class PlatformCallbackGateway { enter(); try { - return PlatformCallbackUtils.computeJobCreate(envPtr, memPtr); + return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeJobCreate, memPtr); } finally { leave(); @@ -290,13 +303,29 @@ public class PlatformCallbackGateway { * * @param jobPtr Job pointer. * @param cancel Cancel flag. - * @param memPtr Memory pointer to write result to for remote job execution or {@code 0} for local job execution. */ - public void computeJobExecute(long jobPtr, int cancel, long memPtr) { + public void computeJobExecuteLocal(long jobPtr, long cancel) { enter(); try { - PlatformCallbackUtils.computeJobExecute(envPtr, jobPtr, cancel, memPtr); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.ComputeJobExecuteLocal, jobPtr, cancel, 0, null); + } + finally { + leave(); + } + } + + /** + * Execute native job on a node other than where it was created. + * + * @param memPtr Memory pointer. + */ + public void computeJobExecute(long memPtr) { + enter(); + + try { + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeJobExecute, memPtr); } finally { leave(); @@ -312,7 +341,7 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.computeJobCancel(envPtr, jobPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeJobCancel, jobPtr); } finally { leave(); @@ -328,7 +357,7 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.computeJobDestroy(envPtr, ptr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ComputeJobDestroy, ptr); } finally { leave(); @@ -338,14 +367,13 @@ public class PlatformCallbackGateway { /** * Invoke local callback. * - * @param cbPtr Callback pointer. * @param memPtr Memory pointer. */ - public void continuousQueryListenerApply(long cbPtr, long memPtr) { + public void continuousQueryListenerApply(long memPtr) { enter(); try { - PlatformCallbackUtils.continuousQueryListenerApply(envPtr, cbPtr, memPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ContinuousQueryListenerApply, memPtr); } finally { leave(); @@ -362,7 +390,7 @@ public class PlatformCallbackGateway { enter(); try { - return PlatformCallbackUtils.continuousQueryFilterCreate(envPtr, memPtr); + return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ContinuousQueryFilterCreate, memPtr); } finally { leave(); @@ -372,15 +400,14 @@ public class PlatformCallbackGateway { /** * Invoke remote filter. * - * @param filterPtr Filter pointer. * @param memPtr Memory pointer. * @return Result. */ - public int continuousQueryFilterApply(long filterPtr, long memPtr) { + public long continuousQueryFilterApply(long memPtr) { enter(); try { - return PlatformCallbackUtils.continuousQueryFilterApply(envPtr, filterPtr, memPtr); + return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ContinuousQueryFilterApply, memPtr); } finally { leave(); @@ -396,7 +423,8 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.continuousQueryFilterRelease(envPtr, filterPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, + PlatformCallbackOp.ContinuousQueryFilterRelease, filterPtr); } finally { leave(); @@ -414,7 +442,8 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.dataStreamerTopologyUpdate(envPtr, ptr, topVer, topSize); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.DataStreamerTopologyUpdate, ptr, topVer, topSize, null); } finally { leave(); @@ -424,16 +453,15 @@ public class PlatformCallbackGateway { /** * Invoke stream receiver. * - * @param ptr Receiver native pointer. * @param cache Cache object. * @param memPtr Stream pointer. - * @param keepBinary Binary flag. */ public void dataStreamerStreamReceiverInvoke(long ptr, PlatformTargetProxy cache, long memPtr, boolean keepBinary) { enter(); try { - PlatformCallbackUtils.dataStreamerStreamReceiverInvoke(envPtr, ptr, cache, memPtr, keepBinary); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.DataStreamerStreamReceiverInvoke, memPtr, 0, 0, cache); } finally { leave(); @@ -446,11 +474,12 @@ public class PlatformCallbackGateway { * @param futPtr Future pointer. * @param res Result. */ - public void futureByteResult(long futPtr, int res) { + public void futureByteResult(long futPtr, long res) { enter(); try { - PlatformCallbackUtils.futureByteResult(envPtr, futPtr, res); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.FutureByteResult, futPtr, res, 0, null); } finally { leave(); @@ -463,11 +492,12 @@ public class PlatformCallbackGateway { * @param futPtr Future pointer. * @param res Result. */ - public void futureBoolResult(long futPtr, int res) { + public void futureBoolResult(long futPtr, long res) { enter(); try { - PlatformCallbackUtils.futureBoolResult(envPtr, futPtr, res); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.FutureBoolResult, futPtr, res, 0, null); } finally { leave(); @@ -480,11 +510,12 @@ public class PlatformCallbackGateway { * @param futPtr Future pointer. * @param res Result. */ - public void futureShortResult(long futPtr, int res) { + public void futureShortResult(long futPtr, long res) { enter(); try { - PlatformCallbackUtils.futureShortResult(envPtr, futPtr, res); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.FutureShortResult, futPtr, res, 0, null); } finally { leave(); @@ -497,11 +528,12 @@ public class PlatformCallbackGateway { * @param futPtr Future pointer. * @param res Result. */ - public void futureCharResult(long futPtr, int res) { + public void futureCharResult(long futPtr, long res) { enter(); try { - PlatformCallbackUtils.futureCharResult(envPtr, futPtr, res); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.FutureCharResult, futPtr, res, 0, null); } finally { leave(); @@ -514,11 +546,12 @@ public class PlatformCallbackGateway { * @param futPtr Future pointer. * @param res Result. */ - public void futureIntResult(long futPtr, int res) { + public void futureIntResult(long futPtr, long res) { enter(); try { - PlatformCallbackUtils.futureIntResult(envPtr, futPtr, res); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.FutureIntResult, futPtr, res, 0, null); } finally { leave(); @@ -531,11 +564,12 @@ public class PlatformCallbackGateway { * @param futPtr Future pointer. * @param res Result. */ - public void futureFloatResult(long futPtr, float res) { + public void futureFloatResult(long futPtr, long res) { enter(); try { - PlatformCallbackUtils.futureFloatResult(envPtr, futPtr, res); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.FutureFloatResult, futPtr, res, 0, null); } finally { leave(); @@ -552,7 +586,8 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.futureLongResult(envPtr, futPtr, res); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.FutureLongResult, futPtr, res, 0, null); } finally { leave(); @@ -565,11 +600,12 @@ public class PlatformCallbackGateway { * @param futPtr Future pointer. * @param res Result. */ - public void futureDoubleResult(long futPtr, double res) { + public void futureDoubleResult(long futPtr, long res) { enter(); try { - PlatformCallbackUtils.futureDoubleResult(envPtr, futPtr, res); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.FutureDoubleResult, futPtr, res, 0, null); } finally { leave(); @@ -586,7 +622,8 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.futureObjectResult(envPtr, futPtr, memPtr); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.FutureObjectResult, futPtr, memPtr, 0, null); } finally { leave(); @@ -602,7 +639,7 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.futureNullResult(envPtr, futPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.FutureNullResult, futPtr); } finally { leave(); @@ -619,7 +656,8 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.futureError(envPtr, futPtr, memPtr); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.FutureError, futPtr, memPtr, 0, null); } finally { leave(); @@ -636,8 +674,9 @@ public class PlatformCallbackGateway { enter(); try { - return PlatformCallbackUtils.messagingFilterCreate(envPtr, memPtr); - } + return PlatformCallbackUtils.inLongOutLong(envPtr, + PlatformCallbackOp.MessagingFilterCreate, memPtr); + } finally { leave(); } @@ -652,7 +691,8 @@ public class PlatformCallbackGateway { enter(); try { - return PlatformCallbackUtils.messagingFilterApply(envPtr, ptr, memPtr); + return (int)PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.MessagingFilterApply, ptr, memPtr, 0, null); } finally { leave(); @@ -665,7 +705,7 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.messagingFilterDestroy(envPtr, ptr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.MessagingFilterDestroy, ptr); } finally { leave(); @@ -682,7 +722,7 @@ public class PlatformCallbackGateway { enter(); try { - return PlatformCallbackUtils.eventFilterCreate(envPtr, memPtr); + return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.EventFilterCreate, memPtr); } finally { leave(); @@ -698,7 +738,8 @@ public class PlatformCallbackGateway { enter(); try { - return PlatformCallbackUtils.eventFilterApply(envPtr, ptr, memPtr); + return (int)PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.EventFilterApply, ptr, memPtr, 0, null); } finally { leave(); @@ -712,7 +753,7 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.eventFilterDestroy(envPtr, ptr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.EventFilterDestroy, ptr); } finally { leave(); @@ -728,7 +769,7 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.nodeInfo(envPtr, memPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.NodeInfo, memPtr); } finally { leave(); @@ -745,7 +786,7 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.onStart(envPtr, proc, memPtr); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, PlatformCallbackOp.OnStart, memPtr, 0, 0, proc); } finally { leave(); @@ -762,7 +803,8 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.lifecycleEvent(envPtr, ptr, evt); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.LifecycleOnEvent, ptr, evt, 0, null); } finally { leave(); @@ -779,7 +821,8 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.memoryReallocate(envPtr, memPtr, cap); + PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.MemoryReallocate, memPtr, cap, 0, null); } finally { leave(); @@ -790,13 +833,13 @@ public class PlatformCallbackGateway { * Initializes native service. * * @param memPtr Pointer. - * @throws org.apache.ignite.IgniteCheckedException In case of error. + * @throws IgniteCheckedException In case of error. */ public long serviceInit(long memPtr) throws IgniteCheckedException { enter(); try { - return PlatformCallbackUtils.serviceInit(envPtr, memPtr); + return PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ServiceInit, memPtr); } finally { leave(); @@ -806,15 +849,14 @@ public class PlatformCallbackGateway { /** * Executes native service. * - * @param svcPtr Pointer to the service in the native platform. * @param memPtr Stream pointer. - * @throws org.apache.ignite.IgniteCheckedException In case of error. + * @throws IgniteCheckedException In case of error. */ - public void serviceExecute(long svcPtr, long memPtr) throws IgniteCheckedException { + public void serviceExecute(long memPtr) throws IgniteCheckedException { enter(); try { - PlatformCallbackUtils.serviceExecute(envPtr, svcPtr, memPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ServiceExecute, memPtr); } finally { leave(); @@ -824,15 +866,14 @@ public class PlatformCallbackGateway { /** * Cancels native service. * - * @param svcPtr Pointer to the service in the native platform. * @param memPtr Stream pointer. - * @throws org.apache.ignite.IgniteCheckedException In case of error. + * @throws IgniteCheckedException In case of error. */ - public void serviceCancel(long svcPtr, long memPtr) throws IgniteCheckedException { + public void serviceCancel(long memPtr) throws IgniteCheckedException { enter(); try { - PlatformCallbackUtils.serviceCancel(envPtr, svcPtr, memPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ServiceCancel, memPtr); } finally { leave(); @@ -842,16 +883,14 @@ public class PlatformCallbackGateway { /** * Invokes service method. * - * @param svcPtr Pointer to the service in the native platform. - * @param outMemPtr Output memory pointer. - * @param inMemPtr Input memory pointer. - * @throws org.apache.ignite.IgniteCheckedException In case of error. + * @param memPtr Memory pointer. + * @throws IgniteCheckedException In case of error. */ - public void serviceInvokeMethod(long svcPtr, long outMemPtr, long inMemPtr) throws IgniteCheckedException { + public void serviceInvokeMethod(long memPtr) throws IgniteCheckedException { enter(); try { - PlatformCallbackUtils.serviceInvokeMethod(envPtr, svcPtr, outMemPtr, inMemPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ServiceInvokeMethod, memPtr); } finally { leave(); @@ -867,7 +906,7 @@ public class PlatformCallbackGateway { enter(); try { - return PlatformCallbackUtils.clusterNodeFilterApply(envPtr, memPtr); + return (int)PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.ClusterNodeFilterApply, memPtr); } finally { leave(); @@ -885,7 +924,8 @@ public class PlatformCallbackGateway { enter(); try { - return PlatformCallbackUtils.extensionCallbackInLongOutLong(envPtr, typ, arg1); + return PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.ExtensionInLongOutLong, typ, arg1, 0, null); } finally { leave(); @@ -904,7 +944,8 @@ public class PlatformCallbackGateway { enter(); try { - return PlatformCallbackUtils.extensionCallbackInLongLongOutLong(envPtr, typ, arg1, arg2); + return PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, + PlatformCallbackOp.ExtensionInLongLongOutLong, typ, arg1, arg2, null); } finally { leave(); @@ -918,7 +959,7 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.onClientDisconnected(envPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.OnClientDisconnected, 0); } finally { leave(); @@ -934,7 +975,8 @@ public class PlatformCallbackGateway { enter(); try { - PlatformCallbackUtils.onClientReconnected(envPtr, clusterRestarted); + PlatformCallbackUtils.inLongOutLong(envPtr, + PlatformCallbackOp.OnClientReconnected, clusterRestarted ? 1 : 0); } finally { leave(); @@ -985,7 +1027,7 @@ public class PlatformCallbackGateway { public void onStop() { block(); - PlatformCallbackUtils.onStop(envPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.OnStop, 0); } /** @@ -999,7 +1041,8 @@ public class PlatformCallbackGateway { enter(); try { - return PlatformCallbackUtils.affinityFunctionInit(envPtr, memPtr, baseFunc); + return PlatformCallbackUtils.inLongLongLongObjectOutLong(envPtr, PlatformCallbackOp.AffinityFunctionInit, + memPtr, 0, 0, baseFunc); } finally { leave(); @@ -1009,15 +1052,15 @@ public class PlatformCallbackGateway { /** * Gets the partition from affinity function. * - * @param ptr Affinity function pointer. - * @param memPtr Pointer to a stream with key object. + * @param memPtr Pointer to a stream with data. * @return Partition number for a given key. */ - public int affinityFunctionPartition(long ptr, long memPtr) { + public int affinityFunctionPartition(long memPtr) { enter(); try { - return PlatformCallbackUtils.affinityFunctionPartition(envPtr, ptr, memPtr); + return (int)PlatformCallbackUtils.inLongOutLong(envPtr, + PlatformCallbackOp.AffinityFunctionPartition, memPtr); } finally { leave(); @@ -1027,15 +1070,13 @@ public class PlatformCallbackGateway { /** * Assigns the affinity partitions. * - * @param ptr Affinity function pointer. - * @param outMemPtr Pointer to a stream with affinity context. - * @param inMemPtr Pointer to a stream with result. + * @param memPtr Pointer to a stream. */ - public void affinityFunctionAssignPartitions(long ptr, long outMemPtr, long inMemPtr){ + public void affinityFunctionAssignPartitions(long memPtr){ enter(); try { - PlatformCallbackUtils.affinityFunctionAssignPartitions(envPtr, ptr, outMemPtr, inMemPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.AffinityFunctionAssignPartitions, memPtr); } finally { leave(); @@ -1045,14 +1086,13 @@ public class PlatformCallbackGateway { /** * Removes the node from affinity function. * - * @param ptr Affinity function pointer. - * @param memPtr Pointer to a stream with node id. + * @param memPtr Pointer to a stream. */ - public void affinityFunctionRemoveNode(long ptr, long memPtr) { + public void affinityFunctionRemoveNode(long memPtr) { enter(); try { - PlatformCallbackUtils.affinityFunctionRemoveNode(envPtr, ptr, memPtr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.AffinityFunctionRemoveNode, memPtr); } finally { leave(); @@ -1069,7 +1109,7 @@ public class PlatformCallbackGateway { return; // skip: destroy is not necessary during shutdown. try { - PlatformCallbackUtils.affinityFunctionDestroy(envPtr, ptr); + PlatformCallbackUtils.inLongOutLong(envPtr, PlatformCallbackOp.AffinityFunctionDestroy, ptr); } finally { leave(); @@ -1097,7 +1137,7 @@ public class PlatformCallbackGateway { /** * Enter gateway. */ - protected boolean tryEnter() { + private boolean tryEnter() { return lock.enterBusy(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java new file mode 100644 index 0000000..973ba51 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackOp.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License; Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing; software + * distributed under the License is distributed on an "AS IS" BASIS; + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND; either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.callback; + +/** + * Platform callback operation codes. + */ +class PlatformCallbackOp { + /** */ + public static final int CacheStoreCreate = 1; + + /** */ + public static final int CacheStoreInvoke = 2; + + /** */ + public static final int CacheStoreDestroy = 3; + + /** */ + public static final int CacheStoreSessionCreate = 4; + + /** */ + public static final int CacheEntryFilterCreate = 5; + + /** */ + public static final int CacheEntryFilterApply = 6; + + /** */ + public static final int CacheEntryFilterDestroy = 7; + + /** */ + public static final int CacheInvoke = 8; + + /** */ + public static final int ComputeTaskMap = 9; + + /** */ + public static final int ComputeTaskJobResult = 10; + + /** */ + public static final int ComputeTaskReduce = 11; + + /** */ + public static final int ComputeTaskComplete = 12; + + /** */ + public static final int ComputeJobSerialize = 13; + + /** */ + public static final int ComputeJobCreate = 14; + + /** */ + public static final int ComputeJobExecute = 15; + + /** */ + public static final int ComputeJobCancel = 16; + + /** */ + public static final int ComputeJobDestroy = 17; + + /** */ + public static final int ContinuousQueryListenerApply = 18; + + /** */ + public static final int ContinuousQueryFilterCreate = 19; + + /** */ + public static final int ContinuousQueryFilterApply = 20; + + /** */ + public static final int ContinuousQueryFilterRelease = 21; + + /** */ + public static final int DataStreamerTopologyUpdate = 22; + + /** */ + public static final int DataStreamerStreamReceiverInvoke = 23; + + /** */ + public static final int FutureByteResult = 24; + + /** */ + public static final int FutureBoolResult = 25; + + /** */ + public static final int FutureShortResult = 26; + + /** */ + public static final int FutureCharResult = 27; + + /** */ + public static final int FutureIntResult = 28; + + /** */ + public static final int FutureFloatResult = 29; + + /** */ + public static final int FutureLongResult = 30; + + /** */ + public static final int FutureDoubleResult = 31; + + /** */ + public static final int FutureObjectResult = 32; + + /** */ + public static final int FutureNullResult = 33; + + /** */ + public static final int FutureError = 34; + + /** */ + public static final int LifecycleOnEvent = 35; + + /** */ + public static final int MemoryReallocate = 36; + + /** */ + public static final int MessagingFilterCreate = 37; + + /** */ + public static final int MessagingFilterApply = 38; + + /** */ + public static final int MessagingFilterDestroy = 39; + + /** */ + public static final int EventFilterCreate = 40; + + /** */ + public static final int EventFilterApply = 41; + + /** */ + public static final int EventFilterDestroy = 42; + + /** */ + public static final int ServiceInit = 43; + + /** */ + public static final int ServiceExecute = 44; + + /** */ + public static final int ServiceCancel = 45; + + /** */ + public static final int ServiceInvokeMethod = 46; + + /** */ + public static final int ClusterNodeFilterApply = 47; + + /** */ + public static final int NodeInfo = 48; + + /** */ + public static final int OnStart = 49; + + /** */ + public static final int OnStop = 50; + + /** */ + public static final int ExtensionInLongOutLong = 51; + + /** */ + public static final int ExtensionInLongLongOutLong = 52; + + /** */ + public static final int OnClientDisconnected = 53; + + /** */ + public static final int OnClientReconnected = 54; + + /** */ + public static final int AffinityFunctionInit = 55; + + /** */ + public static final int AffinityFunctionPartition = 56; + + /** */ + public static final int AffinityFunctionAssignPartitions = 57; + + /** */ + public static final int AffinityFunctionRemoveNode = 58; + + /** */ + public static final int AffinityFunctionDestroy = 59; + + /** */ + public static final int ComputeTaskLocalJobResult = 60; + + /** */ + public static final int ComputeJobExecuteLocal = 61; +} http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java index 9d60ec0..f823cb3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java @@ -17,533 +17,12 @@ package org.apache.ignite.internal.processors.platform.callback; -import org.apache.ignite.internal.processors.platform.PlatformTargetProxy; - /** * Platform callback utility methods. Implemented in target platform. All methods in this class must be * package-visible and invoked only through {@link PlatformCallbackGateway}. */ public class PlatformCallbackUtils { /** - * Create cache store. - * - * @param envPtr Environment pointer. - * @param memPtr Memory pointer. - * @return Pointer. - */ - static native long cacheStoreCreate(long envPtr, long memPtr); - - /** - * @param envPtr Environment pointer. - * @param objPtr Object pointer. - * @param memPtr Memory pointer. - * @return Result. - */ - static native int cacheStoreInvoke(long envPtr, long objPtr, long memPtr); - - /** - * @param envPtr Environment pointer. - * @param objPtr Object pointer. - */ - static native void cacheStoreDestroy(long envPtr, long objPtr); - - /** - * Creates cache store session. - * - * @param envPtr Environment pointer. - * @param storePtr Store instance pointer. - * @return Session instance pointer. - */ - static native long cacheStoreSessionCreate(long envPtr, long storePtr); - - /** - * Creates cache entry filter and returns a pointer. - * - * @param envPtr Environment pointer. - * @param memPtr Memory pointer. - * @return Pointer. - */ - static native long cacheEntryFilterCreate(long envPtr, long memPtr); - - /** - * @param envPtr Environment pointer. - * @param objPtr Pointer. - * @param memPtr Memory pointer. - * @return Result. - */ - static native int cacheEntryFilterApply(long envPtr, long objPtr, long memPtr); - - /** - * @param envPtr Environment pointer. - * @param objPtr Pointer. - */ - static native void cacheEntryFilterDestroy(long envPtr, long objPtr); - - /** - * Invoke cache entry processor. - * - * @param envPtr Environment pointer. - * @param outMemPtr Output memory pointer. - * @param inMemPtr Input memory pointer. - */ - static native void cacheInvoke(long envPtr, long outMemPtr, long inMemPtr); - - /** - * Perform native task map. Do not throw exceptions, serializing them to the output stream instead. - * - * @param envPtr Environment pointer. - * @param taskPtr Task pointer. - * @param outMemPtr Output memory pointer (exists if topology changed, otherwise {@code 0}). - * @param inMemPtr Input memory pointer. - */ - static native void computeTaskMap(long envPtr, long taskPtr, long outMemPtr, long inMemPtr); - - /** - * Perform native task job result notification. - * - * @param envPtr Environment pointer. - * @param taskPtr Task pointer. - * @param jobPtr Job pointer. - * @param memPtr Memory pointer (always zero for local job execution). - * @return Job result enum ordinal. - */ - static native int computeTaskJobResult(long envPtr, long taskPtr, long jobPtr, long memPtr); - - /** - * Perform native task reduce. - * - * @param envPtr Environment pointer. - * @param taskPtr Task pointer. - */ - static native void computeTaskReduce(long envPtr, long taskPtr); - - /** - * Complete task with native error. - * - * @param envPtr Environment pointer. - * @param taskPtr Task pointer. - * @param memPtr Memory pointer with exception data or {@code 0} in case of success. - */ - static native void computeTaskComplete(long envPtr, long taskPtr, long memPtr); - - /** - * Serialize native job. - * - * @param envPtr Environment pointer. - * @param jobPtr Job pointer. - * @param memPtr Memory pointer. - * @return {@code True} if serialization succeeded. - */ - static native int computeJobSerialize(long envPtr, long jobPtr, long memPtr); - - /** - * Create job in native platform. - * - * @param envPtr Environment pointer. - * @param memPtr Memory pointer. - * @return Pointer to job. - */ - static native long computeJobCreate(long envPtr, long memPtr); - - /** - * Execute native job on a node other than where it was created. - * - * @param envPtr Environment pointer. - * @param jobPtr Job pointer. - * @param cancel Cancel flag. - * @param memPtr Memory pointer to write result to for remote job execution or {@code 0} for local job execution. - */ - static native void computeJobExecute(long envPtr, long jobPtr, int cancel, long memPtr); - - /** - * Cancel the job. - * - * @param envPtr Environment pointer. - * @param jobPtr Job pointer. - */ - static native void computeJobCancel(long envPtr, long jobPtr); - - /** - * Destroy the job. - * - * @param envPtr Environment pointer. - * @param ptr Pointer. - */ - static native void computeJobDestroy(long envPtr, long ptr); - - /** - * Invoke local callback. - * - * @param envPtr Environment pointer. - * @param cbPtr Callback pointer. - * @param memPtr Memory pointer. - */ - static native void continuousQueryListenerApply(long envPtr, long cbPtr, long memPtr); - - /** - * Create filter in native platform. - * - * @param envPtr Environment pointer. - * @param memPtr Memory pointer. - * @return Pointer to created filter. - */ - static native long continuousQueryFilterCreate(long envPtr, long memPtr); - - /** - * Invoke remote filter. - * - * @param envPtr Environment pointer. - * @param filterPtr Filter pointer. - * @param memPtr Memory pointer. - * @return Result. - */ - static native int continuousQueryFilterApply(long envPtr, long filterPtr, long memPtr); - - /** - * Release remote filter. - * - * @param envPtr Environment pointer. - * @param filterPtr Filter pointer. - */ - static native void continuousQueryFilterRelease(long envPtr, long filterPtr); - - /** - * Notify native data streamer about topology update. - * - * @param envPtr Environment pointer. - * @param ptr Data streamer native pointer. - * @param topVer Topology version. - * @param topSize Topology size. - */ - static native void dataStreamerTopologyUpdate(long envPtr, long ptr, long topVer, int topSize); - - /** - * Invoke stream receiver. - * - * @param envPtr Environment pointer. - * @param ptr Receiver native pointer. - * @param cache Cache object. - * @param memPtr Stream pointer. - * @param keepBinary Binary flag. - */ - static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, PlatformTargetProxy cache, long memPtr, - boolean keepBinary); - - /** - * Notify future with byte result. - * - * @param envPtr Environment pointer. - * @param futPtr Future pointer. - * @param res Result. - */ - static native void futureByteResult(long envPtr, long futPtr, int res); - - /** - * Notify future with boolean result. - * - * @param envPtr Environment pointer. - * @param futPtr Future pointer. - * @param res Result. - */ - static native void futureBoolResult(long envPtr, long futPtr, int res); - - /** - * Notify future with short result. - * - * @param envPtr Environment pointer. - * @param futPtr Future pointer. - * @param res Result. - */ - static native void futureShortResult(long envPtr, long futPtr, int res); - - /** - * Notify future with byte result. - * - * @param envPtr Environment pointer. - * @param futPtr Future pointer. - * @param res Result. - */ - static native void futureCharResult(long envPtr, long futPtr, int res); - - /** - * Notify future with int result. - * - * @param envPtr Environment pointer. - * @param futPtr Future pointer. - * @param res Result. - */ - static native void futureIntResult(long envPtr, long futPtr, int res); - - /** - * Notify future with float result. - * - * @param envPtr Environment pointer. - * @param futPtr Future pointer. - * @param res Result. - */ - static native void futureFloatResult(long envPtr, long futPtr, float res); - - /** - * Notify future with long result. - * - * @param envPtr Environment pointer. - * @param futPtr Future pointer. - * @param res Result. - */ - static native void futureLongResult(long envPtr, long futPtr, long res); - - /** - * Notify future with double result. - * - * @param envPtr Environment pointer. - * @param futPtr Future pointer. - * @param res Result. - */ - static native void futureDoubleResult(long envPtr, long futPtr, double res); - - /** - * Notify future with object result. - * - * @param envPtr Environment pointer. - * @param futPtr Future pointer. - * @param memPtr Memory pointer. - */ - static native void futureObjectResult(long envPtr, long futPtr, long memPtr); - - /** - * Notify future with null result. - * - * @param envPtr Environment pointer. - * @param futPtr Future pointer. - */ - static native void futureNullResult(long envPtr, long futPtr); - - /** - * Notify future with error. - * - * @param envPtr Environment pointer. - * @param futPtr Future pointer. - * @param memPtr Pointer to memory with error information. - */ - static native void futureError(long envPtr, long futPtr, long memPtr); - - /** - * Creates message filter and returns a pointer. - * - * @param envPtr Environment pointer. - * @param memPtr Memory pointer. - * @return Pointer. - */ - static native long messagingFilterCreate(long envPtr, long memPtr); - - /** - * @param envPtr Environment pointer. - * @param objPtr Pointer. - * @param memPtr Memory pointer. - * @return Result. - */ - static native int messagingFilterApply(long envPtr, long objPtr, long memPtr); - - /** - * @param envPtr Environment pointer. - * @param objPtr Pointer. - */ - static native void messagingFilterDestroy(long envPtr, long objPtr); - - /** - * Creates event filter and returns a pointer. - * - * @param envPtr Environment pointer. - * @param memPtr Memory pointer. - * @return Pointer. - */ - static native long eventFilterCreate(long envPtr, long memPtr); - - /** - * @param envPtr Environment pointer. - * @param objPtr Pointer. - * @param memPtr Memory pointer. - * @return Result. - */ - static native int eventFilterApply(long envPtr, long objPtr, long memPtr); - - /** - * @param envPtr Environment pointer. - * @param objPtr Pointer. - */ - static native void eventFilterDestroy(long envPtr, long objPtr); - - /** - * Sends node info to native target. - * - * @param envPtr Environment pointer. - * @param memPtr Ptr to a stream with serialized node. - */ - static native void nodeInfo(long envPtr, long memPtr); - - /** - * Kernal start callback. - * - * @param envPtr Environment pointer. - * @param proc Platform processor. - * @param memPtr Memory pointer. - */ - static native void onStart(long envPtr, Object proc, long memPtr); - - /* - * Kernal stop callback. - * - * @param envPtr Environment pointer. - */ - static native void onStop(long envPtr); - - /** - * Lifecycle event callback. - * - * @param envPtr Environment pointer. - * @param ptr Holder pointer. - * @param evt Event. - */ - static native void lifecycleEvent(long envPtr, long ptr, int evt); - - /** - * Re-allocate external memory chunk. - * - * @param envPtr Environment pointer. - * @param memPtr Cross-platform pointer. - * @param cap Capacity. - */ - static native void memoryReallocate(long envPtr, long memPtr, int cap); - - /** - * Initializes native service. - * - * @param envPtr Environment pointer. - * @param memPtr Stream pointer. - * @return Pointer to the native platform service. - */ - static native long serviceInit(long envPtr, long memPtr); - - /** - * Executes native service. - * - * @param envPtr Environment pointer. - * @param svcPtr Pointer to the service in the native platform. - * @param memPtr Stream pointer. - */ - static native void serviceExecute(long envPtr, long svcPtr, long memPtr); - - /** - * Cancels native service. - * - * @param envPtr Environment pointer. - * @param svcPtr Pointer to the service in the native platform. - * @param memPtr Stream pointer. - */ - static native void serviceCancel(long envPtr, long svcPtr, long memPtr); - - /** - * Invokes service method. - * - * @param envPtr Environment pointer. - * @param svcPtr Pointer to the service in the native platform. - * @param outMemPtr Output memory pointer. - * @param inMemPtr Input memory pointer. - */ - static native void serviceInvokeMethod(long envPtr, long svcPtr, long outMemPtr, long inMemPtr); - - /** - * Invokes cluster node filter. - * - * @param envPtr Environment pointer. - * @param memPtr Stream pointer. - */ - static native int clusterNodeFilterApply(long envPtr, long memPtr); - - /** - * Extension callback accepting single long argument and returning long result. - * - * @param envPtr Environment pointer. - * @param typ Operation type. - * @param arg1 Argument 1. - * @return Long result. - */ - static native long extensionCallbackInLongOutLong(long envPtr, int typ, long arg1); - - /** - * Extension callback accepting two long arguments and returning long result. - * - * @param envPtr Environment pointer. - * @param typ Operation type. - * @param arg1 Argument 1. - * @param arg2 Argument 2. - * @return Long result. - */ - static native long extensionCallbackInLongLongOutLong(long envPtr, int typ, long arg1, long arg2); - - /** - * Notifies platform about client disconnect. - * - * @param envPtr Environment pointer. - */ - static native void onClientDisconnected(long envPtr); - - /** - * Notifies platform about client reconnect. - * - * @param envPtr Environment pointer. - * @param clusterRestarted Cluster restarted flag. - */ - static native void onClientReconnected(long envPtr, boolean clusterRestarted); - - /** - * Initializes affinity function. - * - * @param envPtr Environment pointer. - * @param memPtr Pointer to a stream with serialized affinity function. - * @param baseFunc Optional func for base calls. - * @return Affinity function pointer. - */ - static native long affinityFunctionInit(long envPtr, long memPtr, PlatformTargetProxy baseFunc); - - /** - * Gets the partition from affinity function. - * - * @param envPtr Environment pointer. - * @param ptr Affinity function pointer. - * @param memPtr Pointer to a stream with key object. - * @return Partition number for a given key. - */ - static native int affinityFunctionPartition(long envPtr, long ptr, long memPtr); - - /** - * Assigns the affinity partitions. - * - * @param envPtr Environment pointer. - * @param ptr Affinity function pointer. - * @param outMemPtr Pointer to a stream with affinity context. - * @param inMemPtr Pointer to a stream with result. - */ - static native void affinityFunctionAssignPartitions(long envPtr, long ptr, long outMemPtr, long inMemPtr); - - /** - * Removes the node from affinity function. - * - * @param envPtr Environment pointer. - * @param ptr Affinity function pointer. - * @param memPtr Pointer to a stream with node id. - */ - static native void affinityFunctionRemoveNode(long envPtr, long ptr, long memPtr); - - /** - * Destroys the affinity function. - * - * @param envPtr Environment pointer. - * @param ptr Affinity function pointer. - */ - static native void affinityFunctionDestroy(long envPtr, long ptr); - - /** * Redirects the console output. * * @param str String to write. @@ -572,6 +51,29 @@ public class PlatformCallbackUtils { static native boolean loggerIsLevelEnabled(long envPtr, int level); /** + * Performs a generic long-long operation. + * + * @param envPtr Environment pointer. + * @param type Operation code. + * @param val Value. + * @return Value. + */ + static native long inLongOutLong(long envPtr, int type, long val); + + /** + * Performs a generic out-in operation. + * + * @param envPtr Environment pointer. + * @param type Operation code. + * @param val1 First value. + * @param val2 Second value. + * @param val3 Third value. + * @param arg Object argument. + * @return Value. + */ + static native long inLongLongLongObjectOutLong(long envPtr, int type, long val1, long val2, long val3, Object arg); + + /** * Private constructor. */ private PlatformCallbackUtils() { http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java index 32aed39..56875c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java @@ -131,7 +131,7 @@ public abstract class PlatformAbstractJob implements PlatformJob, Externalizable // Local job, must execute it with respect to possible concurrent task completion. if (task.onJobLock()) { try { - ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, 0); + ctx.gateway().computeJobExecuteLocal(ptr, cancel ? 1 : 0); return LOC_JOB_RES; } http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java index fe1e316..6a9fed5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java @@ -65,8 +65,6 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void> @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { assert rcvd.isEmpty() : "Should not cache result in Java for interop task"; - int plc; - lock.readLock().lock(); try { @@ -78,9 +76,11 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void> Object res0bj = res.getData(); + int plc; + if (res0bj == PlatformAbstractJob.LOC_JOB_RES) // Processing local job execution result. - plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), 0); + plc = ctx.gateway().computeTaskLocalJobResult(taskPtr, job.pointer()); else { // Processing remote job execution result or exception. try (PlatformMemory mem = ctx.memory().allocate()) { @@ -88,6 +88,9 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void> BinaryRawWriterEx writer = ctx.writer(out); + writer.writeLong(taskPtr); + writer.writeLong(job.pointer()); + writer.writeUuid(res.getNode().id()); writer.writeBoolean(res.isCancelled()); @@ -97,7 +100,7 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void> out.synchronize(); - plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), mem.pointer()); + plc = ctx.gateway().computeTaskJobResult(mem.pointer()); } } @@ -184,7 +187,7 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void> * @return {@code} True if task is not completed yet, {@code false} otherwise. */ @SuppressWarnings("LockAcquiredButNotSafelyReleased") - public boolean onJobLock() { + boolean onJobLock() { lock.readLock().lock(); if (done) { @@ -199,7 +202,7 @@ public abstract class PlatformAbstractTask implements ComputeTask<Object, Void> /** * Callback invoked by job when task can be unlocked. */ - public void onJobUnlock() { + void onJobUnlock() { assert !done; lock.readLock().unlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java index f8567ce..25926eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformClosureJob.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.jetbrains.annotations.Nullable; @@ -63,9 +64,16 @@ public class PlatformClosureJob extends PlatformAbstractJob { createJob(ctx); try (PlatformMemory mem = ctx.memory().allocate()) { - PlatformInputStream in = mem.input(); + PlatformOutputStream out = mem.output(); + + out.writeLong(ptr); + out.writeBoolean(false); // cancel + + out.synchronize(); - ctx.gateway().computeJobExecute(ptr, 0, mem.pointer()); + ctx.gateway().computeJobExecute(mem.pointer()); + + PlatformInputStream in = mem.input(); in.synchronize(); http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java index 51c9cdb..9ff9609 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullJob.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.platform.compute; +import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; @@ -26,6 +27,7 @@ import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.jetbrains.annotations.Nullable; @@ -64,7 +66,7 @@ public class PlatformFullJob extends PlatformAbstractJob { private transient byte state; /** - * {@link java.io.Externalizable} support. + * {@link Externalizable} support. */ @SuppressWarnings("UnusedDeclaration") public PlatformFullJob() { @@ -114,9 +116,16 @@ public class PlatformFullJob extends PlatformAbstractJob { return runLocal(ctx, cancel); else { try (PlatformMemory mem = ctx.memory().allocate()) { - PlatformInputStream in = mem.input(); + PlatformOutputStream out = mem.output(); + + out.writeLong(ptr); + out.writeBoolean(cancel); // cancel + + out.synchronize(); - ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, mem.pointer()); + ctx.gateway().computeJobExecute(mem.pointer()); + + PlatformInputStream in = mem.input(); in.synchronize(); http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java index e2f6720..3134066 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformFullTask.java @@ -80,26 +80,26 @@ public final class PlatformFullTask extends PlatformAbstractTask { PlatformMemoryManager memMgr = ctx.memory(); - try (PlatformMemory outMem = memMgr.allocate()) { - PlatformOutputStream out = outMem.output(); + try (PlatformMemory mem = memMgr.allocate()) { + PlatformOutputStream out = mem.output(); BinaryRawWriterEx writer = ctx.writer(out); + writer.writeLong(taskPtr); + write(writer, nodes, subgrid); out.synchronize(); - try (PlatformMemory inMem = memMgr.allocate()) { - PlatformInputStream in = inMem.input(); + ctx.gateway().computeTaskMap(mem.pointer()); - ctx.gateway().computeTaskMap(taskPtr, outMem.pointer(), inMem.pointer()); + PlatformInputStream in = mem.input(); - in.synchronize(); + in.synchronize(); - BinaryRawReaderEx reader = ctx.reader(in); + BinaryRawReaderEx reader = ctx.reader(in); - return read(reader, nodes); - } + return read(reader, nodes); } } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java index d0992fc..c3dde26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java @@ -78,6 +78,9 @@ public class PlatformStreamReceiverImpl extends PlatformAbstractPredicate implem try (PlatformMemory mem = ctx.memory().allocate()) { PlatformOutputStream out = mem.output(); + out.writeLong(ptr); + out.writeBoolean(keepBinary); + BinaryRawWriterEx writer = ctx.writer(out); writer.writeObject(pred); http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java index 3563dd6..5257b26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetCacheStore.java @@ -396,7 +396,7 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor if (sesPtr == null) { // Session is not deployed yet, do that. - sesPtr = platformCtx.gateway().cacheStoreSessionCreate(ptr); + sesPtr = platformCtx.gateway().cacheStoreSessionCreate(); ses.properties().put(KEY_SES, sesPtr); } @@ -419,11 +419,13 @@ public class PlatformDotNetCacheStore<K, V> implements CacheStore<K, V>, Platfor BinaryRawWriterEx writer = platformCtx.writer(out); + writer.writeLong(ptr); + task.apply(writer); out.synchronize(); - int res = platformCtx.gateway().cacheStoreInvoke(ptr, mem.pointer()); + int res = platformCtx.gateway().cacheStoreInvoke(mem.pointer()); if (res != 0) { // Read error http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java index d6a6e16..4db01cf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java @@ -110,13 +110,15 @@ public abstract class PlatformAbstractService implements PlatformService, Extern BinaryRawWriterEx writer = platformCtx.writer(out); + writer.writeLong(ptr); + writer.writeBoolean(srvKeepBinary); writeServiceContext(ctx, writer); out.synchronize(); - platformCtx.gateway().serviceExecute(ptr, mem.pointer()); + platformCtx.gateway().serviceExecute(mem.pointer()); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -133,13 +135,15 @@ public abstract class PlatformAbstractService implements PlatformService, Extern BinaryRawWriterEx writer = platformCtx.writer(out); + writer.writeLong(ptr); + writer.writeBoolean(srvKeepBinary); writeServiceContext(ctx, writer); out.synchronize(); - platformCtx.gateway().serviceCancel(ptr, mem.pointer()); + platformCtx.gateway().serviceCancel(mem.pointer()); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -173,10 +177,11 @@ public abstract class PlatformAbstractService implements PlatformService, Extern assert ptr != 0; assert platformCtx != null; - try (PlatformMemory outMem = platformCtx.memory().allocate()) { - PlatformOutputStream out = outMem.output(); + try (PlatformMemory mem = platformCtx.memory().allocate()) { + PlatformOutputStream out = mem.output(); BinaryRawWriterEx writer = platformCtx.writer(out); + writer.writeLong(ptr); writer.writeBoolean(srvKeepBinary); writer.writeString(mthdName); @@ -192,17 +197,15 @@ public abstract class PlatformAbstractService implements PlatformService, Extern out.synchronize(); - try (PlatformMemory inMem = platformCtx.memory().allocate()) { - PlatformInputStream in = inMem.input(); + platformCtx.gateway().serviceInvokeMethod(mem.pointer()); - platformCtx.gateway().serviceInvokeMethod(ptr, outMem.pointer(), inMem.pointer()); + PlatformInputStream in = mem.input(); - in.synchronize(); + in.synchronize(); - BinaryRawReaderEx reader = platformCtx.reader(in); + BinaryRawReaderEx reader = platformCtx.reader(in); - return PlatformUtils.readInvocationResult(platformCtx, reader); - } + return PlatformUtils.readInvocationResult(platformCtx, reader); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java index e81f4c6..b84744c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java @@ -250,7 +250,7 @@ public class PlatformFutureUtils { break; case TYP_FLOAT: - gate.futureFloatResult(futPtr, (float) res); + gate.futureFloatResult(futPtr, Float.floatToIntBits((float) res)); break; @@ -260,7 +260,7 @@ public class PlatformFutureUtils { break; case TYP_DOUBLE: - gate.futureDoubleResult(futPtr, (double) res); + gate.futureDoubleResult(futPtr, Double.doubleToLongBits((double)res)); break; http://git-wip-us.apache.org/repos/asf/ignite/blob/72ac53da/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java index 0d30ad9..4c0eab4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java @@ -501,6 +501,8 @@ public class PlatformUtils { BinaryRawWriterEx writer = ctx.writer(out); + writer.writeLong(lsnrPtr); + int cntPos = writer.reserveInt(); int cnt = 0; @@ -515,7 +517,7 @@ public class PlatformUtils { out.synchronize(); - ctx.gateway().continuousQueryListenerApply(lsnrPtr, mem.pointer()); + ctx.gateway().continuousQueryListenerApply(mem.pointer()); } catch (Exception e) { throw toCacheEntryListenerException(e); @@ -538,11 +540,13 @@ public class PlatformUtils { try (PlatformMemory mem = ctx.memory().allocate()) { PlatformOutputStream out = mem.output(); + out.writeLong(filterPtr); + writeCacheEntryEvent(ctx.writer(out), evt); out.synchronize(); - return ctx.gateway().continuousQueryFilterApply(filterPtr, mem.pointer()) == 1; + return ctx.gateway().continuousQueryFilterApply(mem.pointer()) == 1; } catch (Exception e) { throw toCacheEntryListenerException(e);
