IGNITE-4583: new async API at the IgniteCluster, IgniteEvents, IgniteMessaging, 
IgniteServices, Transaction.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fde1f486
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fde1f486
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fde1f486

Branch: refs/heads/ignite-4475-async
Commit: fde1f486d6465601cfd40d0eb775141501024a28
Parents: dd4d439
Author: devozerov <voze...@gridgain.com>
Authored: Wed Feb 8 15:42:07 2017 +0300
Committer: devozerov <voze...@gridgain.com>
Committed: Wed Feb 8 15:42:07 2017 +0300

----------------------------------------------------------------------
 .../ignite/tests/utils/TestTransaction.java     |  11 +
 .../java/org/apache/ignite/IgniteCluster.java   | 126 +++++++++++
 .../java/org/apache/ignite/IgniteEvents.java    | 128 ++++++++++-
 .../java/org/apache/ignite/IgniteMessaging.java |  27 +++
 .../java/org/apache/ignite/IgniteServices.java  | 221 +++++++++++++++++--
 .../ignite/internal/IgniteEventsImpl.java       |  79 +++++++
 .../ignite/internal/IgniteMessagingImpl.java    |  39 ++++
 .../ignite/internal/IgniteServicesImpl.java     | 108 +++++++++
 .../cluster/IgniteClusterAsyncImpl.java         |  28 ++-
 .../internal/cluster/IgniteClusterImpl.java     |  24 +-
 .../cache/transactions/IgniteInternalTx.java    |   3 -
 .../transactions/TransactionProxyImpl.java      |  39 +++-
 ...formDotNetEntityFrameworkCacheExtension.java |   1 -
 .../platform/events/PlatformEvents.java         |  57 +++--
 .../platform/messaging/PlatformMessaging.java   |  35 +--
 .../platform/services/PlatformServices.java     |  95 +++++---
 .../transactions/PlatformTransactions.java      |   9 +-
 .../apache/ignite/transactions/Transaction.java |  22 ++
 .../continuous/GridEventConsumeSelfTest.java    | 169 ++++++++++++++
 .../internal/processors/igfs/IgfsMock.java      |   5 +
 .../GridServiceProcessorAbstractSelfTest.java   | 209 ++++++++++++++++++
 .../ignite/messaging/GridMessagingSelfTest.java |  54 ++++-
 .../cache/GridAbstractCacheStoreSelfTest.java   |  11 +
 .../multijvm/IgniteClusterProcessProxy.java     |  13 ++
 .../multijvm/IgniteEventsProcessProxy.java      |  31 +++
 ...gniteProjectionStartStopRestartSelfTest.java |   6 +-
 26 files changed, 1437 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
----------------------------------------------------------------------
diff --git 
a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
 
