IGNITE-4583: new async API at the IgniteCluster, IgniteEvents, IgniteMessaging, IgniteServices, Transaction.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fde1f486 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fde1f486 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fde1f486 Branch: refs/heads/ignite-4475-async Commit: fde1f486d6465601cfd40d0eb775141501024a28 Parents: dd4d439 Author: devozerov <voze...@gridgain.com> Authored: Wed Feb 8 15:42:07 2017 +0300 Committer: devozerov <voze...@gridgain.com> Committed: Wed Feb 8 15:42:07 2017 +0300 ---------------------------------------------------------------------- .../ignite/tests/utils/TestTransaction.java | 11 + .../java/org/apache/ignite/IgniteCluster.java | 126 +++++++++++ .../java/org/apache/ignite/IgniteEvents.java | 128 ++++++++++- .../java/org/apache/ignite/IgniteMessaging.java | 27 +++ .../java/org/apache/ignite/IgniteServices.java | 221 +++++++++++++++++-- .../ignite/internal/IgniteEventsImpl.java | 79 +++++++ .../ignite/internal/IgniteMessagingImpl.java | 39 ++++ .../ignite/internal/IgniteServicesImpl.java | 108 +++++++++ .../cluster/IgniteClusterAsyncImpl.java | 28 ++- .../internal/cluster/IgniteClusterImpl.java | 24 +- .../cache/transactions/IgniteInternalTx.java | 3 - .../transactions/TransactionProxyImpl.java | 39 +++- ...formDotNetEntityFrameworkCacheExtension.java | 1 - .../platform/events/PlatformEvents.java | 57 +++-- .../platform/messaging/PlatformMessaging.java | 35 +-- .../platform/services/PlatformServices.java | 95 +++++--- .../transactions/PlatformTransactions.java | 9 +- .../apache/ignite/transactions/Transaction.java | 22 ++ .../continuous/GridEventConsumeSelfTest.java | 169 ++++++++++++++ .../internal/processors/igfs/IgfsMock.java | 5 + .../GridServiceProcessorAbstractSelfTest.java | 209 ++++++++++++++++++ .../ignite/messaging/GridMessagingSelfTest.java | 54 ++++- .../cache/GridAbstractCacheStoreSelfTest.java | 11 + .../multijvm/IgniteClusterProcessProxy.java | 13 ++ 
.../multijvm/IgniteEventsProcessProxy.java | 31 +++ ...gniteProjectionStartStopRestartSelfTest.java | 6 +- 26 files changed, 1437 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java ---------------------------------------------------------------------- diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java index 5f3ec69..4a03d25 100644 --- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java +++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java @@ -17,6 +17,7 @@ package org.apache.ignite.tests.utils; +import org.apache.ignite.IgniteException; import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteUuid; @@ -106,6 +107,11 @@ public class TestTransaction implements Transaction { } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> commitAsync() throws IgniteException { + return null; + } + + /** {@inheritDoc} */ @Override public void close() { // No-op. } @@ -129,4 +135,9 @@ public class TestTransaction implements Transaction { @Override public void rollback() { // No-op. 
} + + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> rollbackAsync() throws IgniteException { + return null; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java index 23b03df..dc7b687 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java @@ -188,6 +188,33 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport { int maxConn) throws IgniteException; /** + * Starts one or more nodes on remote host(s) asynchronously. + * <p> + * This method takes INI file which defines all startup parameters. It can contain one or + * more sections, each for a host or for range of hosts (note that they must have different + * names) and a special '{@code defaults}' section with default values. They are applied to + * undefined parameters in host's sections. + * <p> + * Completed future contains collection of tuples. Each tuple corresponds to one node start attempt and + * contains hostname, success flag and error message if attempt was not successful. Note that + * successful attempt doesn't mean that node was actually started and joined topology. For large + * topologies (> 100s nodes) it can take over 10 minutes for all nodes to start. See individual + * node logs for details. + * + * @param file Configuration file. + * @param restart Whether to stop existing nodes. If {@code true}, all existing + * nodes on the host will be stopped before starting new ones. If + * {@code false}, nodes will be started only if there are less + * nodes on the host than expected. + * @param timeout Connection timeout. + * @param maxConn Number of parallel SSH connections to one host. 
+ * @return a Future representing pending completion of the starting nodes. + * @throws IgniteException In case of error. + */ + public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(File file, boolean restart, int timeout, + int maxConn) throws IgniteException; + + /** * Starts one or more nodes on remote host(s). * <p> * Each map in {@code hosts} collection @@ -290,6 +317,104 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport { @Nullable Map<String, Object> dflts, boolean restart, int timeout, int maxConn) throws IgniteException; /** + * Starts one or more nodes on remote host(s) asynchronously. + * <p> + * Each map in {@code hosts} collection + * defines startup parameters for one host or for a range of hosts. The following + * parameters are supported: + * <table class="doctable"> + * <tr> + * <th>Name</th> + * <th>Type</th> + * <th>Description</th> + * </tr> + * <tr> + * <td><b>host</b></td> + * <td>String</td> + * <td> + * Hostname (required). Can define several hosts if their IPs are sequential. + * E.g., {@code 10.0.0.1~5} defines range of five IP addresses. Other + * parameters are applied to all hosts equally. + * </td> + * </tr> + * <tr> + * <td><b>port</b></td> + * <td>Integer</td> + * <td>Port number (default is {@code 22}).</td> + * </tr> + * <tr> + * <td><b>uname</b></td> + * <td>String</td> + * <td>Username (if not defined, current local username will be used).</td> + * </tr> + * <tr> + * <td><b>passwd</b></td> + * <td>String</td> + * <td>Password (if not defined, private key file must be defined).</td> + * </tr> + * <tr> + * <td><b>key</b></td> + * <td>File</td> + * <td>Private key file (if not defined, password must be defined).</td> + * </tr> + * <tr> + * <td><b>nodes</b></td> + * <td>Integer</td> + * <td> + * Expected number of nodes on the host. If some nodes are started + * already, then only remaining nodes will be started. 
If current count of + * nodes is equal to this number, and {@code restart} flag is {@code false}, + * then nothing will happen. + * </td> + * </tr> + * <tr> + * <td><b>igniteHome</b></td> + * <td>String</td> + * <td> + * Path to Ignite installation folder. If not defined, IGNITE_HOME + * environment variable must be set on remote hosts. + * </td> + * </tr> + * <tr> + * <td><b>cfg</b></td> + * <td>String</td> + * <td>Path to configuration file (relative to {@code igniteHome}).</td> + * </tr> + * <tr> + * <td><b>script</b></td> + * <td>String</td> + * <td> + * Custom startup script file name and path (relative to {@code igniteHome}). + * You can also specify a space-separated list of parameters in the same + * string (for example: {@code "bin/my-custom-script.sh -v"}). + * </td> + * </tr> + * </table> + * <p> + * {@code dflts} map defines default values. They are applied to undefined parameters in + * {@code hosts} collection. + * <p> + * Completed future contains collection of tuples. Each tuple corresponds to one node start attempt and + * contains hostname, success flag and error message if attempt was not successful. Note that + * successful attempt doesn't mean that node was actually started and joined topology. For large + * topologies (> 100s nodes) it can take over 10 minutes for all nodes to start. See individual + * node logs for details. + * + * @param hosts Startup parameters. + * @param dflts Default values. + * @param restart Whether to stop existing nodes. If {@code true}, all existing + * nodes on the host will be stopped before starting new ones. If + * {@code false}, nodes will be started only if there are less + * nodes on the host than expected. + * @param timeout Connection timeout in milliseconds. + * @param maxConn Number of parallel SSH connections to one host. + * @return a Future representing pending completion of the starting nodes. + * @throws IgniteException In case of error. 
+ */ + public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(Collection<Map<String, Object>> hosts, + @Nullable Map<String, Object> dflts, boolean restart, int timeout, int maxConn) throws IgniteException; + + /** * Stops nodes satisfying optional set of predicates. * <p> * <b>NOTE:</b> {@code System.exit(Ignition.KILL_EXIT_CODE)} will be executed on each @@ -347,5 +472,6 @@ public interface IgniteCluster extends ClusterGroup, IgniteAsyncSupport { @Nullable public IgniteFuture<?> clientReconnectFuture(); /** {@inheritDoc} */ + @Deprecated @Override public IgniteCluster withAsync(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java b/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java index c0e4d3b..c081f2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java @@ -25,6 +25,7 @@ import org.apache.ignite.events.Event; import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -90,13 +91,27 @@ public interface IgniteEvents extends IgniteAsyncSupport { throws IgniteException; /** + * Asynchronously queries nodes in this cluster group for events using passed in predicate filter for event + * selection. + * + * @param p Predicate filter used to query events on remote nodes. + * @param timeout Maximum time to wait for result, {@code 0} to wait forever. + * @param types Event types to be queried. + * @return a Future representing pending completion of the query. 
The completed future contains + * collection of grid events returned from specified nodes. + * @throws IgniteException If query failed. + */ + public <T extends Event> IgniteFuture<List<T>> remoteQueryAsync(IgnitePredicate<T> p, long timeout, + @Nullable int... types) throws IgniteException; + + /** * Adds event listener for specified events to all nodes in the cluster group (possibly including * local node if it belongs to the cluster group as well). This means that all events occurring on * any node within this cluster group that pass remote filter will be sent to local node for * local listener notifications. * <p> * The listener can be unsubscribed automatically if local node stops, if {@code locLsnr} callback - * returns {@code false} or if {@link #stopRemoteListen(UUID)} is called. + * returns {@code false} or if {@link #stopRemoteListen(UUID)} or {@link #stopRemoteListenAsync(UUID)} are called. * * @param locLsnr Listener callback that is called on local node. If {@code null}, this events will be handled * on remote nodes by passed in {@code rmtFilter}. @@ -108,7 +123,8 @@ public interface IgniteEvents extends IgniteAsyncSupport { * @param types Types of events to listen for. If not provided, all events that pass the * provided remote filter will be sent to local node. * @param <T> Type of the event. - * @return {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} method to stop listening. + * @return {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} or + * {@link #stopRemoteListenAsync(UUID)} methods to stop listening. * @throws IgniteException If failed to add listener. */ @IgniteAsyncSupported @@ -118,6 +134,35 @@ public interface IgniteEvents extends IgniteAsyncSupport { throws IgniteException; /** + * Asynchronously adds event listener for specified events to all nodes in the cluster group (possibly including + * local node if it belongs to the cluster group as well). 
This means that all events occurring on + * any node within this cluster group that pass remote filter will be sent to local node for + * local listener notifications. + * <p> + * The listener can be unsubscribed automatically if local node stops, if {@code locLsnr} callback + * returns {@code false} or if {@link #stopRemoteListen(UUID)} or {@link #stopRemoteListenAsync(UUID)} are called. + * + * @param <T> Type of the event. + * @param locLsnr Listener callback that is called on local node. If {@code null}, these events will be handled + * on remote nodes by passed in {@code rmtFilter}. + * @param rmtFilter Filter callback that is called on remote node. Only events that pass the remote filter + * will be sent to local node. If {@code null}, all events of specified types will + * be sent to local node. This remote filter can be used to pre-handle events remotely, + * before they are passed in to local callback. It will be auto-unsubscribed on the node + * where event occurred in case if it returns {@code false}. + * @param types Types of events to listen for. If not provided, all events that pass the + * provided remote filter will be sent to local node. + * @return a Future representing pending completion of the operation. The completed future contains + * {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} or + * {@link #stopRemoteListenAsync(UUID)} methods to stop listening. + * @throws IgniteException If failed to add listener. + */ + public <T extends Event> IgniteFuture<UUID> remoteListenAsync(@Nullable IgniteBiPredicate<UUID, T> locLsnr, + @Nullable IgnitePredicate<T> rmtFilter, + @Nullable int... types) + throws IgniteException; + + /** * Adds event listener for specified events to all nodes in the cluster group (possibly including * local node if it belongs to the cluster group as well).
This means that all events occurring on * any node within this cluster group that pass remote filter will be sent to local node for @@ -148,9 +193,11 @@ public interface IgniteEvents extends IgniteAsyncSupport { * @param types Types of events to listen for. If not provided, all events that pass the * provided remote filter will be sent to local node. * @param <T> Type of the event. - * @return {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} method to stop listening. - * @see #stopRemoteListen(UUID) + * @return {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} or + * {@link #stopRemoteListenAsync(UUID)} methods to stop listening. * @throws IgniteException If failed to add listener. + * @see #stopRemoteListen(UUID) + * @see #stopRemoteListenAsync(UUID) */ @IgniteAsyncSupported public <T extends Event> UUID remoteListen(int bufSize, @@ -162,6 +209,50 @@ public interface IgniteEvents extends IgniteAsyncSupport { throws IgniteException; /** + * Asynchronously adds event listener for specified events to all nodes in the cluster group (possibly including + * local node if it belongs to the cluster group as well). This means that all events occurring on + * any node within this cluster group that pass remote filter will be sent to local node for + * local listener notification. + * + * @param <T> Type of the event. + * @param bufSize Remote events buffer size. Events from remote nodes won't be sent until buffer + * is full or time interval is exceeded. + * @param interval Maximum time interval after which events from remote node will be sent. Events + * from remote nodes won't be sent until buffer is full or time interval is exceeded. + * @param autoUnsubscribe Flag indicating that event listeners on remote nodes should be + * automatically unregistered if master node (node that initiated event listening) leaves + * topology.
If this flag is {@code false}, listeners will be unregistered only when + * {@link #stopRemoteListen(UUID)} method is called, or the {@code 'callback (locLsnr)'} + * passed in returns {@code false}. + * @param locLsnr Callback that is called on local node. If this predicate returns {@code true}, + * the implementation will continue listening to events. Otherwise, events + * listening will be stopped and listeners will be unregistered on all nodes + * in the cluster group. If {@code null}, these events will be handled on remote nodes by + * passed in {@code rmtFilter} until local node stops (if {@code 'autoUnsubscribe'} is {@code true}) + * or until {@link #stopRemoteListen(UUID)} is called. + * @param rmtFilter Filter callback that is called on remote node. Only events that pass the remote filter + * will be sent to local node. If {@code null}, all events of specified types will + * be sent to local node. This remote filter can be used to pre-handle events remotely, + * before they are passed in to local callback. It will be auto-unsubscribed on the node + * where event occurred in case if it returns {@code false}. + * @param types Types of events to listen for. If not provided, all events that pass the + * provided remote filter will be sent to local node. + * @return a Future representing pending completion of the operation. The completed future contains + * {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} + * or {@link #stopRemoteListenAsync(UUID)} methods to stop listening. + * @throws IgniteException If failed to add listener. + * @see #stopRemoteListen(UUID) + * @see #stopRemoteListenAsync(UUID) + */ + public <T extends Event> IgniteFuture<UUID> remoteListenAsync(int bufSize, + long interval, + boolean autoUnsubscribe, + @Nullable IgniteBiPredicate<UUID, T> locLsnr, + @Nullable IgnitePredicate<T> rmtFilter, + @Nullable int... types) + throws IgniteException; + + /** * Stops listening to remote events.
This will unregister all listeners identified with provided * operation ID on all nodes defined by {@link #clusterGroup()}. * <p> @@ -169,13 +260,27 @@ public interface IgniteEvents extends IgniteAsyncSupport { * * @param opId Operation ID that was returned from * {@link #remoteListen(IgniteBiPredicate, IgnitePredicate, int...)} method. - * @see #remoteListen(IgniteBiPredicate, IgnitePredicate, int...) * @throws IgniteException If failed to stop listeners. + * @see #remoteListen(IgniteBiPredicate, IgnitePredicate, int...) + * @see #remoteListenAsync(int, long, boolean, IgniteBiPredicate, IgnitePredicate, int...) */ @IgniteAsyncSupported public void stopRemoteListen(UUID opId) throws IgniteException; /** + * Asynchronously stops listening to remote events. This will unregister all listeners identified with provided + * operation ID on all nodes defined by {@link #clusterGroup()}. + * + * @param opId Operation ID that was returned from + * {@link #remoteListen(IgniteBiPredicate, IgnitePredicate, int...)} method. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to stop listeners. + * @see #remoteListen(IgniteBiPredicate, IgnitePredicate, int...) + * @see #remoteListenAsync(int, long, boolean, IgniteBiPredicate, IgnitePredicate, int...) + */ + public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws IgniteException; + + /** * Waits for the specified events. * <p> * Supports asynchronous execution (see {@link IgniteAsyncSupport}). @@ -191,6 +296,18 @@ public interface IgniteEvents extends IgniteAsyncSupport { throws IgniteException; /** + * Create future to wait for the specified events. + * + * @param filter Optional filtering predicate. Only if predicates evaluates to {@code true} will the event + * end the wait. + * @param types Types of the events to wait for. If not provided, all events will be passed to the filter. + * @return a Future representing pending completion of the operation. 
The completed future contains grid event. + * @throws IgniteException If wait was interrupted. + */ + public <T extends Event> IgniteFuture<T> waitForLocalAsync(@Nullable IgnitePredicate<T> filter, + @Nullable int... types) throws IgniteException; + + /** * Queries local node for events using passed-in predicate filter for event selection. * * @param p Predicate to filter events. All predicates must be satisfied for the @@ -269,5 +386,6 @@ public interface IgniteEvents extends IgniteAsyncSupport { public boolean isEnabled(int type); /** {@inheritDoc} */ + @Deprecated @Override public IgniteEvents withAsync(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java index ab554af..bb96d65 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java @@ -25,6 +25,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; /** @@ -150,6 +151,22 @@ public interface IgniteMessaging extends IgniteAsyncSupport { public UUID remoteListen(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p) throws IgniteException; /** + * Asynchronously adds a message listener for a given topic to all nodes in the cluster group (possibly including + * this node if it belongs to the cluster group as well). 
This means that any node within this cluster + * group can send a message for a given topic and all nodes within the cluster group will receive + * listener notifications. + * + * @param topic Topic to subscribe to, {@code null} means default topic. + * @param p Predicate that is called on each node for each received message. If predicate returns {@code false}, + * then it will be unsubscribed from any further notifications. + * @return a Future representing pending completion of the operation. The completed future contains + * {@code Operation ID} that can be passed to {@link #stopRemoteListen(UUID)} method to stop listening. + * @throws IgniteException If failed to add listener. + */ + public IgniteFuture<UUID> remoteListenAsync(@Nullable Object topic, IgniteBiPredicate<UUID, ?> p) + throws IgniteException; + + /** * Unregisters all listeners identified with provided operation ID on all nodes in the cluster group. * <p> * Supports asynchronous execution (see {@link IgniteAsyncSupport}). @@ -160,6 +177,16 @@ public interface IgniteMessaging extends IgniteAsyncSupport { @IgniteAsyncSupported public void stopRemoteListen(UUID opId) throws IgniteException; + /** + * Asynchronously unregisters all listeners identified with provided operation ID on all nodes in the cluster group. + * + * @param opId Listen ID that was returned from {@link #remoteListen(Object, IgniteBiPredicate)} method. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to unregister listeners. 
+ */ + public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws IgniteException; + /** {@inheritDoc} */ + @Deprecated @Override IgniteMessaging withAsync(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/IgniteServices.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java index 8365ec7..1c01598 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java @@ -22,6 +22,7 @@ import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.lang.IgniteAsyncSupport; import org.apache.ignite.lang.IgniteAsyncSupported; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.resources.IgniteInstanceResource; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceConfiguration; @@ -150,7 +151,8 @@ public interface IgniteServices extends IgniteAsyncSupport { * when a singleton service instance will be active on more than one node (e.g. crash detection delay). * <p> * This method is analogous to calling - * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, int) deployMultiple(name, svc, 1, 1)} method. + * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, int) deployMultiple(name, svc, 1, 1)} + * method. * * @param name Service name. * @param svc Service instance. @@ -160,13 +162,35 @@ public interface IgniteServices extends IgniteAsyncSupport { public void deployClusterSingleton(String name, Service svc) throws IgniteException; /** + * Asynchronously deploys a cluster-wide singleton service. Ignite will guarantee that there is always + * one instance of the service in the cluster. 
In case if grid node on which the service + * was deployed crashes or stops, Ignite will automatically redeploy it on another node. + * However, if the node on which the service is deployed remains in topology, then the + * service will always be deployed on that node only, regardless of topology changes. + * <p> + * Note that in case of topology changes, due to network delays, there may be a temporary situation + * when a singleton service instance will be active on more than one node (e.g. crash detection delay). + * <p> + * This method is analogous to calling + * {@link #deployMultipleAsync(String, org.apache.ignite.services.Service, int, int) + * deployMultipleAsync(name, svc, 1, 1)} method. + * + * @param name Service name. + * @param svc Service instance. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to deploy service. + */ + public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service svc) throws IgniteException; + + /** * Deploys a per-node singleton service. Ignite will guarantee that there is always * one instance of the service running on each node. Whenever new nodes are started * within the underlying cluster group, Ignite will automatically deploy one instance of * the service on every new node. * <p> * This method is analogous to calling - * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, int) deployMultiple(name, svc, 0, 1)} method. + * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, int) deployMultiple(name, svc, 0, 1)} + * method. * * @param name Service name. * @param svc Service instance. @@ -176,6 +200,23 @@ public interface IgniteServices extends IgniteAsyncSupport { public void deployNodeSingleton(String name, Service svc) throws IgniteException; /** + * Asynchronously deploys a per-node singleton service. Ignite will guarantee that there is always + * one instance of the service running on each node. 
Whenever new nodes are started + * within the underlying cluster group, Ignite will automatically deploy one instance of + * the service on every new node. + * <p> + * This method is analogous to calling + * {@link #deployMultipleAsync(String, org.apache.ignite.services.Service, int, int) + * deployMultipleAsync(name, svc, 0, 1)} method. + * + * @param name Service name. + * @param svc Service instance. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to deploy service. + */ + public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service svc) throws IgniteException; + + /** * Deploys one instance of this service on the primary node for a given affinity key. * Whenever topology changes and primary node assignment changes, Ignite will always * make sure that the service is undeployed on the previous primary node and deployed @@ -184,8 +225,8 @@ public interface IgniteServices extends IgniteAsyncSupport { * Note that in case of topology changes, due to network delays, there may be a temporary situation * when a service instance will be active on more than one node (e.g. crash detection delay). * <p> - * This method is analogous to the invocation of {@link #deploy(org.apache.ignite.services.ServiceConfiguration)} method - * as follows: + * This method is analogous to the invocation of {@link #deploy(org.apache.ignite.services.ServiceConfiguration)} + * method as follows: * <pre name="code" class="java"> * ServiceConfiguration cfg = new ServiceConfiguration(); * @@ -211,6 +252,41 @@ public interface IgniteServices extends IgniteAsyncSupport { throws IgniteException; /** + * Asynchronously deploys one instance of this service on the primary node for a given affinity key. + * Whenever topology changes and primary node assignment changes, Ignite will always + * make sure that the service is undeployed on the previous primary node and deployed + * on the new primary node. 
+ * <p> + * Note that in case of topology changes, due to network delays, there may be a temporary situation + * when a service instance will be active on more than one node (e.g. crash detection delay). + * <p> + * This method is analogous to the invocation of + * {@link #deployAsync(org.apache.ignite.services.ServiceConfiguration)} method as follows: + * <pre name="code" class="java"> + * ServiceConfiguration cfg = new ServiceConfiguration(); + * + * cfg.setName(name); + * cfg.setService(svc); + * cfg.setCacheName(cacheName); + * cfg.setAffinityKey(affKey); + * cfg.setTotalCount(1); + * cfg.setMaxPerNodeCount(1); + * + * ignite.services().deployAsync(cfg); + * </pre> + * + * @param name Service name. + * @param svc Service instance. + * @param cacheName Name of the cache on which affinity for key should be calculated, {@code null} for + * default cache. + * @param affKey Affinity cache key. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to deploy service. + */ + public IgniteFuture<Void> deployKeyAffinitySingletonAsync(String name, Service svc, @Nullable String cacheName, + Object affKey) throws IgniteException; + + /** * Deploys multiple instances of the service on the grid. Ignite will deploy a * maximum amount of services equal to {@code 'totalCnt'} parameter making sure that * there are no more than {@code 'maxPerNodeCnt'} service instances running @@ -221,8 +297,8 @@ public interface IgniteServices extends IgniteAsyncSupport { * Note that at least one of {@code 'totalCnt'} or {@code 'maxPerNodeCnt'} parameters must have * value greater than {@code 0}. 
* <p> - * This method is analogous to the invocation of {@link #deploy(org.apache.ignite.services.ServiceConfiguration)} method - * as follows: + * This method is analogous to the invocation of {@link #deploy(org.apache.ignite.services.ServiceConfiguration)} + * method as follows: * <pre name="code" class="java"> * ServiceConfiguration cfg = new ServiceConfiguration(); * @@ -244,20 +320,57 @@ public interface IgniteServices extends IgniteAsyncSupport { public void deployMultiple(String name, Service svc, int totalCnt, int maxPerNodeCnt) throws IgniteException; /** + * Asynchronously deploys multiple instances of the service on the grid. Ignite will deploy a + * maximum amount of services equal to {@code 'totalCnt'} parameter making sure that + * there are no more than {@code 'maxPerNodeCnt'} service instances running + * on each node. Whenever topology changes, Ignite will automatically rebalance + * the deployed services within cluster to make sure that each node will end up with + * about equal number of deployed instances whenever possible. + * <p> + * Note that at least one of {@code 'totalCnt'} or {@code 'maxPerNodeCnt'} parameters must have + * value greater than {@code 0}. + * <p> + * This method is analogous to the invocation of + * {@link #deployAsync(org.apache.ignite.services.ServiceConfiguration)} method as follows: + * <pre name="code" class="java"> + * ServiceConfiguration cfg = new ServiceConfiguration(); + * + * cfg.setName(name); + * cfg.setService(svc); + * cfg.setTotalCount(totalCnt); + * cfg.setMaxPerNodeCount(maxPerNodeCnt); + * + * ignite.services().deployAsync(cfg); + * </pre> + * + * @param name Service name. + * @param svc Service instance. + * @param totalCnt Maximum number of deployed services in the grid, {@code 0} for unlimited. + * @param maxPerNodeCnt Maximum number of deployed services on each node, {@code 0} for unlimited. + * @return a Future representing pending completion of the operation. 
+ * @throws IgniteException If failed to deploy service. + */ + public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, int totalCnt, int maxPerNodeCnt) + throws IgniteException; + + /** * Deploys multiple instances of the service on the grid according to provided * configuration. Ignite will deploy a maximum amount of services equal to * {@link org.apache.ignite.services.ServiceConfiguration#getTotalCount() cfg.getTotalCount()} parameter - * making sure that there are no more than {@link org.apache.ignite.services.ServiceConfiguration#getMaxPerNodeCount() cfg.getMaxPerNodeCount()} + * making sure that there are no more than + * {@link org.apache.ignite.services.ServiceConfiguration#getMaxPerNodeCount() cfg.getMaxPerNodeCount()} * service instances running on each node. Whenever topology changes, Ignite will automatically rebalance * the deployed services within cluster to make sure that each node will end up with * about equal number of deployed instances whenever possible. * <p> - * If {@link org.apache.ignite.services.ServiceConfiguration#getAffinityKey() cfg.getAffinityKey()} is not {@code null}, then Ignite - * will deploy the service on the primary node for given affinity key. The affinity will be calculated - * on the cache with {@link org.apache.ignite.services.ServiceConfiguration#getCacheName() cfg.getCacheName()} name. + * If {@link org.apache.ignite.services.ServiceConfiguration#getAffinityKey() cfg.getAffinityKey()} + * is not {@code null}, then Ignite will deploy the service on the primary node for given affinity key. + * The affinity will be calculated on the cache with + * {@link org.apache.ignite.services.ServiceConfiguration#getCacheName() cfg.getCacheName()} name. * <p> - * If {@link org.apache.ignite.services.ServiceConfiguration#getNodeFilter() cfg.getNodeFilter()} is not {@code null}, then - * Ignite will deploy service on all grid nodes for which the provided filter evaluates to {@code true}. 
+ * If {@link org.apache.ignite.services.ServiceConfiguration#getNodeFilter() cfg.getNodeFilter()} + * is not {@code null}, then Ignite will deploy service on all grid nodes for which + * the provided filter evaluates to {@code true}. * The node filter will be checked in addition to the underlying cluster group filter, or the * whole grid, if the underlying cluster group includes all the cluster nodes. * <p> @@ -283,12 +396,56 @@ public interface IgniteServices extends IgniteAsyncSupport { public void deploy(ServiceConfiguration cfg) throws IgniteException; /** + * Asynchronously deploys multiple instances of the service on the grid according to provided + * configuration. Ignite will deploy a maximum amount of services equal to + * {@link org.apache.ignite.services.ServiceConfiguration#getTotalCount() cfg.getTotalCount()} parameter + * making sure that there are no more than + * {@link org.apache.ignite.services.ServiceConfiguration#getMaxPerNodeCount() cfg.getMaxPerNodeCount()} + * service instances running on each node. Whenever topology changes, Ignite will automatically rebalance + * the deployed services within cluster to make sure that each node will end up with + * about equal number of deployed instances whenever possible. + * <p> + * If {@link org.apache.ignite.services.ServiceConfiguration#getAffinityKey() cfg.getAffinityKey()} + * is not {@code null}, then Ignite + * will deploy the service on the primary node for given affinity key. The affinity will be calculated + * on the cache with {@link org.apache.ignite.services.ServiceConfiguration#getCacheName() cfg.getCacheName()} name. + * <p> + * If {@link org.apache.ignite.services.ServiceConfiguration#getNodeFilter() cfg.getNodeFilter()} + * is not {@code null}, then Ignite will deploy service on all grid nodes + * for which the provided filter evaluates to {@code true}. 
+ * The node filter will be checked in addition to the underlying cluster group filter, or the + * whole grid, if the underlying cluster group includes all the cluster nodes. + * <p> + * Note that at least one of {@code 'totalCnt'} or {@code 'maxPerNodeCnt'} parameters must have + * value greater than {@code 0}. + * <p> + * Here is an example of creating service deployment configuration: + * <pre name="code" class="java"> + * ServiceConfiguration cfg = new ServiceConfiguration(); + * + * cfg.setName(name); + * cfg.setService(svc); + * cfg.setTotalCount(0); // Unlimited. + * cfg.setMaxPerNodeCount(2); // Deploy 2 instances of service on each node. + * + * ignite.services().deployAsync(cfg); + * </pre> + * + * @param cfg Service configuration. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to deploy service. + */ + public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg) throws IgniteException; + + /** * Cancels service deployment. If a service with specified name was deployed on the grid, - * then {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)} method will be called on it. + * then {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)} + * method will be called on it. * <p> - * Note that Ignite cannot guarantee that the service exits from {@link org.apache.ignite.services.Service#execute(org.apache.ignite.services.ServiceContext)} - * method whenever {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)} is called. It is up to the user to - * make sure that the service code properly reacts to cancellations. 
+ * Note that Ignite cannot guarantee that the service exits from + * {@link org.apache.ignite.services.Service#execute(org.apache.ignite.services.ServiceContext)} + * method whenever {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)} + * is called. It is up to the user to make sure that the service code properly reacts to cancellations. * <p> * Supports asynchronous execution (see {@link IgniteAsyncSupport}). * @@ -299,6 +456,23 @@ public interface IgniteServices extends IgniteAsyncSupport { public void cancel(String name) throws IgniteException; /** + * Asynchronously cancels service deployment. If a service with specified name was deployed on the grid, + * then {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)} + * method will be called on it. + * <p> + * Note that Ignite cannot guarantee that the service exits from + * {@link org.apache.ignite.services.Service#execute(org.apache.ignite.services.ServiceContext)} + * method whenever {@link org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)} + * is called. It is up to the user to + * make sure that the service code properly reacts to cancellations. + * + * @param name Name of service to cancel. + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to cancel service. + */ + public IgniteFuture<Void> cancelAsync(String name) throws IgniteException; + + /** * Cancels all deployed services. * <p> * Note that depending on user logic, it may still take extra time for a service to @@ -312,6 +486,17 @@ public interface IgniteServices extends IgniteAsyncSupport { public void cancelAll() throws IgniteException; /** + * Asynchronously cancels all deployed services. + * <p> + * Note that depending on user logic, it may still take extra time for a service to + * finish execution, even after it was cancelled. 
+ * + * @return a Future representing pending completion of the operation. + * @throws IgniteException If failed to cancel services. + */ + public IgniteFuture<Void> cancelAllAsync() throws IgniteException; + + /** * Gets metadata about all deployed services in the grid. * * @return Metadata about all deployed services in the grid. @@ -364,8 +549,10 @@ public interface IgniteServices extends IgniteAsyncSupport { * @return Either proxy over remote service or local service if it is deployed locally. * @throws IgniteException If failed to create service proxy. */ - public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean sticky, long timeout) throws IgniteException; + public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean sticky, long timeout) + throws IgniteException; /** {@inheritDoc} */ + @Deprecated @Override public IgniteServices withAsync(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java index 3c6218d..9acccab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java @@ -27,13 +27,16 @@ import java.util.List; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteEvents; +import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.events.Event; import org.apache.ignite.internal.cluster.ClusterGroupAdapter; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import 
org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -93,12 +96,34 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen } /** {@inheritDoc} */ + @Override public <T extends Event> IgniteFuture<List<T>> remoteQueryAsync(IgnitePredicate<T> p, long timeout, + @Nullable int... types) throws IgniteException { + + guard(); + + try { + return new IgniteFutureImpl<>(ctx.event().remoteEventsAsync(compoundPredicate(p, types), + prj.nodes(), timeout)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public <T extends Event> UUID remoteListen(@Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter, @Nullable int... types) { return remoteListen(1, 0, true, locLsnr, rmtFilter, types); } /** {@inheritDoc} */ + @Override public <T extends Event> IgniteFuture<UUID> remoteListenAsync( + @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter, + @Nullable int... types) throws IgniteException { + return remoteListenAsync(1, 0, true, locLsnr, rmtFilter, types); + } + + /** {@inheritDoc} */ @Override public <T extends Event> UUID remoteListen(int bufSize, long interval, boolean autoUnsubscribe, @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter, @Nullable int... types) { @@ -128,6 +153,32 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen } /** {@inheritDoc} */ + @Override public <T extends Event> IgniteFuture<UUID> remoteListenAsync(int bufSize, long interval, + boolean autoUnsubscribe, @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable IgnitePredicate<T> rmtFilter, + @Nullable int... 
types) throws IgniteException { + A.ensure(bufSize > 0, "bufSize > 0"); + A.ensure(interval >= 0, "interval >= 0"); + + guard(); + + try { + GridEventConsumeHandler hnd = new GridEventConsumeHandler((IgniteBiPredicate<UUID, Event>)locLsnr, + (IgnitePredicate<Event>)rmtFilter, types); + + return new IgniteFutureImpl<>(ctx.continuous().startRoutine( + hnd, + false, + bufSize, + interval, + autoUnsubscribe, + prj.predicate())); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void stopRemoteListen(UUID opId) { A.notNull(opId, "consumeId"); @@ -145,6 +196,21 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws IgniteException { + A.notNull(opId, "consumeId"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.continuous().stopRoutine(opId)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public <T extends Event> T waitForLocal(@Nullable IgnitePredicate<T> filter, @Nullable int... types) { guard(); @@ -161,6 +227,19 @@ public class IgniteEventsImpl extends AsyncSupportAdapter<IgniteEvents> implemen } /** {@inheritDoc} */ + @Override public <T extends Event> IgniteFuture<T> waitForLocalAsync(@Nullable IgnitePredicate<T> filter, + @Nullable int... types) throws IgniteException { + guard(); + + try { + return new IgniteFutureImpl<>(ctx.event().waitForEvent(filter, types)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public <T extends Event> Collection<T> localQuery(IgnitePredicate<T> p, @Nullable int... 
types) { A.notNull(p, "p"); http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java index 2800777..e5c00bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java @@ -25,15 +25,18 @@ import java.io.ObjectStreamException; import java.util.Collection; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteMessaging; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.cluster.ClusterGroupAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgniteFuture; import org.jetbrains.annotations.Nullable; /** @@ -200,6 +203,28 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> } /** {@inheritDoc} */ + @Override public IgniteFuture<UUID> remoteListenAsync(@Nullable Object topic, + IgniteBiPredicate<UUID, ?> p) throws IgniteException { + A.notNull(p, "p"); + + guard(); + + try { + GridContinuousHandler hnd = new GridMessageListenHandler(topic, (IgniteBiPredicate<UUID, Object>)p); + + return new IgniteFutureImpl<>(ctx.continuous().startRoutine(hnd, + false, + 1, + 0, + false, + prj.predicate())); + } + 
finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void stopRemoteListen(UUID opId) { A.notNull(opId, "opId"); @@ -216,6 +241,20 @@ public class IgniteMessagingImpl extends AsyncSupportAdapter<IgniteMessaging> } } + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws IgniteException { + A.notNull(opId, "opId"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.continuous().stopRoutine(opId)); + } + finally { + unguard(); + } + } + /** * <tt>ctx.gateway().readLock()</tt> */ http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java index df6e5df..607dccc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java @@ -28,8 +28,10 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteServices; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.internal.cluster.ClusterGroupAdapter; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.services.Service; import org.apache.ignite.services.ServiceConfiguration; import org.apache.ignite.services.ServiceDescriptor; @@ -38,6 +40,7 @@ import org.jetbrains.annotations.Nullable; /** * {@link org.apache.ignite.IgniteServices} implementation. 
*/ +@SuppressWarnings("unchecked") public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteServices, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -91,6 +94,21 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service svc) throws IgniteException { + A.notNull(name, "name"); + A.notNull(svc, "svc"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployNodeSingleton(prj, name, svc)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void deployClusterSingleton(String name, Service svc) { A.notNull(name, "name"); A.notNull(svc, "svc"); @@ -109,6 +127,21 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service svc) throws IgniteException { + A.notNull(name, "name"); + A.notNull(svc, "svc"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployClusterSingleton(prj, name, svc)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void deployMultiple(String name, Service svc, int totalCnt, int maxPerNodeCnt) { A.notNull(name, "name"); A.notNull(svc, "svc"); @@ -127,6 +160,23 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, int totalCnt, + int maxPerNodeCnt) throws IgniteException { + A.notNull(name, "name"); + A.notNull(svc, "svc"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployMultiple(prj, name, svc, + totalCnt, maxPerNodeCnt)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void 
deployKeyAffinitySingleton(String name, Service svc, @Nullable String cacheName, Object affKey) { A.notNull(name, "name"); @@ -147,6 +197,24 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> deployKeyAffinitySingletonAsync(String name, Service svc, + @Nullable String cacheName, Object affKey) throws IgniteException { + A.notNull(name, "name"); + A.notNull(svc, "svc"); + A.notNull(affKey, "affKey"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deployKeyAffinitySingleton(name, svc, + cacheName, affKey)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void deploy(ServiceConfiguration cfg) { A.notNull(cfg, "cfg"); @@ -164,6 +232,20 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg) throws IgniteException { + A.notNull(cfg, "cfg"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().deploy(cfg)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void cancel(String name) { A.notNull(name, "name"); @@ -181,6 +263,20 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> cancelAsync(String name) throws IgniteException { + A.notNull(name, "name"); + + guard(); + + try { + return (IgniteFuture<Void>)new IgniteFutureImpl<>(ctx.service().cancel(name)); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public void cancelAll() { guard(); @@ -196,6 +292,18 @@ public class IgniteServicesImpl extends AsyncSupportAdapter implements IgniteSer } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> cancelAllAsync() throws IgniteException { + guard(); + + try { + return (IgniteFuture<Void>)new 
IgniteFutureImpl<>(ctx.service().cancelAll()); + } + finally { + unguard(); + } + } + + /** {@inheritDoc} */ @Override public Collection<ServiceDescriptor> serviceDescriptors() { guard(); http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java index fb9b190..d392813 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java @@ -22,7 +22,6 @@ import java.io.File; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; -import java.io.ObjectStreamException; import java.util.Collection; import java.util.Map; import java.util.UUID; @@ -30,6 +29,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCluster; +import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; @@ -115,7 +115,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> int maxConn) { try { - return saveOrGet(cluster.startNodesAsync(file, restart, timeout, maxConn)); + return saveOrGet(cluster.startNodesAsync0(file, restart, timeout, maxConn)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -123,6 +123,12 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> } /** {@inheritDoc} */ + @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(File file, boolean 
restart, + int timeout, int maxConn) throws IgniteException { + return cluster.startNodesAsync(file, restart, timeout, maxConn); + } + + /** {@inheritDoc} */ @Override public Collection<ClusterStartNodeResult> startNodes( Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts, @@ -131,7 +137,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> int maxConn) { try { - return saveOrGet(cluster.startNodesAsync(hosts, dflts, restart, timeout, maxConn)); + return saveOrGet(cluster.startNodesAsync0(hosts, dflts, restart, timeout, maxConn)); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -139,6 +145,13 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> } /** {@inheritDoc} */ + @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync( + Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts, + boolean restart, int timeout, int maxConn) throws IgniteException { + return cluster.startNodesAsync(hosts, dflts, restart, timeout, maxConn); + } + + /** {@inheritDoc} */ @Override public void stopNodes() { cluster.stopNodes(); } @@ -312,13 +325,4 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster> @Override public void writeExternal(ObjectOutput out) throws IOException { out.writeObject(cluster); } - - /** - * @return Cluster async instance. - * - * @throws ObjectStreamException If failed. 
- */ - protected Object readResolve() throws ObjectStreamException { - return cluster.withAsync(); - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java index aa5e63f..857c1ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java @@ -47,6 +47,7 @@ import org.apache.ignite.internal.IgniteComponentType; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.nodestart.IgniteRemoteStartSpecification; import org.apache.ignite.internal.util.nodestart.IgniteSshHelper; import org.apache.ignite.internal.util.nodestart.StartNodeCallable; @@ -222,7 +223,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus throws IgniteException { try { - return startNodesAsync(file, restart, timeout, maxConn).get(); + return startNodesAsync0(file, restart, timeout, maxConn).get(); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -230,6 +231,12 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus } /** {@inheritDoc} */ + @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync(File file, boolean restart, + int timeout, int maxConn) throws IgniteException { + return new IgniteFutureImpl<>(startNodesAsync0(file, restart, timeout, maxConn)); + } + + 
/** {@inheritDoc} */ @Override public Collection<ClusterStartNodeResult> startNodes(Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts, boolean restart, @@ -238,7 +245,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus throws IgniteException { try { - return startNodesAsync(hosts, dflts, restart, timeout, maxConn).get(); + return startNodesAsync0(hosts, dflts, restart, timeout, maxConn).get(); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -246,6 +253,13 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus } /** {@inheritDoc} */ + @Override public IgniteFuture<Collection<ClusterStartNodeResult>> startNodesAsync( + Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts, + boolean restart, int timeout, int maxConn) throws IgniteException { + return new IgniteFutureImpl<>(startNodesAsync0(hosts, dflts, restart, timeout, maxConn)); + } + + /** {@inheritDoc} */ @Override public void stopNodes() throws IgniteException { guard(); @@ -330,7 +344,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus * @return Future with results. 
* @see IgniteCluster#startNodes(java.io.File, boolean, int, int) */ - IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync(File file, + IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync0(File file, boolean restart, int timeout, int maxConn) @@ -342,7 +356,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus try { IgniteBiTuple<Collection<Map<String, Object>>, Map<String, Object>> t = parseFile(file); - return startNodesAsync(t.get1(), t.get2(), restart, timeout, maxConn); + return startNodesAsync0(t.get1(), t.get2(), restart, timeout, maxConn); } catch (IgniteCheckedException e) { return new GridFinishedFuture<>(e); @@ -358,7 +372,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClus * @return Future with results. * @see IgniteCluster#startNodes(java.util.Collection, java.util.Map, boolean, int, int) */ - IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync( + IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync0( Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> dflts, boolean restart, http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index dd900fe..30f5c5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -35,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.lang.GridTuple; -import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -187,7 +186,6 @@ public interface IgniteInternalTx extends AutoCloseable { * * @throws IgniteCheckedException If commit failed. */ - @IgniteAsyncSupported public void commit() throws IgniteCheckedException; /** @@ -202,7 +200,6 @@ public interface IgniteInternalTx extends AutoCloseable { * * @throws IgniteCheckedException If rollback failed. */ - @IgniteAsyncSupported public void rollback() throws IgniteCheckedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java index 6134b9f..2a058ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java @@ -24,6 +24,7 @@ import java.io.ObjectOutput; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import 
org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; @@ -45,6 +46,7 @@ import org.apache.ignite.transactions.TransactionState; /** * Cache transaction proxy. */ +@SuppressWarnings("unchecked") public class TransactionProxyImpl<K, V> implements TransactionProxy, Externalizable { /** */ private static final long serialVersionUID = 0L; @@ -269,6 +271,18 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza } /** {@inheritDoc} */ + @Override public IgniteFuture<Void> commitAsync() throws IgniteException { + enter(); + + try { + return (IgniteFuture<Void>)createFuture(cctx.commitTxAsync(tx)); + } + finally { + leave(); + } + } + + /** {@inheritDoc} */ @Override public void close() { enter(); @@ -303,6 +317,21 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza } } + /** {@inheritDoc} */ + @Override public IgniteFuture<Void> rollbackAsync() throws IgniteException { + enter(); + + try { + return (IgniteFuture<Void>)(new IgniteFutureImpl(cctx.rollbackTxAsync(tx))); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + finally { + leave(); + } + } + /** * @param res Result to convert to finished future. */ @@ -314,13 +343,21 @@ public class TransactionProxyImpl<K, V> implements TransactionProxy, Externaliza * @param fut Internal future. */ private void saveFuture(IgniteInternalFuture<IgniteInternalTx> fut) { + asyncRes = createFuture(fut); + } + + /** + * @param fut Internal future. + * @return User future. 
+ */ + private IgniteFuture<?> createFuture(IgniteInternalFuture<IgniteInternalTx> fut) { IgniteInternalFuture<Transaction> fut0 = fut.chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, Transaction>() { @Override public Transaction applyx(IgniteInternalFuture<IgniteInternalTx> fut) throws IgniteCheckedException { return fut.get().proxy(); } }); - asyncRes = new IgniteFutureImpl(fut0); + return new IgniteFutureImpl(fut0); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java index eb675fb..cb27b19 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.platform.entityframework; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java index 9ddcc37..e16abe4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java @@ -17,11 +17,11 @@ package org.apache.ignite.internal.processors.platform.events; +import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteEvents; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventAdapter; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; @@ -29,8 +29,8 @@ import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; -import org.apache.ignite.internal.util.future.IgniteFutureImpl; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; @@ -91,9 +91,6 @@ public class PlatformEvents extends PlatformAbstractTarget { private final IgniteEvents events; /** */ - private final IgniteEvents eventsAsync; - - /** */ private final EventResultWriter eventResWriter; /** */ @@ -111,7 +108,6 @@ public class PlatformEvents extends PlatformAbstractTarget { assert events != null; this.events = events; - eventsAsync = events.withAsync(); eventResWriter = new 
EventResultWriter(platformCtx); eventColResWriter = new EventCollectionResultWriter(platformCtx); @@ -148,16 +144,12 @@ public class PlatformEvents extends PlatformAbstractTarget { return TRUE; case OP_REMOTE_QUERY_ASYNC: - startRemoteQuery(reader, eventsAsync); - - readAndListenFuture(reader, currentFuture(), eventColResWriter); + readAndListenFuture(reader, startRemoteQueryAsync(reader, events), eventColResWriter); return TRUE; case OP_WAIT_FOR_LOCAL_ASYNC: { - startWaitForLocal(reader, eventsAsync); - - readAndListenFuture(reader, currentFuture(), eventResWriter); + readAndListenFuture(reader, startWaitForLocalAsync(reader, events), eventResWriter); return TRUE; } @@ -253,6 +245,23 @@ public class PlatformEvents extends PlatformAbstractTarget { } /** + * Starts the waitForLocal asynchronously. + * + * @param reader Reader + * @param events Events. + * @return Result. + */ + private IgniteFuture<EventAdapter> startWaitForLocalAsync(BinaryRawReaderEx reader, IgniteEvents events) { + Long filterHnd = reader.readObject(); + + IgnitePredicate filter = filterHnd != null ? localFilter(filterHnd) : null; + + int[] eventTypes = readEventTypes(reader); + + return events.waitForLocalAsync(filter, eventTypes); + } + + /** * Starts the remote query. * * @param reader Reader. @@ -271,6 +280,25 @@ public class PlatformEvents extends PlatformAbstractTarget { return events.remoteQuery(filter, timeout); } + /** + * Starts the remote query asynchronously. + * + * @param reader Reader. + * @param events Events. + * @return Result. 
+ */ + private IgniteFuture<List<Event>> startRemoteQueryAsync(BinaryRawReaderEx reader, IgniteEvents events) { + Object pred = reader.readObjectDetached(); + + long timeout = reader.readLong(); + + int[] types = readEventTypes(reader); + + PlatformEventFilterListener filter = platformCtx.createRemoteEventFilter(pred, types); + + return events.remoteQueryAsync(filter, timeout); + } + /** {@inheritDoc} */ @Override public void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { switch (type) { @@ -311,11 +339,6 @@ public class PlatformEvents extends PlatformAbstractTarget { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { - return ((IgniteFutureImpl)eventsAsync.future()).internalFuture(); - } - - /** {@inheritDoc} */ @Nullable @Override public PlatformFutureUtils.Writer futureWriter(int opId) { switch (opId) { case OP_WAIT_FOR_LOCAL: http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java index 6fe109e..8018986 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.platform.messaging; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteMessaging; -import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.binary.BinaryRawReaderEx; import 
org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; @@ -27,7 +26,7 @@ import org.apache.ignite.internal.processors.platform.PlatformContext; import org.apache.ignite.internal.processors.platform.PlatformTarget; import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; -import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.lang.IgniteFuture; import java.util.UUID; @@ -68,9 +67,6 @@ public class PlatformMessaging extends PlatformAbstractTarget { /** */ private final IgniteMessaging messaging; - /** */ - private final IgniteMessaging messagingAsync; - /** * Ctor. * @@ -83,7 +79,6 @@ public class PlatformMessaging extends PlatformAbstractTarget { assert messaging != null; this.messaging = messaging; - messagingAsync = messaging.withAsync(); } /** {@inheritDoc} */ @@ -132,15 +127,15 @@ public class PlatformMessaging extends PlatformAbstractTarget { } case OP_REMOTE_LISTEN_ASYNC: { - startRemoteListen(reader, messagingAsync); + readAndListenFuture(reader, startRemoteListenAsync(reader, messaging)); - return readAndListenFuture(reader); + return TRUE; } case OP_STOP_REMOTE_LISTEN_ASYNC: { - messagingAsync.stopRemoteListen(reader.readUuid()); + readAndListenFuture(reader, messaging.stopRemoteListenAsync(reader.readUuid())); - return readAndListenFuture(reader); + return TRUE; } default: @@ -167,6 +162,7 @@ public class PlatformMessaging extends PlatformAbstractTarget { /** * Starts the remote listener. * @param reader Reader. + * @param messaging Messaging. * @return Listen id. 
*/ private UUID startRemoteListen(BinaryRawReaderEx reader, IgniteMessaging messaging) { @@ -181,9 +177,22 @@ public class PlatformMessaging extends PlatformAbstractTarget { return messaging.remoteListen(topic, filter); } - /** {@inheritDoc} */ - @Override public IgniteInternalFuture currentFuture() throws IgniteCheckedException { - return ((IgniteFutureImpl)messagingAsync.future()).internalFuture(); + /** + * Starts the remote listener. + * @param reader Reader. + * @param messaging Messaging. + * @return Future of the operation. + */ + private IgniteFuture<UUID> startRemoteListenAsync(BinaryRawReaderEx reader, IgniteMessaging messaging) { + Object nativeFilter = reader.readObjectDetached(); + + long ptr = reader.readLong(); // interop pointer + + Object topic = reader.readObjectDetached(); + + PlatformMessageFilter filter = platformCtx.createRemoteMessageFilter(nativeFilter, ptr); + + return messaging.remoteListenAsync(topic, filter); } /** {@inheritDoc} */