http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java index 8ff15d5..fd1c2d4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; import org.apache.ignite.internal.processors.platform.utils.PlatformListenable; import org.apache.ignite.internal.util.future.IgniteFutureImpl; @@ -98,7 +99,7 @@ public class PlatformCompute extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) + @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_UNICAST: @@ -120,7 +121,7 @@ public class PlatformCompute extends PlatformAbstractTarget { } case OP_EXEC_ASYNC: - return executeJavaTask(reader, true); + return wrapListenable((PlatformListenable) executeJavaTask(reader, true)); default: return super.processInStreamOutObject(type, reader); @@ -128,7 +129,7 @@ public class PlatformCompute extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_WITH_TIMEOUT: { compute.withTimeout(val); @@ -154,7 +155,7 @@ public class PlatformCompute extends PlatformAbstractTarget { * @param reader Reader. * @param broadcast broadcast flag. */ - private PlatformListenable processClosures(long taskPtr, BinaryRawReaderEx reader, boolean broadcast, + private PlatformTarget processClosures(long taskPtr, BinaryRawReaderEx reader, boolean broadcast, boolean affinity) { PlatformAbstractTask task; @@ -221,7 +222,7 @@ public class PlatformCompute extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_EXEC: @@ -239,7 +240,7 @@ public class PlatformCompute extends PlatformAbstractTarget { * * @param task Task. */ - private PlatformListenable executeNative0(final PlatformAbstractTask task) { + private PlatformTarget executeNative0(final PlatformAbstractTask task) { IgniteInternalFuture fut = computeForPlatform.executeAsync(task, null); fut.listen(new IgniteInClosure<IgniteInternalFuture>() { @@ -257,7 +258,7 @@ public class PlatformCompute extends PlatformAbstractTarget { } }); - return PlatformFutureUtils.getListenable(fut); + return wrapListenable(PlatformFutureUtils.getListenable(fut)); } /**
http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java index cd5fba0..7d71a9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformDataStreamer.java @@ -114,7 +114,7 @@ public class PlatformDataStreamer extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_UPDATE: int plc = reader.readInt(); @@ -169,7 +169,7 @@ public class PlatformDataStreamer extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, final long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, final long val) throws IgniteCheckedException { switch (type) { case OP_SET_ALLOW_OVERWRITE: ldr.allowOverwrite(val == TRUE); http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java index add11ed..d0992fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastreamer/PlatformStreamReceiverImpl.java @@ -23,6 +23,8 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTargetProxy; +import org.apache.ignite.internal.processors.platform.PlatformTargetProxyImpl; import org.apache.ignite.internal.processors.platform.cache.PlatformCache; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; @@ -89,8 +91,10 @@ public class PlatformStreamReceiverImpl extends PlatformAbstractPredicate implem out.synchronize(); - ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, new PlatformCache(ctx, cache, keepBinary), - mem.pointer(), keepBinary); + PlatformCache cache0 = new PlatformCache(ctx, cache, keepBinary); + PlatformTargetProxy cacheProxy = new PlatformTargetProxyImpl(cache0, ctx); + + ctx.gateway().dataStreamerStreamReceiverInvoke(ptr, cacheProxy, mem.pointer(), keepBinary); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java index 811e38b..b57b140 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicLong.java @@ -80,7 +80,7 @@ public class PlatformAtomicLong extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_COMPARE_AND_SET: long cmp = reader.readLong(); @@ -99,7 +99,7 @@ public class PlatformAtomicLong extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_ADD_AND_GET: return atomicLong.addAndGet(val); http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java index 63b5b86..a644259 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java @@ -94,7 +94,7 @@ public class PlatformAtomicReference extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { if (type == OP_GET) writer.writeObject(atomicRef.get()); else @@ -102,7 +102,7 @@ public class PlatformAtomicReference extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { if (type == OP_SET) { atomicRef.set(reader.readObjectDetached()); @@ -114,7 +114,7 @@ public class PlatformAtomicReference extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { if (type == OP_COMPARE_AND_SET_AND_GET) { Object val = reader.readObjectDetached(); @@ -134,7 +134,7 @@ public class PlatformAtomicReference extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_CLOSE: atomicRef.close(); http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java index c352731..6d17a72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java @@ -71,7 +71,7 @@ public class PlatformAtomicSequence extends PlatformAbstractTarget { /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_ADD_AND_GET: return atomicSeq.addAndGet(val); http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java index 383e7ab..9ddcc37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; +import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.F; @@ -117,7 +118,7 @@ public class PlatformEvents extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_RECORD_LOCAL: @@ -168,7 +169,7 @@ public class PlatformEvents extends PlatformAbstractTarget { /** {@inheritDoc} */ @SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"}) - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_LOCAL_QUERY: { @@ -271,7 +272,7 @@ public class PlatformEvents extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_GET_ENABLED_EVENTS: writeEventTypes(events.enabledEvents(), writer); @@ -284,7 +285,7 @@ public class PlatformEvents extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected Object processOutObject(int type) throws IgniteCheckedException { + @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { switch (type) { case OP_WITH_ASYNC: if (events.isAsync()) @@ -297,7 +298,7 @@ public class PlatformEvents extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_IS_ENABLED: return events.isEnabled((int)val) ? TRUE : FALSE; @@ -310,12 +311,12 @@ public class PlatformEvents extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { return ((IgniteFutureImpl)eventsAsync.future()).internalFuture(); } /** {@inheritDoc} */ - @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) { + @Nullable @Override public PlatformFutureUtils.Writer futureWriter(int opId) { switch (opId) { case OP_WAIT_FOR_LOCAL: return eventResWriter; http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java index 216427a..6fe109e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java @@ -24,6 +24,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.util.future.IgniteFutureImpl; @@ -86,7 +87,7 @@ public class PlatformMessaging extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_SEND: @@ -149,7 +150,7 @@ public class PlatformMessaging extends PlatformAbstractTarget { /** {@inheritDoc} */ @SuppressWarnings({"IfMayBeConditional", "ConstantConditions", "unchecked"}) - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_REMOTE_LISTEN:{ @@ -181,12 +182,12 @@ public class PlatformMessaging extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { return ((IgniteFutureImpl)messagingAsync.future()).internalFuture(); } /** {@inheritDoc} */ - @Override protected Object processOutObject(int type) throws IgniteCheckedException { + @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { switch (type) { case OP_WITH_ASYNC: if (messaging.isAsync()) http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java index 962a4c0..22a7fa2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetService; import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServiceImpl; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; @@ -144,7 +145,7 @@ public class PlatformServices extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_DOTNET_DEPLOY: { @@ -195,7 +196,7 @@ public class PlatformServices extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_DOTNET_SERVICES: { @@ -223,8 +224,8 @@ public class PlatformServices extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected Object processInObjectStreamOutObjectStream(int type, Object arg, BinaryRawReaderEx reader, - BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public PlatformTarget processInObjectStreamOutObjectStream(int type, PlatformTarget arg, + BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_INVOKE: { assert arg != null; @@ -260,7 +261,7 @@ public class PlatformServices extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_DESCRIPTORS: { Collection<ServiceDescriptor> descs = services.serviceDescriptors(); @@ -299,7 +300,7 @@ public class PlatformServices extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected Object processOutObject(int type) throws IgniteCheckedException { + @Override public PlatformTarget processOutObject(int type) throws IgniteCheckedException { switch (type) { case OP_WITH_ASYNC: if (services.isAsync()) @@ -315,7 +316,7 @@ public class PlatformServices extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_CANCEL_ALL: services.cancelAll(); @@ -327,7 +328,7 @@ public class PlatformServices extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected Object processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public PlatformTarget processInStreamOutObject(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { switch (type) { case OP_SERVICE_PROXY: { String name = reader.readString(); @@ -343,14 +344,14 @@ public class PlatformServices extends PlatformAbstractTarget { : new GridServiceProxy<>(services.clusterGroup(), name, Service.class, sticky, platformCtx.kernalContext()); - return new ServiceProxyHolder(proxy, d.serviceClass()); + return new ServiceProxyHolder(proxy, d.serviceClass(), platformContext()); } } return super.processInStreamOutObject(type, reader); } /** {@inheritDoc} */ - @Override protected IgniteInternalFuture currentFuture() throws IgniteCheckedException { + @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { return ((IgniteFutureImpl)servicesAsync.future()).internalFuture(); } @@ -392,7 +393,7 @@ public class PlatformServices extends PlatformAbstractTarget { * Proxy holder. */ @SuppressWarnings("unchecked") - private static class ServiceProxyHolder { + private static class ServiceProxyHolder extends PlatformAbstractTarget { /** */ private final Object proxy; @@ -422,7 +423,9 @@ public class PlatformServices extends PlatformAbstractTarget { * @param proxy Proxy object. * @param clazz Proxy class. */ - private ServiceProxyHolder(Object proxy, Class clazz) { + private ServiceProxyHolder(Object proxy, Class clazz, PlatformContext ctx) { + super(ctx); + assert proxy != null; assert clazz != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java index 1b41712..3cee2b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/transactions/PlatformTransactions.java @@ -153,7 +153,7 @@ public class PlatformTransactions extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInLongOutLong(int type, long val) throws IgniteCheckedException { + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { switch (type) { case OP_COMMIT: tx(val).commit(); @@ -184,7 +184,7 @@ public class PlatformTransactions extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + @Override public long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { long txId = reader.readLong(); final Transaction asyncTx = (Transaction)tx(txId).withAsync(); @@ -220,7 +220,7 @@ public class PlatformTransactions extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processInStreamOutStream(int type, BinaryRawReaderEx reader, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_START: { TransactionConcurrency txConcurrency = TransactionConcurrency.fromOrdinal(reader.readInt()); @@ -245,7 +245,7 @@ public class PlatformTransactions extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { case OP_CACHE_CONFIG_PARAMETERS: TransactionConfiguration txCfg = platformCtx.kernalContext().config().getTransactionConfiguration(); http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java index 5985d22..e81f4c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformFutureUtils.java @@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.platform.utils; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawWriterEx; -import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; @@ -71,7 +71,7 @@ public class PlatformFutureUtils { * @return Resulting listenable. */ public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, - final int typ, PlatformAbstractTarget target) { + final int typ, PlatformTarget target) { PlatformListenable listenable = getListenable(fut); listen(ctx, listenable, futPtr, typ, null, target); @@ -88,7 +88,7 @@ public class PlatformFutureUtils { * @return Resulting listenable. */ public static PlatformListenable listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, - final int typ, PlatformAbstractTarget target) { + final int typ, PlatformTarget target) { PlatformListenable listenable = getListenable(fut); listen(ctx, listenable, futPtr, typ, null, target); @@ -107,7 +107,7 @@ public class PlatformFutureUtils { * @return Resulting listenable. */ public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, - final int typ, Writer writer, PlatformAbstractTarget target) { + final int typ, Writer writer, PlatformTarget target) { PlatformListenable listenable = getListenable(fut); listen(ctx, listenable, futPtr, typ, writer, target); @@ -126,7 +126,7 @@ public class PlatformFutureUtils { * @return Resulting listenable. */ public static PlatformListenable listen(final PlatformContext ctx, IgniteFuture fut, final long futPtr, - final int typ, Writer writer, PlatformAbstractTarget target) { + final int typ, Writer writer, PlatformTarget target) { PlatformListenable listenable = getListenable(fut); listen(ctx, listenable, futPtr, typ, writer, target); @@ -144,7 +144,7 @@ public class PlatformFutureUtils { * @return Resulting listenable. */ public static PlatformListenable listen(final PlatformContext ctx, IgniteInternalFuture fut, final long futPtr, - Writer writer, PlatformAbstractTarget target) { + Writer writer, PlatformTarget target) { PlatformListenable listenable = getListenable(fut); listen(ctx, listenable, futPtr, TYP_OBJ, writer, target); @@ -183,7 +183,7 @@ public class PlatformFutureUtils { */ @SuppressWarnings("unchecked") public static void listen(final PlatformContext ctx, PlatformListenable listenable, final long futPtr, final - int typ, @Nullable final Writer writer, final PlatformAbstractTarget target) { + int typ, @Nullable final Writer writer, final PlatformTarget target) { final PlatformCallbackGateway gate = ctx.gateway(); listenable.listen(new IgniteBiInClosure<Object, Throwable>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java new file mode 100644 index 0000000..7d65913 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformListenableTarget.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.utils; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformContext; + +/** + * Wraps listenable in a platform target. + */ +public class PlatformListenableTarget extends PlatformAbstractTarget { + /** */ + private static final int OP_CANCEL = 1; + + /** */ + private static final int OP_IS_CANCELLED = 2; + + /** Wrapped listenable */ + private final PlatformListenable listenable; + + /** + * Constructor. + * + * @param platformCtx Context. + */ + public PlatformListenableTarget(PlatformListenable listenable, PlatformContext platformCtx) { + super(platformCtx); + + assert listenable != null; + + this.listenable = listenable; + } + + /** {@inheritDoc} */ + @Override public long processInLongOutLong(int type, long val) throws IgniteCheckedException { + switch (type) { + case OP_CANCEL: + return listenable.cancel() ? TRUE : FALSE; + + case OP_IS_CANCELLED: + return listenable.isCancelled() ? TRUE : FALSE; + } + + return super.processInLongOutLong(type, val); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/include/ignite/jni/exports.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/include/ignite/jni/exports.h b/modules/platforms/cpp/jni/include/ignite/jni/exports.h index 3052435..a2e5cbb 100644 --- a/modules/platforms/cpp/jni/include/ignite/jni/exports.h +++ b/modules/platforms/cpp/jni/include/ignite/jni/exports.h @@ -80,9 +80,6 @@ extern "C" { void IGNITE_CALL IgniteDestroyJvm(gcj::JniContext* ctx); - bool IGNITE_CALL IgniteListenableCancel(gcj::JniContext* ctx, void* obj); - bool IGNITE_CALL IgniteListenableIsCancelled(gcj::JniContext* ctx, void* obj); - void IGNITE_CALL IgniteSetConsoleHandler(gcj::ConsoleWriteHandler consoleHandler); void IGNITE_CALL IgniteRemoveConsoleHandler(gcj::ConsoleWriteHandler consoleHandler); } http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/include/ignite/jni/java.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h index 07df001..97e4412 100644 --- a/modules/platforms/cpp/jni/include/ignite/jni/java.h +++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h @@ -286,10 +286,6 @@ namespace ignite jmethodID m_PlatformUtils_reallocate; jmethodID m_PlatformUtils_errData; - jclass c_PlatformListenable; - jmethodID m_PlatformListenable_cancel; - jmethodID m_PlatformListenable_isCancelled; - /** * Constructor. */ @@ -465,9 +461,6 @@ namespace ignite jobject CacheOutOpQueryCursor(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL); jobject CacheOutOpContinuousQuery(jobject obj, int type, long long memPtr); - bool ListenableCancel(jobject obj); - bool ListenableIsCancelled(jobject obj); - jobject Acquire(jobject obj); void DestroyJvm(); http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/project/vs/module.def ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/project/vs/module.def b/modules/platforms/cpp/jni/project/vs/module.def index e58ac3b..fb56dca 100644 --- a/modules/platforms/cpp/jni/project/vs/module.def +++ b/modules/platforms/cpp/jni/project/vs/module.def @@ -36,8 +36,6 @@ IgniteDestroyJvm @86 IgniteTargetOutObject @91 IgniteProcessorExtensions @97 IgniteProcessorAtomicLong @98 -IgniteListenableCancel @110 -IgniteListenableIsCancelled @111 IgniteProcessorCreateCacheFromConfig @114 IgniteProcessorGetOrCreateCacheFromConfig @115 IgniteProcessorGetIgniteConfiguration @116 http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/src/exports.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/src/exports.cpp b/modules/platforms/cpp/jni/src/exports.cpp index dde98fb..b842c03 100644 --- a/modules/platforms/cpp/jni/src/exports.cpp +++ b/modules/platforms/cpp/jni/src/exports.cpp @@ -214,14 +214,6 @@ extern "C" { ctx->DestroyJvm(); } - bool IGNITE_CALL IgniteListenableCancel(gcj::JniContext* ctx, void* obj) { - return ctx->ListenableCancel(static_cast<jobject>(obj)); - } - - bool IGNITE_CALL IgniteListenableIsCancelled(gcj::JniContext* ctx, void* obj) { - return ctx->ListenableIsCancelled(static_cast<jobject>(obj)); - } - void IGNITE_CALL IgniteSetConsoleHandler(gcj::ConsoleWriteHandler consoleHandler) { gcj::JniContext::SetConsoleHandler(consoleHandler); } http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/cpp/jni/src/java.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp index 2d3cf72..9626fbb 100644 --- a/modules/platforms/cpp/jni/src/java.cpp +++ b/modules/platforms/cpp/jni/src/java.cpp @@ -194,33 +194,33 @@ namespace ignite const char* C_PLATFORM_PROCESSOR = "org/apache/ignite/internal/processors/platform/PlatformProcessor"; JniMethod M_PLATFORM_PROCESSOR_RELEASE_START = JniMethod("releaseStart", "()V", false); - JniMethod M_PLATFORM_PROCESSOR_PROJECTION = JniMethod("projection", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_CACHE = JniMethod("cache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE = JniMethod("createCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE = JniMethod("getOrCreateCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE_FROM_CONFIG = JniMethod("createCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE_FROM_CONFIG = JniMethod("getOrCreateCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_CREATE_NEAR_CACHE = JniMethod("createNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_NEAR_CACHE = JniMethod("getOrCreateNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); + JniMethod M_PLATFORM_PROCESSOR_PROJECTION = JniMethod("projection", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_CACHE = JniMethod("cache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE = JniMethod("createCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE = JniMethod("getOrCreateCache", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_CREATE_CACHE_FROM_CONFIG = JniMethod("createCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_CACHE_FROM_CONFIG = JniMethod("getOrCreateCacheFromConfig", "(J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_CREATE_NEAR_CACHE = JniMethod("createNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_GET_OR_CREATE_NEAR_CACHE = JniMethod("getOrCreateNearCache", "(Ljava/lang/String;J)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); JniMethod M_PLATFORM_PROCESSOR_DESTROY_CACHE = JniMethod("destroyCache", "(Ljava/lang/String;)V", false); - JniMethod M_PLATFORM_PROCESSOR_AFFINITY = JniMethod("affinity", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_DATA_STREAMER = JniMethod("dataStreamer", "(Ljava/lang/String;Z)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_TRANSACTIONS = JniMethod("transactions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_COMPUTE = JniMethod("compute", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_MESSAGE = JniMethod("message", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_EVENTS = JniMethod("events", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_SERVICES = JniMethod("services", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_EXTENSIONS = JniMethod("extensions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_ATOMIC_LONG = JniMethod("atomicLong", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_ATOMIC_SEQUENCE = JniMethod("atomicSequence", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_ATOMIC_REFERENCE = JniMethod("atomicReference", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); + JniMethod M_PLATFORM_PROCESSOR_AFFINITY = JniMethod("affinity", "(Ljava/lang/String;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_DATA_STREAMER = JniMethod("dataStreamer", "(Ljava/lang/String;Z)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_TRANSACTIONS = JniMethod("transactions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_COMPUTE = JniMethod("compute", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_MESSAGE = JniMethod("message", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_EVENTS = JniMethod("events", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_SERVICES = JniMethod("services", "(Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_EXTENSIONS = JniMethod("extensions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_ATOMIC_LONG = JniMethod("atomicLong", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_ATOMIC_SEQUENCE = JniMethod("atomicSequence", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); + JniMethod M_PLATFORM_PROCESSOR_ATOMIC_REFERENCE = JniMethod("atomicReference", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); JniMethod M_PLATFORM_PROCESSOR_GET_IGNITE_CONFIGURATION = JniMethod("getIgniteConfiguration", "(J)V", false); JniMethod M_PLATFORM_PROCESSOR_GET_CACHE_NAMES = JniMethod("getCacheNames", "(J)V", false); JniMethod M_PLATFORM_PROCESSOR_LOGGER_IS_LEVEL_ENABLED = JniMethod("loggerIsLevelEnabled", "(I)Z", false); JniMethod M_PLATFORM_PROCESSOR_LOGGER_LOG = JniMethod("loggerLog", "(ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;)V", false); - JniMethod M_PLATFORM_PROCESSOR_BINARY_PROCESSOR = JniMethod("binaryProcessor", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); + JniMethod M_PLATFORM_PROCESSOR_BINARY_PROCESSOR = JniMethod("binaryProcessor", "()Lorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;", false); - const char* C_PLATFORM_TARGET = "org/apache/ignite/internal/processors/platform/PlatformTarget"; + const char* C_PLATFORM_TARGET = "org/apache/ignite/internal/processors/platform/PlatformTargetProxy"; JniMethod M_PLATFORM_TARGET_IN_LONG_OUT_LONG = JniMethod("inLongOutLong", "(IJ)J", false); JniMethod M_PLATFORM_TARGET_IN_STREAM_OUT_LONG = JniMethod("inStreamOutLong", "(IJ)J", false); JniMethod M_PLATFORM_TARGET_IN_STREAM_OUT_OBJECT = JniMethod("inStreamOutObject", "(IJ)Ljava/lang/Object;", false); @@ -260,7 +260,7 @@ namespace ignite JniMethod M_PLATFORM_CALLBACK_UTILS_CONTINUOUS_QUERY_FILTER_RELEASE = JniMethod("continuousQueryFilterRelease", "(JJ)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_DATA_STREAMER_TOPOLOGY_UPDATE = JniMethod("dataStreamerTopologyUpdate", "(JJJI)V", true); - JniMethod M_PLATFORM_CALLBACK_UTILS_DATA_STREAMER_STREAM_RECEIVER_INVOKE = JniMethod("dataStreamerStreamReceiverInvoke", "(JJLjava/lang/Object;JZ)V", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_DATA_STREAMER_STREAM_RECEIVER_INVOKE = JniMethod("dataStreamerStreamReceiverInvoke", "(JJLorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;JZ)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_FUTURE_BYTE_RES = JniMethod("futureByteResult", "(JJI)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_FUTURE_BOOL_RES = JniMethod("futureBoolResult", "(JJI)V", true); @@ -307,7 +307,7 @@ namespace ignite JniMethod M_PLATFORM_CALLBACK_UTILS_LOGGER_LOG = JniMethod("loggerLog", "(JILjava/lang/String;Ljava/lang/String;Ljava/lang/String;J)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_LOGGER_IS_LEVEL_ENABLED = JniMethod("loggerIsLevelEnabled", "(JI)Z", true); - JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJLorg/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget;)J", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJLorg/apache/ignite/internal/processors/platform/PlatformTargetProxy;)J", true); JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION = JniMethod("affinityFunctionPartition", "(JJJ)I", true); JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS = JniMethod("affinityFunctionAssignPartitions", "(JJJJ)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE = JniMethod("affinityFunctionRemoveNode", "(JJJ)V", true); @@ -327,10 +327,6 @@ namespace ignite JniMethod M_PLATFORM_IGNITION_STOP = JniMethod("stop", "(Ljava/lang/String;Z)Z", true); JniMethod M_PLATFORM_IGNITION_STOP_ALL = JniMethod("stopAll", "(Z)V", true); - const char* C_PLATFORM_LISTENABLE = "org/apache/ignite/internal/processors/platform/utils/PlatformListenable"; - JniMethod M_PLATFORM_LISTENABLE_CANCEL = JniMethod("cancel", "()Z", false); - JniMethod M_PLATFORM_LISTENABLE_IS_CANCELED = JniMethod("isCancelled", "()Z", false); - /* STATIC STATE. */ gcc::CriticalSection JVM_LOCK; gcc::CriticalSection CONSOLE_LOCK; @@ -552,10 +548,6 @@ namespace ignite m_PlatformUtils_reallocate = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_REALLOC); m_PlatformUtils_errData = FindMethod(env, c_PlatformUtils, M_PLATFORM_UTILS_ERR_DATA); - c_PlatformListenable = FindClass(env, C_PLATFORM_LISTENABLE); - m_PlatformListenable_cancel = FindMethod(env, c_PlatformListenable, M_PLATFORM_LISTENABLE_CANCEL); - m_PlatformListenable_isCancelled = FindMethod(env, c_PlatformListenable, M_PLATFORM_LISTENABLE_IS_CANCELED); - // Find utility classes which are not used from context, but are still required in other places. CheckClass(env, C_PLATFORM_NO_CALLBACK_EXCEPTION); } @@ -1447,28 +1439,6 @@ namespace ignite return LocalToGlobal(env, res); } - bool JniContext::ListenableCancel(jobject obj) - { - JNIEnv* env = Attach(); - - jboolean res = env->CallBooleanMethod(obj, jvm->GetMembers().m_PlatformListenable_cancel); - - ExceptionCheck(env); - - return res != 0;; - } - - bool JniContext::ListenableIsCancelled(jobject obj) - { - JNIEnv* env = Attach(); - - jboolean res = env->CallBooleanMethod(obj, jvm->GetMembers().m_PlatformListenable_isCancelled); - - ExceptionCheck(env); - - return res != 0;; - } - jobject JniContext::Acquire(jobject obj) { if (obj) { http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index f945efe..6421b8c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -181,6 +181,7 @@ <Compile Include="Impl\Cache\Expiry\ExpiryPolicyFactory.cs" /> <Compile Include="Impl\Cache\Expiry\ExpiryPolicySerializer.cs" /> <Compile Include="Impl\Cache\ICacheLockInternal.cs" /> + <Compile Include="Impl\Common\Listenable.cs" /> <Compile Include="Impl\Common\Platform.cs" /> <Compile Include="Impl\Binary\UserSerializerProxy.cs" /> <Compile Include="Impl\Cache\Affinity\AffinityFunctionSerializer.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs index 68bd9d4..50102a7 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Future.cs @@ -24,7 +24,6 @@ namespace Apache.Ignite.Core.Impl.Common using System.Threading.Tasks; using Apache.Ignite.Core.Common; using Apache.Ignite.Core.Impl.Binary.IO; - using Apache.Ignite.Core.Impl.Unmanaged; /// <summary> /// Grid future implementation. @@ -40,7 +39,7 @@ namespace Apache.Ignite.Core.Impl.Common private readonly TaskCompletionSource<T> _taskCompletionSource = new TaskCompletionSource<T>(); /** */ - private volatile IUnmanagedTarget _unmanagedTarget; + private volatile Listenable _listenable; /// <summary> /// Constructor. @@ -84,7 +83,7 @@ namespace Apache.Ignite.Core.Impl.Common /// <param name="cancellationToken">The cancellation token.</param> public Task<T> GetTask(CancellationToken cancellationToken) { - Debug.Assert(_unmanagedTarget != null); + Debug.Assert(_listenable != null); // OnTokenCancel will fire even if cancellationToken is already cancelled. cancellationToken.Register(OnTokenCancel); @@ -169,11 +168,11 @@ namespace Apache.Ignite.Core.Impl.Common /// <summary> /// Sets unmanaged future target for cancellation. /// </summary> - internal void SetTarget(IUnmanagedTarget target) + internal void SetTarget(Listenable target) { Debug.Assert(target != null); - _unmanagedTarget = target; + _listenable = target; } /// <summary> @@ -181,8 +180,8 @@ namespace Apache.Ignite.Core.Impl.Common /// </summary> private void OnTokenCancel() { - if (_unmanagedTarget != null) - UnmanagedUtils.ListenableCancel(_unmanagedTarget); + if (_listenable != null) + _listenable.Cancel(); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs new file mode 100644 index 0000000..6da98ab --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Common/Listenable.cs @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.Common +{ + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Unmanaged; + + /// <summary> + /// Platform listenable. + /// </summary> + internal class Listenable : PlatformTarget + { + /** */ + private const int OpCancel = 1; + + /// <summary> + /// Initializes a new instance of the <see cref="Listenable"/> class. + /// </summary> + /// <param name="target">Target.</param> + /// <param name="marsh">Marshaller.</param> + public Listenable(IUnmanagedTarget target, Marshaller marsh) : base(target, marsh) + { + // No-op. + } + + /// <summary> + /// Cancels the listenable. + /// </summary> + public void Cancel() + { + DoOutInOp(OpCancel); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs index bc7c7d9..d36caf3 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeImpl.cs @@ -195,7 +195,7 @@ namespace Apache.Ignite.Core.Impl.Compute var future = holder.Future; - future.SetTarget(futTarget); + future.SetTarget(new Listenable(futTarget, Marshaller)); return future; } @@ -550,7 +550,7 @@ namespace Apache.Ignite.Core.Impl.Compute writeAction(writer); }); - holder.Future.SetTarget(futTarget); + holder.Future.SetTarget(new Listenable(futTarget, Marshaller)); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs index f4a07f6..9cf2a6c 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/PlatformTarget.cs @@ -869,7 +869,7 @@ namespace Apache.Ignite.Core.Impl throw; } - fut.SetTarget(futTarget); + fut.SetTarget(new Listenable(futTarget, _marsh)); return fut; } http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs index c746866..c4f3e19 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs @@ -167,14 +167,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteDestroyJvm")] public static extern void DestroyJvm(void* ctx); - [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableCancel")] - [return: MarshalAs(UnmanagedType.U1)] - public static extern bool ListenableCancel(void* ctx, void* target); - - [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableIsCancelled")] - [return: MarshalAs(UnmanagedType.U1)] - public static extern bool ListenableIsCancelled(void* ctx, void* target); - [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteSetConsoleHandler")] public static extern void SetConsoleHandler(void* consoleHandler); http://git-wip-us.apache.org/repos/asf/ignite/blob/59e6fec0/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs index f36c35f..0a2a1f0 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/UnmanagedUtils.cs @@ -516,11 +516,6 @@ namespace Apache.Ignite.Core.Impl.Unmanaged JNI.DestroyJvm(ctx); } - internal static bool ListenableCancel(IUnmanagedTarget target) - { - return JNI.ListenableCancel(target.Context, target.Target); - } - #endregion } }