b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
index 5f3ec69..4a03d25 100644
--- 
a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
+++ 
b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.tests.utils;
 
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.lang.IgniteAsyncSupport;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteUuid;
@@ -106,6 +107,11 @@ public class TestTransaction implements Transaction {
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> commitAsync() throws IgniteException {
+        return null;
+    }
+
+    /** {@inheritDoc} */
     @Override public void close() {
         // No-op.
     }
@@ -129,4 +135,9 @@ public class TestTransaction implements Transaction {
     @Override public void rollback() {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> rollbackAsync() throws IgniteException 
{
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
index 23b03df..dc7b687 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCluster.java
@@ -188,6 +188,33 @@ public interface IgniteCluster extends ClusterGroup, 
IgniteAsyncSupport {
         int maxConn) throws IgniteException;
 
     /**
+     * Starts one or more nodes on remote host(s) asynchronously.
+     * <p>
+     * This method takes INI file which defines all startup parameters. It can 
contain one or
+     * more sections, each for a host or for range of hosts (note that they 
must have different
+     * names) and a special '{@code defaults}' section with default values. 
They are applied to
+     * undefined parameters in host's sections.
+     * <p>
+     * Completed future contains collection of tuples. Each tuple corresponds 
to one node start attempt and
+     * contains hostname, success flag and error message if attempt was not 
successful. Note that
+     * successful attempt doesn't mean that node was actually started and 
joined topology. For large
+     * topologies (> 100s nodes) it can take over 10 minutes for all nodes to 
start. See individual
+     * node logs for details.
+     *
+     * @param file Configuration file.
+     * @param restart Whether to stop existing nodes. If {@code true}, all 
existing
+     *      nodes on the host will be stopped before starting new ones. If
+     *      {@code false}, nodes will be started only if there are fewer
+     *      nodes on the host than expected.
+     * @param timeout Connection timeout.
+     * @param maxConn Number of parallel SSH connections to one host.
+     * @return a Future representing pending completion of the starting nodes.
+     * @throws IgniteException In case of error.
+     */
+    public IgniteFuture<Collection<ClusterStartNodeResult>> 
startNodesAsync(File file, boolean restart, int timeout,
+        int maxConn) throws IgniteException;
+
+    /**
      * Starts one or more nodes on remote host(s).
      * <p>
      * Each map in {@code hosts} collection
@@ -290,6 +317,104 @@ public interface IgniteCluster extends ClusterGroup, 
IgniteAsyncSupport {
         @Nullable Map<String, Object> dflts, boolean restart, int timeout, int 
maxConn) throws IgniteException;
 
     /**
+     * Starts one or more nodes on remote host(s) asynchronously.
+     * <p>
+     * Each map in {@code hosts} collection
+     * defines startup parameters for one host or for a range of hosts. The 
following
+     * parameters are supported:
+     *     <table class="doctable">
+     *         <tr>
+     *             <th>Name</th>
+     *             <th>Type</th>
+     *             <th>Description</th>
+     *         </tr>
+     *         <tr>
+     *             <td><b>host</b></td>
+     *             <td>String</td>
+     *             <td>
+     *                 Hostname (required). Can define several hosts if their 
IPs are sequential.
+     *                 E.g., {@code 10.0.0.1~5} defines range of five IP 
addresses. Other
+     *                 parameters are applied to all hosts equally.
+     *             </td>
+     *         </tr>
+     *         <tr>
+     *             <td><b>port</b></td>
+     *             <td>Integer</td>
+     *             <td>Port number (default is {@code 22}).</td>
+     *         </tr>
+     *         <tr>
+     *             <td><b>uname</b></td>
+     *             <td>String</td>
+     *             <td>Username (if not defined, current local username will 
be used).</td>
+     *         </tr>
+     *         <tr>
+     *             <td><b>passwd</b></td>
+     *             <td>String</td>
+     *             <td>Password (if not defined, private key file must be 
defined).</td>
+     *         </tr>
+     *         <tr>
+     *             <td><b>key</b></td>
+     *             <td>File</td>
+     *             <td>Private key file (if not defined, password must be 
defined).</td>
+     *         </tr>
+     *         <tr>
+     *             <td><b>nodes</b></td>
+     *             <td>Integer</td>
+     *             <td>
+     *                 Expected number of nodes on the host. If some nodes are 
started
+     *                 already, then only remaining nodes will be started. If 
current count of
+     *                 nodes is equal to this number, and {@code restart} flag 
is {@code false},
+     *                 then nothing will happen.
+     *             </td>
+     *         </tr>
+     *         <tr>
+     *             <td><b>igniteHome</b></td>
+     *             <td>String</td>
+     *             <td>
+     *                 Path to Ignite installation folder. If not defined, 
IGNITE_HOME
+     *                 environment variable must be set on remote hosts.
+     *             </td>
+     *         </tr>
+     *         <tr>
+     *             <td><b>cfg</b></td>
+     *             <td>String</td>
+     *             <td>Path to configuration file (relative to {@code 
igniteHome}).</td>
+     *         </tr>
+     *         <tr>
+     *             <td><b>script</b></td>
+     *             <td>String</td>
+     *             <td>
+     *                 Custom startup script file name and path (relative to 
{@code igniteHome}).
+     *                 You can also specify a space-separated list of 
parameters in the same
+     *                 string (for example: {@code "bin/my-custom-script.sh 
-v"}).
+     *             </td>
+     *         </tr>
+     *     </table>
+     * <p>
+     * {@code dflts} map defines default values. They are applied to undefined 
parameters in
+     * {@code hosts} collection.
+     * <p>
+     * Completed future contains collection of tuples. Each tuple corresponds 
to one node start attempt and
+     * contains hostname, success flag and error message if attempt was not 
successful. Note that
+     * successful attempt doesn't mean that node was actually started and 
joined topology. For large
+     * topologies (> 100s nodes) it can take over 10 minutes for all nodes to 
start. See individual
+     * node logs for details.
+     *
+     * @param hosts Startup parameters.
+     * @param dflts Default values.
+     * @param restart Whether to stop existing nodes. If {@code true}, all 
existing
+     *      nodes on the host will be stopped before starting new ones. If
+     *      {@code false}, nodes will be started only if there are fewer
+     *      nodes on the host than expected.
+     * @param timeout Connection timeout in milliseconds.
+     * @param maxConn Number of parallel SSH connections to one host.
+     * @return a Future representing pending completion of the starting nodes.
+     * @throws IgniteException In case of error.
+     */
+    public IgniteFuture<Collection<ClusterStartNodeResult>> 
startNodesAsync(Collection<Map<String, Object>> hosts,
+        @Nullable Map<String, Object> dflts, boolean restart, int timeout, int 
maxConn) throws IgniteException;
+
+    /**
      * Stops nodes satisfying optional set of predicates.
      * <p>
      * <b>NOTE:</b> {@code System.exit(Ignition.KILL_EXIT_CODE)} will be 
executed on each
@@ -347,5 +472,6 @@ public interface IgniteCluster extends ClusterGroup, 
IgniteAsyncSupport {
     @Nullable public IgniteFuture<?> clientReconnectFuture();
 
     /** {@inheritDoc} */
+    @Deprecated
     @Override public IgniteCluster withAsync();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java
index c0e4d3b..c081f2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteEvents.java
@@ -25,6 +25,7 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.lang.IgniteAsyncSupport;
 import org.apache.ignite.lang.IgniteAsyncSupported;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
 
@@ -90,13 +91,27 @@ public interface IgniteEvents extends IgniteAsyncSupport {
         throws IgniteException;
 
     /**
+     * Asynchronously queries nodes in this cluster group for events using 
passed in predicate filter for event
+     * selection.
+     *
+     * @param p Predicate filter used to query events on remote nodes.
+     * @param timeout Maximum time to wait for result, {@code 0} to wait 
forever.
+     * @param types Event types to be queried.
+     * @return a Future representing pending completion of the query. The 
completed future contains
+     *      collection of grid events returned from specified nodes.
+     * @throws IgniteException If query failed.
+     */
+    public <T extends Event> IgniteFuture<List<T>> 
remoteQueryAsync(IgnitePredicate<T> p, long timeout,
+        @Nullable int... types) throws IgniteException;
+
+    /**
      * Adds event listener for specified events to all nodes in the cluster 
group (possibly including
      * local node if it belongs to the cluster group as well). This means that 
all events occurring on
      * any node within this cluster group that pass remote filter will be sent 
to local node for
      * local listener notifications.
      * <p>
      * The listener can be unsubscribed automatically if local node stops, if 
{@code locLsnr} callback
-     * returns {@code false} or if {@link #stopRemoteListen(UUID)} is called.
+     * returns {@code false} or if {@link #stopRemoteListen(UUID)} or {@link 
#stopRemoteListenAsync(UUID)} is called.
      *
      * @param locLsnr Listener callback that is called on local node. If 
{@code null}, this events will be handled
      *      on remote nodes by passed in {@code rmtFilter}.
@@ -108,7 +123,8 @@ public interface IgniteEvents extends IgniteAsyncSupport {
      * @param types Types of events to listen for. If not provided, all events 
that pass the
      *      provided remote filter will be sent to local node.
      * @param <T> Type of the event.
-     * @return {@code Operation ID} that can be passed to {@link 
#stopRemoteListen(UUID)} method to stop listening.
+     * @return {@code Operation ID} that can be passed to {@link 
#stopRemoteListen(UUID)} or
+     * {@link #stopRemoteListenAsync(UUID)} methods to stop listening.
      * @throws IgniteException If failed to add listener.
      */
     @IgniteAsyncSupported
@@ -118,6 +134,35 @@ public interface IgniteEvents extends IgniteAsyncSupport {
         throws IgniteException;
 
     /**
+     * Asynchronously adds event listener for specified events to all nodes in 
the cluster group (possibly including
+     * local node if it belongs to the cluster group as well). This means that 
all events occurring on
+     * any node within this cluster group that pass remote filter will be sent 
to local node for
+     * local listener notifications.
+     * <p>
+     * The listener can be unsubscribed automatically if local node stops, if 
{@code locLsnr} callback
+     * returns {@code false} or if {@link #stopRemoteListen(UUID)} or {@link 
#stopRemoteListenAsync(UUID)} is called.
+     *
+     * @param <T> Type of the event.
+     * @param locLsnr Listener callback that is called on local node. If 
{@code null}, these events will be handled
+     *      on remote nodes by passed in {@code rmtFilter}.
+     * @param rmtFilter Filter callback that is called on remote node. Only 
events that pass the remote filter
+     *      will be sent to local node. If {@code null}, all events of 
specified types will
+     *      be sent to local node. This remote filter can be used to 
pre-handle events remotely,
+     *      before they are passed in to local callback. It will be 
auto-unsubscribed on the node
+     *      where event occurred in case if it returns {@code false}.
+     * @param types Types of events to listen for. If not provided, all events 
that pass the
+     *      provided remote filter will be sent to local node.
+     * @return a Future representing pending completion of the operation. The 
completed future contains
+     *      {@code Operation ID} that can be passed to {@link 
#stopRemoteListen(UUID)} or
+     *      {@link #stopRemoteListenAsync(UUID)} methods to stop listening.
+     * @throws IgniteException If failed to add listener.
+     */
+    public <T extends Event> IgniteFuture<UUID> remoteListenAsync(@Nullable 
IgniteBiPredicate<UUID, T> locLsnr,
+        @Nullable IgnitePredicate<T> rmtFilter,
+        @Nullable int... types)
+        throws IgniteException;
+
+    /**
      * Adds event listener for specified events to all nodes in the cluster 
group (possibly including
      * local node if it belongs to the cluster group as well). This means that 
all events occurring on
      * any node within this cluster group that pass remote filter will be sent 
to local node for
@@ -148,9 +193,11 @@ public interface IgniteEvents extends IgniteAsyncSupport {
      * @param types Types of events to listen for. If not provided, all events 
that pass the
      *      provided remote filter will be sent to local node.
      * @param <T> Type of the event.
-     * @return {@code Operation ID} that can be passed to {@link 
#stopRemoteListen(UUID)} method to stop listening.
-     * @see #stopRemoteListen(UUID)
+     * @return {@code Operation ID} that can be passed to {@link 
#stopRemoteListen(UUID)} or
+     *      {@link #stopRemoteListenAsync(UUID)} methods to stop listening.
      * @throws IgniteException If failed to add listener.
+     * @see #stopRemoteListen(UUID)
+     * @see #stopRemoteListenAsync(UUID)
      */
     @IgniteAsyncSupported
     public <T extends Event> UUID remoteListen(int bufSize,
@@ -162,6 +209,50 @@ public interface IgniteEvents extends IgniteAsyncSupport {
         throws IgniteException;
 
     /**
+     * Asynchronously adds event listener for specified events to all nodes in 
the cluster group (possibly including
+     * local node if it belongs to the cluster group as well). This means that 
all events occurring on
+     * any node within this cluster group that pass remote filter will be sent 
to local node for
+     * local listener notification.
+     *
+     * @param <T> Type of the event.
+     * @param bufSize Remote events buffer size. Events from remote nodes 
won't be sent until buffer
+     *      is full or time interval is exceeded.
+     * @param interval Maximum time interval after which events from remote 
node will be sent. Events
+     *      from remote nodes won't be sent until buffer is full or time 
interval is exceeded.
+     * @param autoUnsubscribe Flag indicating that event listeners on remote 
nodes should be
+     *      automatically unregistered if master node (node that initiated 
event listening) leaves
+     *      topology. If this flag is {@code false}, listeners will be 
unregistered only when
+     *      {@link #stopRemoteListen(UUID)} method is called, or the {@code 
'callback (locLsnr)'}
+     *      passed in returns {@code false}.
+     * @param locLsnr Callback that is called on local node. If this predicate 
returns {@code true},
+     *      the implementation will continue listening to events. Otherwise, 
events
+     *      listening will be stopped and listeners will be unregistered on 
all nodes
+     *      in the cluster group. If {@code null}, these events will be handled 
on remote nodes by
+     *      passed in {@code rmtFilter} until local node stops (if {@code 
'autoUnsubscribe'} is {@code true})
+     *      or until {@link #stopRemoteListen(UUID)} is called.
+     * @param rmtFilter Filter callback that is called on remote node. Only 
events that pass the remote filter
+     *      will be sent to local node. If {@code null}, all events of 
specified types will
+     *      be sent to local node. This remote filter can be used to 
pre-handle events remotely,
+     *      before they are passed in to local callback. It will be 
auto-unsubscribed on the node
+     *      where event occurred in case if it returns {@code false}.
+     * @param types Types of events to listen for. If not provided, all events 
that pass the
+     *      provided remote filter will be sent to local node.
+     * @return a Future representing pending completion of the operation. The 
completed future contains
+     *      {@code Operation ID} that can be passed to {@link 
#stopRemoteListen(UUID)}
+     *      or {@link #stopRemoteListenAsync(UUID)} methods to stop listening.
+     * @throws IgniteException If failed to add listener.
+     * @see #stopRemoteListen(UUID)
+     * @see #stopRemoteListenAsync(UUID)
+     */
+    public <T extends Event> IgniteFuture<UUID> remoteListenAsync(int bufSize,
+        long interval,
+        boolean autoUnsubscribe,
+        @Nullable IgniteBiPredicate<UUID, T> locLsnr,
+        @Nullable IgnitePredicate<T> rmtFilter,
+        @Nullable int... types)
+        throws IgniteException;
+
+    /**
      * Stops listening to remote events. This will unregister all listeners 
identified with provided
      * operation ID on all nodes defined by {@link #clusterGroup()}.
      * <p>
@@ -169,13 +260,27 @@ public interface IgniteEvents extends IgniteAsyncSupport {
      *
      * @param opId Operation ID that was returned from
      *      {@link #remoteListen(IgniteBiPredicate, IgnitePredicate, int...)} 
method.
-     * @see #remoteListen(IgniteBiPredicate, IgnitePredicate, int...)
      * @throws IgniteException If failed to stop listeners.
+     * @see #remoteListen(IgniteBiPredicate, IgnitePredicate, int...)
+     * @see #remoteListenAsync(int, long, boolean, IgniteBiPredicate, 
IgnitePredicate, int...)
      */
     @IgniteAsyncSupported
     public void stopRemoteListen(UUID opId) throws IgniteException;
 
     /**
+     * Asynchronously stops listening to remote events. This will unregister 
all listeners identified with provided
+     * operation ID on all nodes defined by {@link #clusterGroup()}.
+     *
+     * @param opId Operation ID that was returned from
+     *      {@link #remoteListen(IgniteBiPredicate, IgnitePredicate, int...)} 
method.
+     * @return a Future representing pending completion of the operation.
+     * @throws IgniteException If failed to stop listeners.
+     * @see #remoteListen(IgniteBiPredicate, IgnitePredicate, int...)
+     * @see #remoteListenAsync(int, long, boolean, IgniteBiPredicate, 
IgnitePredicate, int...)
+     */
+    public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws 
IgniteException;
+
+    /**
      * Waits for the specified events.
      * <p>
      * Supports asynchronous execution (see {@link IgniteAsyncSupport}).
@@ -191,6 +296,18 @@ public interface IgniteEvents extends IgniteAsyncSupport {
         throws IgniteException;
 
     /**
+     * Creates a future that waits for the specified events.
+     *
+     * @param filter Optional filtering predicate. Only if the predicate 
evaluates to {@code true} will the event
+     *      end the wait.
+     * @param types Types of the events to wait for. If not provided, all 
events will be passed to the filter.
+     * @return a Future representing pending completion of the operation. The 
completed future contains grid event.
+     * @throws IgniteException If wait was interrupted.
+     */
+    public <T extends Event> IgniteFuture<T> waitForLocalAsync(@Nullable 
IgnitePredicate<T> filter,
+        @Nullable int... types) throws IgniteException;
+
+    /**
      * Queries local node for events using passed-in predicate filter for 
event selection.
      *
      * @param p Predicate to filter events. All predicates must be satisfied 
for the
@@ -269,5 +386,6 @@ public interface IgniteEvents extends IgniteAsyncSupport {
     public boolean isEnabled(int type);
 
     /** {@inheritDoc} */
+    @Deprecated
     @Override public IgniteEvents withAsync();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
index ab554af..bb96d65 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteMessaging.java
@@ -25,6 +25,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.lang.IgniteAsyncSupport;
 import org.apache.ignite.lang.IgniteAsyncSupported;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -150,6 +151,22 @@ public interface IgniteMessaging extends 
IgniteAsyncSupport {
     public UUID remoteListen(@Nullable Object topic, IgniteBiPredicate<UUID, 
?> p) throws IgniteException;
 
     /**
+     * Asynchronously adds a message listener for a given topic to all nodes 
in the cluster group (possibly including
+     * this node if it belongs to the cluster group as well). This means that 
any node within this cluster
+     * group can send a message for a given topic and all nodes within the 
cluster group will receive
+     * listener notifications.
+     *
+     * @param topic Topic to subscribe to, {@code null} means default topic.
+     * @param p Predicate that is called on each node for each received 
message. If predicate returns {@code false},
+     *      then it will be unsubscribed from any further notifications.
+     * @return a Future representing pending completion of the operation. The 
completed future contains
+     *      {@code Operation ID} that can be passed to {@link 
#stopRemoteListen(UUID)} method to stop listening.
+     * @throws IgniteException If failed to add listener.
+     */
+    public IgniteFuture<UUID> remoteListenAsync(@Nullable Object topic, 
IgniteBiPredicate<UUID, ?> p)
+        throws IgniteException;
+
+    /**
      * Unregisters all listeners identified with provided operation ID on all 
nodes in the cluster group.
      * <p>
      * Supports asynchronous execution (see {@link IgniteAsyncSupport}).
@@ -160,6 +177,16 @@ public interface IgniteMessaging extends 
IgniteAsyncSupport {
     @IgniteAsyncSupported
     public void stopRemoteListen(UUID opId) throws IgniteException;
 
+    /**
+     * Asynchronously unregisters all listeners identified with provided 
operation ID on all nodes in the cluster group.
+     *
+     * @param opId Listen ID that was returned from {@link 
#remoteListen(Object, IgniteBiPredicate)} method.
+     * @return a Future representing pending completion of the operation.
+     * @throws IgniteException If failed to unregister listeners.
+     */
+    public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) throws 
IgniteException;
+
     /** {@inheritDoc} */
+    @Deprecated
     @Override IgniteMessaging withAsync();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
index 8365ec7..1c01598 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.lang.IgniteAsyncSupport;
 import org.apache.ignite.lang.IgniteAsyncSupported;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.resources.IgniteInstanceResource;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceConfiguration;
@@ -150,7 +151,8 @@ public interface IgniteServices extends IgniteAsyncSupport {
      * when a singleton service instance will be active on more than one node 
(e.g. crash detection delay).
      * <p>
      * This method is analogous to calling
-     * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, 
int) deployMultiple(name, svc, 1, 1)} method.
+     * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, 
int) deployMultiple(name, svc, 1, 1)}
+     * method.
      *
      * @param name Service name.
      * @param svc Service instance.
@@ -160,13 +162,35 @@ public interface IgniteServices extends 
IgniteAsyncSupport {
     public void deployClusterSingleton(String name, Service svc) throws 
IgniteException;
 
     /**
+     * Asynchronously deploys a cluster-wide singleton service. Ignite will 
guarantee that there is always
+     * one instance of the service in the cluster. In case if grid node on 
which the service
+     * was deployed crashes or stops, Ignite will automatically redeploy it on 
another node.
+     * However, if the node on which the service is deployed remains in 
topology, then the
+     * service will always be deployed on that node only, regardless of 
topology changes.
+     * <p>
+     * Note that in case of topology changes, due to network delays, there may 
be a temporary situation
+     * when a singleton service instance will be active on more than one node 
(e.g. crash detection delay).
+     * <p>
+     * This method is analogous to calling
+     * {@link #deployMultipleAsync(String, org.apache.ignite.services.Service, 
int, int)
+     * deployMultipleAsync(name, svc, 1, 1)} method.
+     *
+     * @param name Service name.
+     * @param svc Service instance.
+     * @return a Future representing pending completion of the operation.
+     * @throws IgniteException If failed to deploy service.
+     */
+    public IgniteFuture<Void> deployClusterSingletonAsync(String name, Service 
svc) throws IgniteException;
+
+    /**
      * Deploys a per-node singleton service. Ignite will guarantee that there 
is always
      * one instance of the service running on each node. Whenever new nodes 
are started
      * within the underlying cluster group, Ignite will automatically deploy 
one instance of
      * the service on every new node.
      * <p>
      * This method is analogous to calling
-     * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, 
int) deployMultiple(name, svc, 0, 1)} method.
+     * {@link #deployMultiple(String, org.apache.ignite.services.Service, int, 
int) deployMultiple(name, svc, 0, 1)}
+     * method.
      *
      * @param name Service name.
      * @param svc Service instance.
@@ -176,6 +200,23 @@ public interface IgniteServices extends IgniteAsyncSupport 
{
     public void deployNodeSingleton(String name, Service svc) throws 
IgniteException;
 
     /**
+     * Asynchronously deploys a per-node singleton service. Ignite will 
guarantee that there is always
+     * one instance of the service running on each node. Whenever new nodes 
are started
+     * within the underlying cluster group, Ignite will automatically deploy 
one instance of
+     * the service on every new node.
+     * <p>
+     * This method is analogous to calling
+     * {@link #deployMultipleAsync(String, org.apache.ignite.services.Service, 
int, int)
+     * deployMultipleAsync(name, svc, 0, 1)} method.
+     *
+     * @param name Service name.
+     * @param svc Service instance.
+     * @return a Future representing pending completion of the operation.
+     * @throws IgniteException If failed to deploy service.
+     */
+    public IgniteFuture<Void> deployNodeSingletonAsync(String name, Service 
svc) throws IgniteException;
+
+    /**
      * Deploys one instance of this service on the primary node for a given 
affinity key.
      * Whenever topology changes and primary node assignment changes, Ignite 
will always
      * make sure that the service is undeployed on the previous primary node 
and deployed
@@ -184,8 +225,8 @@ public interface IgniteServices extends IgniteAsyncSupport {
      * Note that in case of topology changes, due to network delays, there may 
be a temporary situation
      * when a service instance will be active on more than one node (e.g. 
crash detection delay).
      * <p>
-     * This method is analogous to the invocation of {@link 
#deploy(org.apache.ignite.services.ServiceConfiguration)} method
-     * as follows:
+     * This method is analogous to the invocation of {@link 
#deploy(org.apache.ignite.services.ServiceConfiguration)}
+     * method as follows:
      * <pre name="code" class="java">
      *     ServiceConfiguration cfg = new ServiceConfiguration();
      *
@@ -211,6 +252,41 @@ public interface IgniteServices extends IgniteAsyncSupport 
{
         throws IgniteException;
 
     /**
+     * Asynchronously deploys one instance of this service on the primary node 
for a given affinity key.
+     * Whenever topology changes and primary node assignment changes, Ignite 
will always
+     * make sure that the service is undeployed on the previous primary node 
and deployed
+     * on the new primary node.
+     * <p>
+     * Note that in case of topology changes, due to network delays, there may 
be a temporary situation
+     * when a service instance will be active on more than one node (e.g. 
crash detection delay).
+     * <p>
+     * This method is analogous to the invocation of
+     * {@link #deployAsync(org.apache.ignite.services.ServiceConfiguration)} 
method as follows:
+     * <pre name="code" class="java">
+     *     ServiceConfiguration cfg = new ServiceConfiguration();
+     *
+     *     cfg.setName(name);
+     *     cfg.setService(svc);
+     *     cfg.setCacheName(cacheName);
+     *     cfg.setAffinityKey(affKey);
+     *     cfg.setTotalCount(1);
+     *     cfg.setMaxPerNodeCount(1);
+     *
+     *     ignite.services().deployAsync(cfg);
+     * </pre>
+     *
+     * @param name Service name.
+     * @param svc Service instance.
+     * @param cacheName Name of the cache on which affinity for key should be 
calculated, {@code null} for
+     *      default cache.
+     * @param affKey Affinity cache key.
+     * @return a Future representing pending completion of the operation.
+     * @throws IgniteException If failed to deploy service.
+     */
+    public IgniteFuture<Void> deployKeyAffinitySingletonAsync(String name, 
Service svc, @Nullable String cacheName,
+        Object affKey) throws IgniteException;
+
+    /**
      * Deploys multiple instances of the service on the grid. Ignite will 
deploy a
      * maximum amount of services equal to {@code 'totalCnt'} parameter making 
sure that
      * there are no more than {@code 'maxPerNodeCnt'} service instances running
@@ -221,8 +297,8 @@ public interface IgniteServices extends IgniteAsyncSupport {
      * Note that at least one of {@code 'totalCnt'} or {@code 'maxPerNodeCnt'} 
parameters must have
      * value greater than {@code 0}.
      * <p>
-     * This method is analogous to the invocation of {@link 
#deploy(org.apache.ignite.services.ServiceConfiguration)} method
-     * as follows:
+     * This method is analogous to the invocation of {@link 
#deploy(org.apache.ignite.services.ServiceConfiguration)}
+     * method as follows:
      * <pre name="code" class="java">
      *     ServiceConfiguration cfg = new ServiceConfiguration();
      *
@@ -244,20 +320,57 @@ public interface IgniteServices extends 
IgniteAsyncSupport {
     public void deployMultiple(String name, Service svc, int totalCnt, int 
maxPerNodeCnt) throws IgniteException;
 
     /**
+     * Asynchronously deploys multiple instances of the service on the grid. 
Ignite will deploy a
+     * maximum amount of services equal to {@code 'totalCnt'} parameter making 
sure that
+     * there are no more than {@code 'maxPerNodeCnt'} service instances running
+     * on each node. Whenever topology changes, Ignite will automatically 
rebalance
+     * the deployed services within cluster to make sure that each node will 
end up with
+     * about equal number of deployed instances whenever possible.
+     * <p>
+     * Note that at least one of {@code 'totalCnt'} or {@code 'maxPerNodeCnt'} 
parameters must have
+     * value greater than {@code 0}.
+     * <p>
+     * This method is analogous to the invocation of
+     * {@link #deployAsync(org.apache.ignite.services.ServiceConfiguration)} 
method as follows:
+     * <pre name="code" class="java">
+     *     ServiceConfiguration cfg = new ServiceConfiguration();
+     *
+     *     cfg.setName(name);
+     *     cfg.setService(svc);
+     *     cfg.setTotalCount(totalCnt);
+     *     cfg.setMaxPerNodeCount(maxPerNodeCnt);
+     *
+     *     ignite.services().deployAsync(cfg);
+     * </pre>
+     *
+     * @param name Service name.
+     * @param svc Service instance.
+     * @param totalCnt Maximum number of deployed services in the grid, {@code 
0} for unlimited.
+     * @param maxPerNodeCnt Maximum number of deployed services on each node, 
{@code 0} for unlimited.
+     * @return a Future representing pending completion of the operation.
+     * @throws IgniteException If failed to deploy service.
+     */
+    public IgniteFuture<Void> deployMultipleAsync(String name, Service svc, 
int totalCnt, int maxPerNodeCnt)
+        throws IgniteException;
+
+    /**
      * Deploys multiple instances of the service on the grid according to 
provided
      * configuration. Ignite will deploy a maximum amount of services equal to
      * {@link org.apache.ignite.services.ServiceConfiguration#getTotalCount() 
cfg.getTotalCount()}  parameter
-     * making sure that there are no more than {@link 
org.apache.ignite.services.ServiceConfiguration#getMaxPerNodeCount() 
cfg.getMaxPerNodeCount()}
+     * making sure that there are no more than
+     * {@link 
org.apache.ignite.services.ServiceConfiguration#getMaxPerNodeCount() 
cfg.getMaxPerNodeCount()}
      * service instances running on each node. Whenever topology changes, 
Ignite will automatically rebalance
      * the deployed services within cluster to make sure that each node will 
end up with
      * about equal number of deployed instances whenever possible.
      * <p>
-     * If {@link 
org.apache.ignite.services.ServiceConfiguration#getAffinityKey() 
cfg.getAffinityKey()} is not {@code null}, then Ignite
-     * will deploy the service on the primary node for given affinity key. The 
affinity will be calculated
-     * on the cache with {@link 
org.apache.ignite.services.ServiceConfiguration#getCacheName() 
cfg.getCacheName()} name.
+     * If {@link 
org.apache.ignite.services.ServiceConfiguration#getAffinityKey() 
cfg.getAffinityKey()}
+     * is not {@code null}, then Ignite will deploy the service on the 
primary node for given affinity key.
+     * The affinity will be calculated on the cache with
+     * {@link org.apache.ignite.services.ServiceConfiguration#getCacheName() 
cfg.getCacheName()} name.
      * <p>
-     * If {@link 
org.apache.ignite.services.ServiceConfiguration#getNodeFilter() 
cfg.getNodeFilter()} is not {@code null}, then
-     * Ignite will deploy service on all grid nodes for which the provided 
filter evaluates to {@code true}.
+     * If {@link 
org.apache.ignite.services.ServiceConfiguration#getNodeFilter() 
cfg.getNodeFilter()}
+     * is not {@code null}, then Ignite will deploy service on all grid nodes 
for which
+     * the provided filter evaluates to {@code true}.
      * The node filter will be checked in addition to the underlying cluster 
group filter, or the
      * whole grid, if the underlying cluster group includes all the cluster 
nodes.
      * <p>
@@ -283,12 +396,56 @@ public interface IgniteServices extends 
IgniteAsyncSupport {
     public void deploy(ServiceConfiguration cfg) throws IgniteException;
 
     /**
+     * Asynchronously deploys multiple instances of the service on the grid 
according to provided
+     * configuration. Ignite will deploy a maximum amount of services equal to
+     * {@link org.apache.ignite.services.ServiceConfiguration#getTotalCount() 
cfg.getTotalCount()} parameter
+     * making sure that there are no more than
+     * {@link 
org.apache.ignite.services.ServiceConfiguration#getMaxPerNodeCount() 
cfg.getMaxPerNodeCount()}
+     * service instances running on each node. Whenever topology changes, 
Ignite will automatically rebalance
+     * the deployed services within cluster to make sure that each node will 
end up with
+     * about equal number of deployed instances whenever possible.
+     * <p>
+     * If {@link 
org.apache.ignite.services.ServiceConfiguration#getAffinityKey() 
cfg.getAffinityKey()}
+     * is not {@code null}, then Ignite
+     * will deploy the service on the primary node for given affinity key. The 
affinity will be calculated
+     * on the cache with {@link 
org.apache.ignite.services.ServiceConfiguration#getCacheName() 
cfg.getCacheName()} name.
+     * <p>
+     * If {@link 
org.apache.ignite.services.ServiceConfiguration#getNodeFilter() 
cfg.getNodeFilter()}
+     * is not {@code null}, then Ignite will deploy service on all grid nodes
+     * for which the provided filter evaluates to {@code true}.
+     * The node filter will be checked in addition to the underlying cluster 
group filter, or the
+     * whole grid, if the underlying cluster group includes all the cluster 
nodes.
+     * <p>
+     * Note that at least one of {@code 'totalCnt'} or {@code 'maxPerNodeCnt'} 
parameters must have
+     * value greater than {@code 0}.
+     * <p>
+     * Here is an example of creating service deployment configuration:
+     * <pre name="code" class="java">
+     *     ServiceConfiguration cfg = new ServiceConfiguration();
+     *
+     *     cfg.setName(name);
+     *     cfg.setService(svc);
+     *     cfg.setTotalCount(0); // Unlimited.
+     *     cfg.setMaxPerNodeCount(2); // Deploy 2 instances of service on each 
node.
+     *
+     *     ignite.services().deployAsync(cfg);
+     * </pre>
+     *
+     * @param cfg Service configuration.
+     * @return a Future representing pending completion of the operation.
+     * @throws IgniteException If failed to deploy service.
+     */
+    public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg) throws 
IgniteException;
+
+    /**
      * Cancels service deployment. If a service with specified name was 
deployed on the grid,
-     * then {@link 
org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)}
 method will be called on it.
+     * then {@link 
org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)}
+     * method will be called on it.
      * <p>
-     * Note that Ignite cannot guarantee that the service exits from {@link 
org.apache.ignite.services.Service#execute(org.apache.ignite.services.ServiceContext)}
-     * method whenever {@link 
org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)}
 is called. It is up to the user to
-     * make sure that the service code properly reacts to cancellations.
+     * Note that Ignite cannot guarantee that the service exits from
+     * {@link 
org.apache.ignite.services.Service#execute(org.apache.ignite.services.ServiceContext)}
+     * method whenever {@link 
org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)}
+     * is called. It is up to the user to make sure that the service code 
properly reacts to cancellations.
      * <p>
      * Supports asynchronous execution (see {@link IgniteAsyncSupport}).
      *
@@ -299,6 +456,23 @@ public interface IgniteServices extends IgniteAsyncSupport 
{
     public void cancel(String name) throws IgniteException;
 
     /**
+     * Asynchronously cancels service deployment. If a service with specified 
name was deployed on the grid,
+     * then {@link 
org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)}
+     * method will be called on it.
+     * <p>
+     * Note that Ignite cannot guarantee that the service exits from
+     * {@link 
org.apache.ignite.services.Service#execute(org.apache.ignite.services.ServiceContext)}
+     * method whenever {@link 
org.apache.ignite.services.Service#cancel(org.apache.ignite.services.ServiceContext)}
+     * is called. It is up to the user to
+     * make sure that the service code properly reacts to cancellations.
+     *
+     * @param name Name of service to cancel.
+     * @return a Future representing pending completion of the operation.
+     * @throws IgniteException If failed to cancel service.
+     */
+    public IgniteFuture<Void> cancelAsync(String name) throws IgniteException;
+
+    /**
      * Cancels all deployed services.
      * <p>
      * Note that depending on user logic, it may still take extra time for a 
service to
@@ -312,6 +486,17 @@ public interface IgniteServices extends IgniteAsyncSupport 
{
     public void cancelAll() throws IgniteException;
 
     /**
+     * Asynchronously cancels all deployed services.
+     * <p>
+     * Note that depending on user logic, it may still take extra time for a 
service to
+     * finish execution, even after it was cancelled.
+     *
+     * @return a Future representing pending completion of the operation.
+     * @throws IgniteException If failed to cancel services.
+     */
+    public IgniteFuture<Void> cancelAllAsync() throws IgniteException;
+
+    /**
      * Gets metadata about all deployed services in the grid.
      *
      * @return Metadata about all deployed services in the grid.
@@ -364,8 +549,10 @@ public interface IgniteServices extends IgniteAsyncSupport 
{
      * @return Either proxy over remote service or local service if it is 
deployed locally.
      * @throws IgniteException If failed to create service proxy.
      */
-    public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean 
sticky, long timeout) throws IgniteException;
+    public <T> T serviceProxy(String name, Class<? super T> svcItf, boolean 
sticky, long timeout)
+        throws IgniteException;
 
     /** {@inheritDoc} */
+    @Deprecated
     @Override public IgniteServices withAsync();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
index 3c6218d..9acccab 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteEventsImpl.java
@@ -27,13 +27,16 @@ import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
 
@@ -93,12 +96,34 @@ public class IgniteEventsImpl extends 
AsyncSupportAdapter<IgniteEvents> implemen
     }
 
     /** {@inheritDoc} */
+    @Override public <T extends Event> IgniteFuture<List<T>> 
remoteQueryAsync(IgnitePredicate<T> p, long timeout,
+        @Nullable int... types) throws IgniteException {
+
+        guard();
+
+        try {
+            return new 
IgniteFutureImpl<>(ctx.event().remoteEventsAsync(compoundPredicate(p, types),
+                prj.nodes(), timeout));
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public <T extends Event> UUID remoteListen(@Nullable 
IgniteBiPredicate<UUID, T> locLsnr,
         @Nullable IgnitePredicate<T> rmtFilter, @Nullable int... types) {
         return remoteListen(1, 0, true, locLsnr, rmtFilter, types);
     }
 
     /** {@inheritDoc} */
+    @Override public <T extends Event> IgniteFuture<UUID> remoteListenAsync(
+        @Nullable IgniteBiPredicate<UUID, T> locLsnr, @Nullable 
IgnitePredicate<T> rmtFilter,
+        @Nullable int... types) throws IgniteException {
+        return remoteListenAsync(1, 0, true, locLsnr, rmtFilter, types);
+    }
+
+    /** {@inheritDoc} */
     @Override public <T extends Event> UUID remoteListen(int bufSize, long 
interval,
         boolean autoUnsubscribe, @Nullable IgniteBiPredicate<UUID, T> locLsnr, 
@Nullable IgnitePredicate<T> rmtFilter,
         @Nullable int... types) {
@@ -128,6 +153,32 @@ public class IgniteEventsImpl extends 
AsyncSupportAdapter<IgniteEvents> implemen
     }
 
     /** {@inheritDoc} */
+    @Override public <T extends Event> IgniteFuture<UUID> 
remoteListenAsync(int bufSize, long interval,
+        boolean autoUnsubscribe, @Nullable IgniteBiPredicate<UUID, T> locLsnr, 
@Nullable IgnitePredicate<T> rmtFilter,
+        @Nullable int... types) throws IgniteException {
+        A.ensure(bufSize > 0, "bufSize > 0");
+        A.ensure(interval >= 0, "interval >= 0");
+
+        guard();
+
+        try {
+            GridEventConsumeHandler hnd = new 
GridEventConsumeHandler((IgniteBiPredicate<UUID, Event>)locLsnr,
+                (IgnitePredicate<Event>)rmtFilter, types);
+
+            return new IgniteFutureImpl<>(ctx.continuous().startRoutine(
+                hnd,
+                false,
+                bufSize,
+                interval,
+                autoUnsubscribe,
+                prj.predicate()));
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void stopRemoteListen(UUID opId) {
         A.notNull(opId, "consumeId");
 
@@ -145,6 +196,21 @@ public class IgniteEventsImpl extends 
AsyncSupportAdapter<IgniteEvents> implemen
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) 
throws IgniteException {
+        A.notNull(opId, "consumeId");
+
+        guard();
+
+        try {
+            return (IgniteFuture<Void>)new 
IgniteFutureImpl<>(ctx.continuous().stopRoutine(opId));
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public <T extends Event> T waitForLocal(@Nullable 
IgnitePredicate<T> filter,
         @Nullable int... types) {
         guard();
@@ -161,6 +227,19 @@ public class IgniteEventsImpl extends 
AsyncSupportAdapter<IgniteEvents> implemen
     }
 
     /** {@inheritDoc} */
+    @Override public <T extends Event> IgniteFuture<T> 
waitForLocalAsync(@Nullable IgnitePredicate<T> filter,
+        @Nullable int... types) throws IgniteException {
+        guard();
+
+        try {
+            return new IgniteFutureImpl<>(ctx.event().waitForEvent(filter, 
types));
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public <T extends Event> Collection<T> 
localQuery(IgnitePredicate<T> p, @Nullable int... types) {
         A.notNull(p, "p");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
index 2800777..e5c00bd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteMessagingImpl.java
@@ -25,15 +25,18 @@ import java.io.ObjectStreamException;
 import java.util.Collection;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
 import org.apache.ignite.internal.processors.continuous.GridContinuousHandler;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -200,6 +203,28 @@ public class IgniteMessagingImpl extends 
AsyncSupportAdapter<IgniteMessaging>
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<UUID> remoteListenAsync(@Nullable Object 
topic,
+        IgniteBiPredicate<UUID, ?> p) throws IgniteException {
+        A.notNull(p, "p");
+
+        guard();
+
+        try {
+            GridContinuousHandler hnd = new GridMessageListenHandler(topic, 
(IgniteBiPredicate<UUID, Object>)p);
+
+            return new IgniteFutureImpl<>(ctx.continuous().startRoutine(hnd,
+                false,
+                1,
+                0,
+                false,
+                prj.predicate()));
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void stopRemoteListen(UUID opId) {
         A.notNull(opId, "opId");
 
@@ -216,6 +241,20 @@ public class IgniteMessagingImpl extends 
AsyncSupportAdapter<IgniteMessaging>
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> stopRemoteListenAsync(UUID opId) 
throws IgniteException {
+        A.notNull(opId, "opId");
+
+        guard();
+
+        try {
+            return (IgniteFuture<Void>)new 
IgniteFutureImpl<>(ctx.continuous().stopRoutine(opId));
+        }
+        finally {
+            unguard();
+        }
+    }
+
     /**
      * <tt>ctx.gateway().readLock()</tt>
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
index df6e5df..607dccc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
@@ -28,8 +28,10 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteServices;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.internal.cluster.ClusterGroupAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceConfiguration;
 import org.apache.ignite.services.ServiceDescriptor;
@@ -38,6 +40,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * {@link org.apache.ignite.IgniteServices} implementation.
  */
+@SuppressWarnings("unchecked")
 public class IgniteServicesImpl extends AsyncSupportAdapter implements 
IgniteServices, Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
@@ -91,6 +94,21 @@ public class IgniteServicesImpl extends AsyncSupportAdapter 
implements IgniteSer
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> deployNodeSingletonAsync(String name, 
Service svc) throws IgniteException {
+        A.notNull(name, "name");
+        A.notNull(svc, "svc");
+
+        guard();
+
+        try {
+            return (IgniteFuture<Void>)new 
IgniteFutureImpl<>(ctx.service().deployNodeSingleton(prj, name, svc));
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void deployClusterSingleton(String name, Service svc) {
         A.notNull(name, "name");
         A.notNull(svc, "svc");
@@ -109,6 +127,21 @@ public class IgniteServicesImpl extends 
AsyncSupportAdapter implements IgniteSer
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> deployClusterSingletonAsync(String 
name, Service svc) throws IgniteException {
+        A.notNull(name, "name");
+        A.notNull(svc, "svc");
+
+        guard();
+
+        try {
+            return (IgniteFuture<Void>)new 
IgniteFutureImpl<>(ctx.service().deployClusterSingleton(prj, name, svc));
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void deployMultiple(String name, Service svc, int 
totalCnt, int maxPerNodeCnt) {
         A.notNull(name, "name");
         A.notNull(svc, "svc");
@@ -127,6 +160,23 @@ public class IgniteServicesImpl extends 
AsyncSupportAdapter implements IgniteSer
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> deployMultipleAsync(String name, 
Service svc, int totalCnt,
+        int maxPerNodeCnt) throws IgniteException {
+        A.notNull(name, "name");
+        A.notNull(svc, "svc");
+
+        guard();
+
+        try {
+            return (IgniteFuture<Void>)new 
IgniteFutureImpl<>(ctx.service().deployMultiple(prj, name, svc,
+                totalCnt, maxPerNodeCnt));
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void deployKeyAffinitySingleton(String name, Service svc, 
@Nullable String cacheName,
         Object affKey) {
         A.notNull(name, "name");
@@ -147,6 +197,24 @@ public class IgniteServicesImpl extends 
AsyncSupportAdapter implements IgniteSer
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> deployKeyAffinitySingletonAsync(String 
name, Service svc,
+        @Nullable String cacheName, Object affKey) throws IgniteException {
+        A.notNull(name, "name");
+        A.notNull(svc, "svc");
+        A.notNull(affKey, "affKey");
+
+        guard();
+
+        try {
+            return (IgniteFuture<Void>)new 
IgniteFutureImpl<>(ctx.service().deployKeyAffinitySingleton(name, svc,
+                cacheName, affKey));
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void deploy(ServiceConfiguration cfg) {
         A.notNull(cfg, "cfg");
 
@@ -164,6 +232,20 @@ public class IgniteServicesImpl extends 
AsyncSupportAdapter implements IgniteSer
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> deployAsync(ServiceConfiguration cfg) 
throws IgniteException {
+        A.notNull(cfg, "cfg");
+
+        guard();
+
+        try {
+            return (IgniteFuture<Void>)new 
IgniteFutureImpl<>(ctx.service().deploy(cfg));
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void cancel(String name) {
         A.notNull(name, "name");
 
@@ -181,6 +263,20 @@ public class IgniteServicesImpl extends 
AsyncSupportAdapter implements IgniteSer
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> cancelAsync(String name) throws 
IgniteException {
+        A.notNull(name, "name");
+
+        guard();
+
+        try {
+            return (IgniteFuture<Void>)new 
IgniteFutureImpl<>(ctx.service().cancel(name));
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void cancelAll() {
         guard();
 
@@ -196,6 +292,18 @@ public class IgniteServicesImpl extends 
AsyncSupportAdapter implements IgniteSer
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> cancelAllAsync() throws 
IgniteException {
+        guard();
+
+        try {
+            return (IgniteFuture<Void>)new 
IgniteFutureImpl<>(ctx.service().cancelAll());
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<ServiceDescriptor> serviceDescriptors() {
         guard();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
index fb9b190..d392813 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
@@ -22,7 +22,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
-import java.io.ObjectStreamException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
@@ -30,6 +29,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCluster;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterMetrics;
 import org.apache.ignite.cluster.ClusterNode;
@@ -115,7 +115,7 @@ public class IgniteClusterAsyncImpl extends 
AsyncSupportAdapter<IgniteCluster>
         int maxConn)
     {
         try {
-            return saveOrGet(cluster.startNodesAsync(file, restart, timeout, 
maxConn));
+            return saveOrGet(cluster.startNodesAsync0(file, restart, timeout, 
maxConn));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -123,6 +123,12 @@ public class IgniteClusterAsyncImpl extends 
AsyncSupportAdapter<IgniteCluster>
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Collection<ClusterStartNodeResult>> 
startNodesAsync(File file, boolean restart,
+        int timeout, int maxConn) throws IgniteException {
+        return cluster.startNodesAsync(file, restart, timeout, maxConn);
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<ClusterStartNodeResult> startNodes(
         Collection<Map<String, Object>> hosts,
         @Nullable Map<String, Object> dflts,
@@ -131,7 +137,7 @@ public class IgniteClusterAsyncImpl extends 
AsyncSupportAdapter<IgniteCluster>
         int maxConn)
     {
         try {
-            return saveOrGet(cluster.startNodesAsync(hosts, dflts, restart, 
timeout, maxConn));
+            return saveOrGet(cluster.startNodesAsync0(hosts, dflts, restart, 
timeout, maxConn));
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -139,6 +145,13 @@ public class IgniteClusterAsyncImpl extends 
AsyncSupportAdapter<IgniteCluster>
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Collection<ClusterStartNodeResult>> 
startNodesAsync(
+        Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> 
dflts,
+        boolean restart, int timeout, int maxConn) throws IgniteException {
+        return cluster.startNodesAsync(hosts, dflts, restart, timeout, 
maxConn);
+    }
+
+    /** {@inheritDoc} */
     @Override public void stopNodes() {
         cluster.stopNodes();
     }
@@ -312,13 +325,4 @@ public class IgniteClusterAsyncImpl extends 
AsyncSupportAdapter<IgniteCluster>
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         out.writeObject(cluster);
     }
-
-    /**
-     * @return Cluster async instance.
-     *
-     * @throws ObjectStreamException If failed.
-     */
-    protected Object readResolve() throws ObjectStreamException {
-        return cluster.withAsync();
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
index aa5e63f..857c1ef 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.IgniteComponentType;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import 
org.apache.ignite.internal.util.nodestart.IgniteRemoteStartSpecification;
 import org.apache.ignite.internal.util.nodestart.IgniteSshHelper;
 import org.apache.ignite.internal.util.nodestart.StartNodeCallable;
@@ -222,7 +223,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter 
implements IgniteClus
         throws IgniteException
     {
         try {
-            return startNodesAsync(file, restart, timeout, maxConn).get();
+            return startNodesAsync0(file, restart, timeout, maxConn).get();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -230,6 +231,12 @@ public class IgniteClusterImpl extends ClusterGroupAdapter 
implements IgniteClus
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Collection<ClusterStartNodeResult>> 
startNodesAsync(File file, boolean restart,
+        int timeout, int maxConn) throws IgniteException {
+        return new IgniteFutureImpl<>(startNodesAsync0(file, restart, timeout, 
maxConn));
+    }
+
+    /** {@inheritDoc} */
     @Override public Collection<ClusterStartNodeResult> 
startNodes(Collection<Map<String, Object>> hosts,
         @Nullable Map<String, Object> dflts,
         boolean restart,
@@ -238,7 +245,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter 
implements IgniteClus
         throws IgniteException
     {
         try {
-            return startNodesAsync(hosts, dflts, restart, timeout, 
maxConn).get();
+            return startNodesAsync0(hosts, dflts, restart, timeout, 
maxConn).get();
         }
         catch (IgniteCheckedException e) {
             throw U.convertException(e);
@@ -246,6 +253,13 @@ public class IgniteClusterImpl extends ClusterGroupAdapter 
implements IgniteClus
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Collection<ClusterStartNodeResult>> 
startNodesAsync(
+        Collection<Map<String, Object>> hosts, @Nullable Map<String, Object> 
dflts,
+        boolean restart, int timeout, int maxConn) throws IgniteException {
+        return new IgniteFutureImpl<>(startNodesAsync0(hosts, dflts, restart, 
timeout, maxConn));
+    }
+
+    /** {@inheritDoc} */
     @Override public void stopNodes() throws IgniteException {
         guard();
 
@@ -330,7 +344,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter 
implements IgniteClus
      * @return Future with results.
      * @see IgniteCluster#startNodes(java.io.File, boolean, int, int)
      */
-    IgniteInternalFuture<Collection<ClusterStartNodeResult>> 
startNodesAsync(File file,
+    IgniteInternalFuture<Collection<ClusterStartNodeResult>> 
startNodesAsync0(File file,
       boolean restart,
       int timeout,
       int maxConn)
@@ -342,7 +356,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter 
implements IgniteClus
         try {
             IgniteBiTuple<Collection<Map<String, Object>>, Map<String, 
Object>> t = parseFile(file);
 
-            return startNodesAsync(t.get1(), t.get2(), restart, timeout, 
maxConn);
+            return startNodesAsync0(t.get1(), t.get2(), restart, timeout, 
maxConn);
         }
         catch (IgniteCheckedException e) {
             return new GridFinishedFuture<>(e);
@@ -358,7 +372,7 @@ public class IgniteClusterImpl extends ClusterGroupAdapter 
implements IgniteClus
      * @return Future with results.
      * @see IgniteCluster#startNodes(java.util.Collection, java.util.Map, 
boolean, int, int)
      */
-    IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync(
+    IgniteInternalFuture<Collection<ClusterStartNodeResult>> startNodesAsync0(
         Collection<Map<String, Object>> hosts,
         @Nullable Map<String, Object> dflts,
         boolean restart,

http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index dd900fe..30f5c5d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -35,7 +35,6 @@ import 
org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.lang.GridTuple;
-import org.apache.ignite.lang.IgniteAsyncSupported;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
@@ -187,7 +186,6 @@ public interface IgniteInternalTx extends AutoCloseable {
      *
      * @throws IgniteCheckedException If commit failed.
      */
-    @IgniteAsyncSupported
     public void commit() throws IgniteCheckedException;
 
     /**
@@ -202,7 +200,6 @@ public interface IgniteInternalTx extends AutoCloseable {
      *
      * @throws IgniteCheckedException If rollback failed.
      */
-    @IgniteAsyncSupported
     public void rollback() throws IgniteCheckedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
index 6134b9f..2a058ac 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
@@ -24,6 +24,7 @@ import java.io.ObjectOutput;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
@@ -45,6 +46,7 @@ import org.apache.ignite.transactions.TransactionState;
 /**
  * Cache transaction proxy.
  */
+@SuppressWarnings("unchecked")
 public class TransactionProxyImpl<K, V> implements TransactionProxy, 
Externalizable {
     /** */
     private static final long serialVersionUID = 0L;
@@ -269,6 +271,18 @@ public class TransactionProxyImpl<K, V> implements 
TransactionProxy, Externaliza
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> commitAsync() throws IgniteException {
+        enter();
+
+        try {
+            return (IgniteFuture<Void>)createFuture(cctx.commitTxAsync(tx));
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void close() {
         enter();
 
@@ -303,6 +317,21 @@ public class TransactionProxyImpl<K, V> implements 
TransactionProxy, Externaliza
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public IgniteFuture<Void> rollbackAsync() throws IgniteException 
{
+        enter();
+
+        try {
+            return (IgniteFuture<Void>)(new 
IgniteFutureImpl(cctx.rollbackTxAsync(tx)));
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            leave();
+        }
+    }
+
     /**
      * @param res Result to convert to finished future.
      */
@@ -314,13 +343,21 @@ public class TransactionProxyImpl<K, V> implements 
TransactionProxy, Externaliza
      * @param fut Internal future.
      */
     private void saveFuture(IgniteInternalFuture<IgniteInternalTx> fut) {
+        asyncRes = createFuture(fut);
+    }
+
+    /**
+     * @param fut Internal future.
+     * @return User future.
+     */
+    private IgniteFuture<?> 
createFuture(IgniteInternalFuture<IgniteInternalTx> fut) {
         IgniteInternalFuture<Transaction> fut0 = fut.chain(new 
CX1<IgniteInternalFuture<IgniteInternalTx>, Transaction>() {
             @Override public Transaction 
applyx(IgniteInternalFuture<IgniteInternalTx> fut) throws 
IgniteCheckedException {
                 return fut.get().proxy();
             }
         });
 
-        asyncRes = new IgniteFutureImpl(fut0);
+        return new IgniteFutureImpl(fut0);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
index eb675fb..cb27b19 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/entityframework/PlatformDotNetEntityFrameworkCacheExtension.java
@@ -20,7 +20,6 @@ package 
org.apache.ignite.internal.processors.platform.entityframework;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;

http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
index 9ddcc37..e16abe4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/events/PlatformEvents.java
@@ -17,11 +17,11 @@
 
 package org.apache.ignite.internal.processors.platform.events;
 
+import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventAdapter;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
@@ -29,8 +29,8 @@ import 
org.apache.ignite.internal.processors.platform.PlatformContext;
 import 
org.apache.ignite.internal.processors.platform.PlatformEventFilterListener;
 import org.apache.ignite.internal.processors.platform.PlatformTarget;
 import 
org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
-import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
 
@@ -91,9 +91,6 @@ public class PlatformEvents extends PlatformAbstractTarget {
     private final IgniteEvents events;
 
     /** */
-    private final IgniteEvents eventsAsync;
-
-    /** */
     private final EventResultWriter eventResWriter;
 
     /** */
@@ -111,7 +108,6 @@ public class PlatformEvents extends PlatformAbstractTarget {
         assert events != null;
 
         this.events = events;
-        eventsAsync = events.withAsync();
 
         eventResWriter = new EventResultWriter(platformCtx);
         eventColResWriter = new EventCollectionResultWriter(platformCtx);
@@ -148,16 +144,12 @@ public class PlatformEvents extends 
PlatformAbstractTarget {
                 return TRUE;
 
             case OP_REMOTE_QUERY_ASYNC:
-                startRemoteQuery(reader, eventsAsync);
-
-                readAndListenFuture(reader, currentFuture(), 
eventColResWriter);
+                readAndListenFuture(reader, startRemoteQueryAsync(reader, 
events), eventColResWriter);
 
                 return TRUE;
 
             case OP_WAIT_FOR_LOCAL_ASYNC: {
-                startWaitForLocal(reader, eventsAsync);
-
-                readAndListenFuture(reader, currentFuture(), eventResWriter);
+                readAndListenFuture(reader, startWaitForLocalAsync(reader, 
events), eventResWriter);
 
                 return TRUE;
             }
@@ -253,6 +245,23 @@ public class PlatformEvents extends PlatformAbstractTarget 
{
     }
 
     /**
+     * Starts the waitForLocal operation asynchronously.
+     *
+     * @param reader Reader.
+     * @param events Events.
+     * @return Future of the operation.
+     */
+    private IgniteFuture<EventAdapter> 
startWaitForLocalAsync(BinaryRawReaderEx reader, IgniteEvents events) {
+        Long filterHnd = reader.readObject();
+
+        IgnitePredicate filter = filterHnd != null ? localFilter(filterHnd) : 
null;
+
+        int[] eventTypes = readEventTypes(reader);
+
+        return events.waitForLocalAsync(filter, eventTypes);
+    }
+
+    /**
      * Starts the remote query.
      *
      * @param reader Reader.
@@ -271,6 +280,25 @@ public class PlatformEvents extends PlatformAbstractTarget 
{
         return events.remoteQuery(filter, timeout);
     }
 
+    /**
+     * Starts the remote query asynchronously.
+     *
+     * @param reader Reader.
+     * @param events Events.
+     * @return Future of the operation.
+     */
+    private IgniteFuture<List<Event>> startRemoteQueryAsync(BinaryRawReaderEx 
reader, IgniteEvents events) {
+        Object pred = reader.readObjectDetached();
+
+        long timeout = reader.readLong();
+
+        int[] types = readEventTypes(reader);
+
+        PlatformEventFilterListener filter = 
platformCtx.createRemoteEventFilter(pred, types);
+
+        return events.remoteQueryAsync(filter, timeout);
+    }
+
     /** {@inheritDoc} */
     @Override public void processOutStream(int type, BinaryRawWriterEx writer) 
throws IgniteCheckedException {
         switch (type) {
@@ -311,11 +339,6 @@ public class PlatformEvents extends PlatformAbstractTarget 
{
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture currentFuture() throws 
IgniteCheckedException {
-        return ((IgniteFutureImpl)eventsAsync.future()).internalFuture();
-    }
-
-    /** {@inheritDoc} */
     @Nullable @Override public PlatformFutureUtils.Writer futureWriter(int 
opId) {
         switch (opId) {
             case OP_WAIT_FOR_LOCAL:

http://git-wip-us.apache.org/repos/asf/ignite/blob/fde1f486/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
index 6fe109e..8018986 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/messaging/PlatformMessaging.java
@@ -19,7 +19,6 @@ package 
org.apache.ignite.internal.processors.platform.messaging;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteMessaging;
-import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
 import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
@@ -27,7 +26,7 @@ import 
org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.PlatformTarget;
 import 
org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter;
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.lang.IgniteFuture;
 
 import java.util.UUID;
 
@@ -68,9 +67,6 @@ public class PlatformMessaging extends PlatformAbstractTarget 
{
     /** */
     private final IgniteMessaging messaging;
 
-    /** */
-    private final IgniteMessaging messagingAsync;
-
     /**
      * Ctor.
      *
@@ -83,7 +79,6 @@ public class PlatformMessaging extends PlatformAbstractTarget 
{
         assert messaging != null;
 
         this.messaging = messaging;
-        messagingAsync = messaging.withAsync();
     }
 
     /** {@inheritDoc} */
@@ -132,15 +127,15 @@ public class PlatformMessaging extends 
PlatformAbstractTarget {
             }
 
             case OP_REMOTE_LISTEN_ASYNC: {
-                startRemoteListen(reader, messagingAsync);
+                readAndListenFuture(reader, startRemoteListenAsync(reader, 
messaging));
 
-                return readAndListenFuture(reader);
+                return TRUE;
             }
 
             case OP_STOP_REMOTE_LISTEN_ASYNC: {
-                messagingAsync.stopRemoteListen(reader.readUuid());
+                readAndListenFuture(reader, 
messaging.stopRemoteListenAsync(reader.readUuid()));
 
-                return readAndListenFuture(reader);
+                return TRUE;
             }
 
             default:
@@ -167,6 +162,7 @@ public class PlatformMessaging extends 
PlatformAbstractTarget {
     /**
      * Starts the remote listener.
      * @param reader Reader.
+     * @param messaging Messaging.
      * @return Listen id.
      */
     private UUID startRemoteListen(BinaryRawReaderEx reader, IgniteMessaging 
messaging) {
@@ -181,9 +177,22 @@ public class PlatformMessaging extends 
PlatformAbstractTarget {
         return messaging.remoteListen(topic, filter);
     }
 
-    /** {@inheritDoc} */
-    @Override public IgniteInternalFuture currentFuture() throws 
IgniteCheckedException {
-        return ((IgniteFutureImpl)messagingAsync.future()).internalFuture();
+    /**
+     * Starts the remote listener asynchronously.
+     * @param reader Reader.
+     * @param messaging Messaging.
+     * @return Future of the operation.
+     */
+    private IgniteFuture<UUID> startRemoteListenAsync(BinaryRawReaderEx 
reader, IgniteMessaging messaging) {
+        Object nativeFilter = reader.readObjectDetached();
+
+        long ptr = reader.readLong();  // interop pointer
+
+        Object topic = reader.readObjectDetached();
+
+        PlatformMessageFilter filter = 
platformCtx.createRemoteMessageFilter(nativeFilter, ptr);
+
+        return messaging.remoteListenAsync(topic, filter);
     }
 
     /** {@inheritDoc} */

Reply via email to