IGNITE-5582: Implemented Compute::Broadcast for C++

(cherry picked from commit fa974286e8f066a8d6aa57519edf5ec7761be095)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3c887378
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3c887378
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3c887378

Branch: refs/heads/master
Commit: 3c887378eb64d2d236073410070082e5699e8334
Parents: 99713fe
Author: Igor Sapego <isap...@gridgain.com>
Authored: Fri Jul 7 16:52:31 2017 +0300
Committer: Igor Sapego <isap...@gridgain.com>
Committed: Fri Jul 7 16:52:31 2017 +0300

----------------------------------------------------------------------
 .../cpp/core-test/src/compute_test.cpp          |  91 ++++++-
 modules/platforms/cpp/core/include/Makefile.am  |   2 +
 .../cpp/core/include/ignite/compute/compute.h   |  66 +++++
 .../include/ignite/impl/compute/compute_impl.h  | 161 +++++++----
 .../ignite/impl/compute/compute_job_result.h    |  54 +++-
 .../ignite/impl/compute/compute_task_holder.h   | 204 +-------------
 .../compute/multiple_job_compute_task_holder.h  | 265 +++++++++++++++++++
 .../compute/single_job_compute_task_holder.h    | 212 +++++++++++++++
 .../platforms/cpp/core/project/vs/core.vcxproj  |   2 +
 .../cpp/core/project/vs/core.vcxproj.filters    |   6 +
 10 files changed, 811 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core-test/src/compute_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/compute_test.cpp 
b/modules/platforms/cpp/core-test/src/compute_test.cpp
index 8c57ef1..1fd7670 100644
--- a/modules/platforms/cpp/core-test/src/compute_test.cpp
+++ b/modules/platforms/cpp/core-test/src/compute_test.cpp
@@ -476,7 +476,7 @@ BOOST_AUTO_TEST_CASE(IgniteRunAsyncLocalError)
     BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
 }
 
-BOOST_AUTO_TEST_CASE(IgniteRunTestRemote)
+BOOST_AUTO_TEST_CASE(IgniteRunRemote)
 {
     Ignite node2 = MakeNode("ComputeNode2");
     Compute compute = node.GetCompute();
@@ -489,7 +489,7 @@ BOOST_AUTO_TEST_CASE(IgniteRunTestRemote)
     BOOST_CHECK_EQUAL(Func3::res, "42.24");
 }
 
-BOOST_AUTO_TEST_CASE(IgniteRunTestRemoteError)
+BOOST_AUTO_TEST_CASE(IgniteRunRemoteError)
 {
     Ignite node2 = MakeNode("ComputeNode2");
     Compute compute = node.GetCompute();
@@ -509,5 +509,92 @@ BOOST_AUTO_TEST_CASE(IgniteRunTestRemoteError)
     BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
 }
 
+BOOST_AUTO_TEST_CASE(IgniteBroadcastLocalSync)
+{
+    Compute compute = node.GetCompute();
+
+    BOOST_CHECKPOINT("Broadcasting");;
+    std::vector<std::string> res = compute.Broadcast<std::string>(Func2(8, 5));
+
+    BOOST_CHECK_EQUAL(res.size(), 1);
+    BOOST_CHECK_EQUAL(res[0], "8.5");
+}
+
+BOOST_AUTO_TEST_CASE(IgniteBroadcastLocalAsync)
+{
+    Compute compute = node.GetCompute();
+
+    BOOST_CHECKPOINT("Broadcasting");;
+    Future< std::vector<std::string> > res = 
compute.BroadcastAsync<std::string>(Func2(312, 245));
+
+    BOOST_CHECK(!res.IsReady());
+
+    BOOST_CHECKPOINT("Waiting with timeout");
+    res.WaitFor(100);
+
+    BOOST_CHECK(!res.IsReady());
+
+    std::vector<std::string> value = res.GetValue();
+
+    BOOST_CHECK_EQUAL(value.size(), 1);
+    BOOST_CHECK_EQUAL(value[0], "312.245");
+}
+
+BOOST_AUTO_TEST_CASE(IgniteBroadcastSyncLocalError)
+{
+    Compute compute = node.GetCompute();
+
+    BOOST_CHECKPOINT("Broadcasting");
+
+    BOOST_CHECK_EXCEPTION(compute.Broadcast(Func2(MakeTestError())), 
IgniteError, IsTestError);
+}
+
+BOOST_AUTO_TEST_CASE(IgniteBroadcastAsyncLocalError)
+{
+    Compute compute = node.GetCompute();
+
+    BOOST_CHECKPOINT("Broadcasting");
+    Future<void> res = compute.BroadcastAsync(Func2(MakeTestError()));
+
+    BOOST_CHECK(!res.IsReady());
+
+    BOOST_CHECKPOINT("Waiting with timeout");
+    res.WaitFor(100);
+
+    BOOST_CHECK(!res.IsReady());
+
+    BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
+}
+
+BOOST_AUTO_TEST_CASE(IgniteBroadcastRemote)
+{
+    Ignite node2 = MakeNode("ComputeNode2");
+    Compute compute = node.GetCompute();
+
+    BOOST_CHECKPOINT("Broadcasting");
+    std::vector<std::string> res = compute.Broadcast<std::string>(Func2(8, 5));
+
+    BOOST_CHECK_EQUAL(res.size(), 2);
+    BOOST_CHECK_EQUAL(res[0], "8.5");
+    BOOST_CHECK_EQUAL(res[1], "8.5");
+}
+
+BOOST_AUTO_TEST_CASE(IgniteBroadcastRemoteError)
+{
+    Ignite node2 = MakeNode("ComputeNode2");
+    Compute compute = node.GetCompute();
+
+    BOOST_CHECKPOINT("Broadcasting");
+    Future< std::vector<std::string> > res = 
compute.BroadcastAsync<std::string>(Func2(MakeTestError()));
+
+    BOOST_CHECK(!res.IsReady());
+
+    BOOST_CHECKPOINT("Waiting with timeout");
+    res.WaitFor(100);
+
+    BOOST_CHECK(!res.IsReady());
+
+    BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError);
+}
 
 BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/Makefile.am 
b/modules/platforms/cpp/core/include/Makefile.am
index 50772cb..1e9369f 100644
--- a/modules/platforms/cpp/core/include/Makefile.am
+++ b/modules/platforms/cpp/core/include/Makefile.am
@@ -61,6 +61,8 @@ nobase_include_HEADERS = \
        ignite/impl/compute/compute_job_holder.h \
        ignite/impl/compute/compute_job_result.h \
        ignite/impl/compute/compute_task_holder.h \
+       ignite/impl/compute/single_job_compute_task_holder.h \
+       ignite/impl/compute/multiple_job_compute_task_holder.h \
        ignite/impl/handle_registry.h \
        ignite/impl/ignite_binding_impl.h \
        ignite/impl/ignite_environment.h \

http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/compute/compute.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/compute/compute.h 
b/modules/platforms/cpp/core/include/ignite/compute/compute.h
index 75c8c85..9b4c9b9 100644
--- a/modules/platforms/cpp/core/include/ignite/compute/compute.h
+++ b/modules/platforms/cpp/core/include/ignite/compute/compute.h
@@ -157,6 +157,72 @@ namespace ignite
                 return impl.Get()->RunAsync<F>(action);
             }
 
+            /**
+             * Broadcasts provided ComputeFunc to all nodes in the cluster 
group.
+             *
+             * @tparam R Function return type. BinaryType should be specialized
+             *  for the type if it is not primitive.
+             * @tparam F Compute function type. Should implement ComputeFunc<R>
+             *  class.
+             * @param func Compute function to call.
+             * @return Vector containing computation results.
+             * @throw IgniteError in case of error.
+             */
+            template<typename R, typename F>
+            std::vector<R> Broadcast(const F& func)
+            {
+                return impl.Get()->BroadcastAsync<R, F>(func).GetValue();
+            }
+
+            /**
+             * Broadcasts provided ComputeFunc to all nodes in the cluster 
group.
+             *
+             * @tparam F Compute function type. Should implement ComputeFunc<R>
+             *  class.
+             * @param func Compute function to call.
+             * @throw IgniteError in case of error.
+             */
+            template<typename F>
+            void Broadcast(const F& func)
+            {
+                impl.Get()->BroadcastAsync<F, false>(func).GetValue();
+            }
+
+            /**
+             * Asyncronuously broadcasts provided ComputeFunc to all nodes in 
the
+             * cluster group.
+             *
+             * @tparam R Function return type. BinaryType should be specialized
+             *  for the type if it is not primitive.
+             * @tparam F Compute function type. Should implement ComputeFunc<R>
+             *  class.
+             * @param func Compute function to call.
+             * @return Future that can be used to access computation results 
once
+             *  they are ready.
+             * @throw IgniteError in case of error.
+             */
+            template<typename R, typename F>
+            Future< std::vector<R> > BroadcastAsync(const F& func)
+            {
+                return impl.Get()->BroadcastAsync<R, F>(func);
+            }
+
+            /**
+             * Asyncronuously broadcasts provided ComputeFunc to all nodes in 
the
+             * cluster group.
+             *
+             * @tparam F Compute function type. Should implement ComputeFunc<R>
+             *  class.
+             * @param func Compute function to call.
+             * @return Future that can be used to wait for action to complete.
+             * @throw IgniteError in case of error.
+             */
+            template<typename F>
+            Future<void> BroadcastAsync(const F& func)
+            {
+                return impl.Get()->BroadcastAsync<F, false>(func);
+            }
+
         private:
             /** Implementation. */
             common::concurrent::SharedPointer<impl::compute::ComputeImpl> impl;

http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
----------------------------------------------------------------------
diff --git 
a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h 
b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
index 63f9a46..4ba1c1c 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h
@@ -26,11 +26,10 @@
 #include <ignite/common/common.h>
 #include <ignite/common/promise.h>
 #include <ignite/impl/interop/interop_target.h>
-#include <ignite/impl/compute/compute_task_holder.h>
+#include <ignite/impl/compute/single_job_compute_task_holder.h>
+#include <ignite/impl/compute/multiple_job_compute_task_holder.h>
 #include <ignite/impl/compute/cancelable_impl.h>
 
-#include <ignite/ignite_error.h>
-
 namespace ignite
 {
     namespace impl
@@ -50,7 +49,9 @@ namespace ignite
                 {
                     enum Type
                     {
-                        Unicast = 5
+                        BROADCAST = 2,
+
+                        UNICAST = 5,
                     };
                 };
 
@@ -66,41 +67,113 @@ namespace ignite
                  * Asyncronuously calls provided ComputeFunc on a node within
                  * the underlying cluster group.
                  *
-                 * @tparam F Compute function type. Should implement 
ComputeFunc
-                 *  class.
-                 * @tparam R Call return type. BinaryType should be 
specialized for
-                 *  the type if it is not primitive. Should not be void. For
+                 * @tparam F Compute function type. Should implement
+                 *  ComputeFunc<R> class.
+                 * @tparam R Call return type. BinaryType should be specialized
+                 *  for the type if it is not primitive. Should not be void. 
For
                  *  non-returning methods see Compute::Run().
                  * @param func Compute function to call.
-                 * @return Future that can be used to acess computation result 
once
-                 *  it's ready.
-                 * @throw IgniteError in case of error.
+                 * @return Future that can be used to acess computation result
+                 *  once it's ready.
                  */
                 template<typename R, typename F>
                 Future<R> CallAsync(const F& func)
                 {
-                    common::concurrent::SharedPointer<interop::InteropMemory> 
mem = GetEnvironment().AllocateMemory();
-                    interop::InteropOutputStream out(mem.Get());
-                    binary::BinaryWriterImpl writer(&out, 
GetEnvironment().GetTypeManager());
+                    typedef ComputeJobHolderImpl<F, R> JobType;
+                    typedef SingleJobComputeTaskHolder<F, R> TaskType;
+
+                    return PerformTask<R, F, JobType, 
TaskType>(Operation::UNICAST, func);
+                }
+
+                /**
+                 * Asyncronuously runs provided ComputeFunc on a node within
+                 * the underlying cluster group.
+                 *
+                 * @tparam F Compute action type. Should implement
+                 *  ComputeFunc<R> class.
+                 * @param action Compute action to call.
+                 * @return Future that can be used to wait for action
+                 *  to complete.
+                 */
+                template<typename F>
+                Future<void> RunAsync(const F& action)
+                {
+                    typedef ComputeJobHolderImpl<F, void> JobType;
+                    typedef SingleJobComputeTaskHolder<F, void> TaskType;
+
+                    return PerformTask<void, F, JobType, 
TaskType>(Operation::UNICAST, action);
+                }
+
+                /**
+                 * Asyncronuously broadcasts provided ComputeFunc to all nodes
+                 * in the underlying cluster group.
+                 *
+                 * @tparam F Compute function type. Should implement
+                 *  ComputeFunc<R> class.
+                 * @tparam R Call return type. BinaryType should be specialized
+                 *  for the type if it is not primitive. Should not be void. 
For
+                 *  non-returning methods see Compute::Run().
+                 * @param func Compute function to call.
+                 * @return Future that can be used to acess computation result
+                 *  once it's ready.
+                 */
+                template<typename R, typename F>
+                Future< std::vector<R> > BroadcastAsync(const F& func)
+                {
+                    typedef ComputeJobHolderImpl<F, R> JobType;
+                    typedef MultipleJobComputeTaskHolder<F, R> TaskType;
+
+                    return PerformTask<std::vector<R>, F, JobType, 
TaskType>(Operation::BROADCAST, func);
+                }
+
+                /**
+                 * Asyncronuously broadcasts provided ComputeFunc to all nodes
+                 * in the underlying cluster group.
+                 *
+                 * @tparam F Compute function type. Should implement
+                 *  ComputeFunc<R> class.
+                 * @param func Compute function to call.
+                 * @return Future that can be used to acess computation result
+                 *  once it's ready.
+                 */
+                template<typename F, bool>
+                Future<void> BroadcastAsync(const F& func)
+                {
+                    typedef ComputeJobHolderImpl<F, void> JobType;
+                    typedef MultipleJobComputeTaskHolder<F, void> TaskType;
+
+                    return PerformTask<void, F, JobType, 
TaskType>(Operation::BROADCAST, func);
+                }
 
-                    common::concurrent::SharedPointer<ComputeJobHolder> 
job(new ComputeJobHolderImpl<F, R>(func));
+            private:
+                /**
+                 * Perform job.
+                 *
+                 * @tparam F Compute function type. Should implement
+                 *  ComputeFunc<R> class.
+                 * @tparam R Call return type. BinaryType should be specialized
+                 *  for the type if it is not primitive.
+                 * @tparam J Job type.
+                 * @tparam T Task type.
+                 *
+                 * @param operation Operation type.
+                 * @param func Function.
+                 * @return Future that can be used to acess computation result
+                 *  once it's ready.
+                 */
+                template<typename R, typename F, typename J, typename T>
+                Future<R> PerformTask(Operation::Type operation, const F& func)
+                {
+                    common::concurrent::SharedPointer<ComputeJobHolder> 
job(new J(func));
 
                     int64_t jobHandle = 
GetEnvironment().GetHandleRegistry().Allocate(job);
 
-                    ComputeTaskHolderImpl<F, R>* taskPtr = new 
ComputeTaskHolderImpl<F, R>(jobHandle);
+                    T* taskPtr = new T(jobHandle);
                     common::concurrent::SharedPointer<ComputeTaskHolder> 
task(taskPtr);
 
                     int64_t taskHandle = 
GetEnvironment().GetHandleRegistry().Allocate(task);
 
-                    writer.WriteInt64(taskHandle);
-                    writer.WriteInt32(1);
-                    writer.WriteInt64(jobHandle);
-                    writer.WriteObject<F>(func);
-
-                    out.Synchronize();
-
-                    jobject target = InStreamOutObject(Operation::Unicast, 
*mem.Get());
-                    std::auto_ptr<common::Cancelable> cancelable(new 
CancelableImpl(GetEnvironmentPointer(), target));
+                    std::auto_ptr<common::Cancelable> cancelable = 
PerformTask(operation, jobHandle, taskHandle, func);
 
                     common::Promise<R>& promise = taskPtr->GetPromise();
                     promise.SetCancelTarget(cancelable);
@@ -109,48 +182,38 @@ namespace ignite
                 }
 
                 /**
-                 * Asyncronuously runs provided ComputeFunc on a node within
-                 * the underlying cluster group.
+                 * Perform job.
                  *
-                 * @tparam F Compute action type. Should implement 
ComputeAction
-                 *  class.
-                 * @param action Compute action to call.
-                 * @return Future that can be used to wait for action to 
complete.
-                 * @throw IgniteError in case of error.
+                 * @tparam F Compute function type. Should implement
+                 *  ComputeFunc<R> class.
+                 *
+                 * @param operation Operation type.
+                 * @param jobHandle Job Handle.
+                 * @param taskHandle Task Handle.
+                 * @param func Function.
+                 * @return Cancelable auto pointer.
                  */
                 template<typename F>
-                Future<void> RunAsync(const F& action)
+                std::auto_ptr<common::Cancelable> PerformTask(Operation::Type 
operation, int64_t jobHandle,
+                    int64_t taskHandle, const F& func)
                 {
                     common::concurrent::SharedPointer<interop::InteropMemory> 
mem = GetEnvironment().AllocateMemory();
                     interop::InteropOutputStream out(mem.Get());
                     binary::BinaryWriterImpl writer(&out, 
GetEnvironment().GetTypeManager());
 
-                    common::concurrent::SharedPointer<ComputeJobHolder> 
job(new ComputeJobHolderImpl<F, void>(action));
-
-                    int64_t jobHandle = 
GetEnvironment().GetHandleRegistry().Allocate(job);
-
-                    ComputeTaskHolderImpl<F, void>* taskPtr = new 
ComputeTaskHolderImpl<F, void>(jobHandle);
-                    common::concurrent::SharedPointer<ComputeTaskHolder> 
task(taskPtr);
-
-                    int64_t taskHandle = 
GetEnvironment().GetHandleRegistry().Allocate(task);
-
                     writer.WriteInt64(taskHandle);
                     writer.WriteInt32(1);
                     writer.WriteInt64(jobHandle);
-                    writer.WriteObject<F>(action);
+                    writer.WriteObject<F>(func);
 
                     out.Synchronize();
 
-                    jobject target = InStreamOutObject(Operation::Unicast, 
*mem.Get());
+                    jobject target = InStreamOutObject(operation, *mem.Get());
                     std::auto_ptr<common::Cancelable> cancelable(new 
CancelableImpl(GetEnvironmentPointer(), target));
 
-                    common::Promise<void>& promise = taskPtr->GetPromise();
-                    promise.SetCancelTarget(cancelable);
-
-                    return promise.GetFuture();
+                    return cancelable;
                 }
 
-            private:
                 IGNITE_NO_COPY_ASSIGNMENT(ComputeImpl);
             };
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
----------------------------------------------------------------------
diff --git 
a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h 
b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
index 0874522..9d3dfea 100644
--- 
a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
+++ 
b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h
@@ -36,6 +36,28 @@ namespace ignite
     {
         namespace compute
         {
+            struct ComputeJobResultPolicy
+            {
+                enum Type
+                {
+                    /**
+                    * Wait for results if any are still expected. If all 
results have been received -
+                    * it will start reducing results.
+                    */
+                    WAIT = 0,
+
+                    /**
+                    * Ignore all not yet received results and start reducing 
results.
+                    */
+                    REDUCE = 1,
+
+                    /**
+                    * Fail-over job to execute on another node.
+                    */
+                    FAILOVER = 2
+                };
+            };
+
             /**
              * Used to hold compute job result.
              */
@@ -65,16 +87,36 @@ namespace ignite
                 }
 
                 /**
+                 * Get result value.
+                 *
+                 * @return Result.
+                 */
+                const ResultType& GetResult() const
+                {
+                    return res;
+                }
+
+                /**
                  * Set error.
                  *
                  * @param error Error to set.
                  */
-                void SetError(const IgniteError error)
+                void SetError(const IgniteError& error)
                 {
                     err = error;
                 }
 
                 /**
+                 * Get error.
+                 *
+                 * @return Error.
+                 */
+                const IgniteError& GetError() const
+                {
+                    return err;
+                }
+
+                /**
                  * Set promise to a state which corresponds to result.
                  *
                  * @param promise Promise, which state to set.
@@ -192,6 +234,16 @@ namespace ignite
                 }
 
                 /**
+                 * Get error.
+                 *
+                 * @return Error.
+                 */
+                const IgniteError& GetError() const
+                {
+                    return err;
+                }
+
+                /**
                  * Set promise to a state which corresponds to result.
                  *
                  * @param promise Promise, which state to set.

http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
----------------------------------------------------------------------
diff --git 
a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h 
b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
index f627f27..66276d1 100644
--- 
a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
+++ 
b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h
@@ -17,17 +17,14 @@
 
 /**
  * @file
- * Declares ignite::impl::compute::ComputeTaskHolder class and
- * ignite::impl::compute::ComputeTaskHolderImpl class template.
+ * Declares ignite::impl::compute::ComputeTaskHolder.
  */
 
-#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL
-#define _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL
+#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_HOLDER
+#define _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_HOLDER
 
 #include <stdint.h>
 
-#include <ignite/common/promise.h>
-#include <ignite/impl/compute/compute_job_result.h>
 #include <ignite/impl/compute/compute_job_holder.h>
 
 namespace ignite
@@ -36,28 +33,6 @@ namespace ignite
     {
         namespace compute
         {
-            struct ComputeJobResultPolicy
-            {
-                enum Type
-                {
-                    /**
-                     * Wait for results if any are still expected. If all 
results have been received -
-                     * it will start reducing results.
-                     */
-                    WAIT = 0,
-
-                    /**
-                     * Ignore all not yet received results and start reducing 
results.
-                     */
-                    REDUCE = 1,
-
-                    /**
-                     * Fail-over job to execute on another node.
-                     */
-                    FAILOVER = 2
-                };
-            };
-
             /**
              * Compute task holder. Internal helper class.
              * Used to handle tasks in general way, without specific types.
@@ -120,179 +95,8 @@ namespace ignite
                 /** Related job handle. */
                 int64_t handle;
             };
-
-            /**
-             * Compute task holder type-specific implementation.
-             */
-            template<typename F, typename R>
-            class ComputeTaskHolderImpl : public ComputeTaskHolder
-            {
-            public:
-                typedef F JobType;
-                typedef R ResultType;
-
-                /**
-                 * Constructor.
-                 *
-                 * @param handle Job handle.
-                 */
-                ComputeTaskHolderImpl(int64_t handle) :
-                    ComputeTaskHolder(handle)
-                {
-                    // No-op.
-                }
-
-                /**
-                 * Destructor.
-                 */
-                virtual ~ComputeTaskHolderImpl()
-                {
-                    // No-op.
-                }
-
-                /**
-                 * Process local job result.
-                 *
-                 * @param job Job.
-                 * @return Policy.
-                 */
-                virtual int32_t JobResultLocal(ComputeJobHolder& job)
-                {
-                    typedef ComputeJobHolderImpl<JobType, ResultType> 
ActualComputeJobHolder;
-
-                    ActualComputeJobHolder& job0 = 
static_cast<ActualComputeJobHolder&>(job);
-
-                    res = job0.GetResult();
-
-                    return ComputeJobResultPolicy::WAIT;
-                }
-
-                /**
-                 * Process remote job result.
-                 *
-                 * @param job Job.
-                 * @param reader Reader for stream with result.
-                 * @return Policy.
-                 */
-                virtual int32_t JobResultRemote(ComputeJobHolder& job, 
binary::BinaryReaderImpl& reader)
-                {
-                    res.Read(reader);
-
-                    return ComputeJobResultPolicy::WAIT;
-                }
-
-                /**
-                 * Reduce results of related jobs.
-                 */
-                virtual void Reduce()
-                {
-                    res.SetPromise(promise);
-                }
-
-                /**
-                 * Get result promise.
-                 *
-                 * @return Reference to result promise.
-                 */
-                common::Promise<ResultType>& GetPromise()
-                {
-                    return promise;
-                }
-
-            private:
-                /** Result. */
-                ComputeJobResult<ResultType> res;
-
-                /** Task result promise. */
-                common::Promise<ResultType> promise;
-            };
-
-            /**
-             * Compute task holder type-specific implementation.
-             */
-            template<typename F>
-            class ComputeTaskHolderImpl<F, void> : public ComputeTaskHolder
-            {
-            public:
-                typedef F JobType;
-
-                /**
-                 * Constructor.
-                 *
-                 * @param handle Job handle.
-                 */
-                ComputeTaskHolderImpl(int64_t handle) :
-                    ComputeTaskHolder(handle)
-                {
-                    // No-op.
-                }
-
-                /**
-                 * Destructor.
-                 */
-                virtual ~ComputeTaskHolderImpl()
-                {
-                    // No-op.
-                }
-
-                /**
-                 * Process local job result.
-                 *
-                 * @param job Job.
-                 * @return Policy.
-                 */
-                virtual int32_t JobResultLocal(ComputeJobHolder& job)
-                {
-                    typedef ComputeJobHolderImpl<JobType, void> 
ActualComputeJobHolder;
-
-                    ActualComputeJobHolder& job0 = 
static_cast<ActualComputeJobHolder&>(job);
-
-                    res = job0.GetResult();
-
-                    return ComputeJobResultPolicy::WAIT;
-                }
-
-                /**
-                 * Process remote job result.
-                 *
-                 * @param job Job.
-                 * @param reader Reader for stream with result.
-                 * @return Policy.
-                 */
-                virtual int32_t JobResultRemote(ComputeJobHolder& job, 
binary::BinaryReaderImpl& reader)
-                {
-                    res.Read(reader);
-
-                    return ComputeJobResultPolicy::WAIT;
-                }
-
-                /**
-                 * Reduce results of related jobs.
-                 */
-                virtual void Reduce()
-                {
-                    res.SetPromise(promise);
-                }
-
-                /**
-                 * Get result promise.
-                 *
-                 * @return Reference to result promise.
-                 */
-                common::Promise<void>& GetPromise()
-                {
-                    return promise;
-                }
-
-            private:
-                /** Result. */
-                ComputeJobResult<void> res;
-
-                /** Task result promise. */
-                common::Promise<void> promise;
-            };
         }
     }
 }
 
-#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL
+#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_TASK_HOLDER

http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h
----------------------------------------------------------------------
diff --git 
a/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h
 
b/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h
new file mode 100644
index 0000000..9fb13f1
--- /dev/null
+++ 
b/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::impl::compute::MultipleJobComputeTaskHolder class template.
+ */
+
+#ifndef _IGNITE_IMPL_COMPUTE_MULTIPLE_JOB_COMPUTE_TASK
+#define _IGNITE_IMPL_COMPUTE_MULTIPLE_JOB_COMPUTE_TASK
+
+#include <stdint.h>
+#include <vector>
+
+#include <ignite/common/promise.h>
+#include <ignite/impl/compute/compute_job_result.h>
+#include <ignite/impl/compute/compute_task_holder.h>
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace compute
+        {
+            /**
+             * Multiple Job Compute task holder type-specific implementation.
+             * Used for broadcast.
+             *
+             * @tparam F Function type.
+             * @tparam R Function result type.
+             */
+            template<typename F, typename R>
+            class MultipleJobComputeTaskHolder : public ComputeTaskHolder
+            {
+            public:
+                typedef F JobType;
+                typedef R ResultType;
+
+                /**
+                 * Constructor.
+                 *
+                 * @param handle Job handle.
+                 */
+                MultipleJobComputeTaskHolder(int64_t handle) :
+                    ComputeTaskHolder(handle),
+                    result(new std::vector<ResultType>()),
+                    error(),
+                    promise()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~MultipleJobComputeTaskHolder()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Process local job result.
+                 *
+                 * @param job Job.
+                 * @return Policy.
+                 */
+                virtual int32_t JobResultLocal(ComputeJobHolder& job)
+                {
+                    typedef ComputeJobHolderImpl<JobType, ResultType> 
ActualComputeJobHolder;
+
+                    ActualComputeJobHolder& job0 = 
static_cast<ActualComputeJobHolder&>(job);
+
+                    ProcessResult(job0.GetResult());
+
+                    return ComputeJobResultPolicy::WAIT;
+                }
+
+                /**
+                 * Process remote job result.
+                 *
+                 * @param job Job.
+                 * @param reader Reader for stream with result.
+                 * @return Policy.
+                 */
+                virtual int32_t JobResultRemote(ComputeJobHolder& job, 
binary::BinaryReaderImpl& reader)
+                {
+                    ComputeJobResult<ResultType> res;
+
+                    res.Read(reader);
+
+                    ProcessResult(res);
+
+                    return ComputeJobResultPolicy::WAIT;
+                }
+
+                /**
+                 * Reduce results of related jobs.
+                 */
+                virtual void Reduce()
+                {
+                    if (error.GetCode() == IgniteError::IGNITE_SUCCESS)
+                        promise.SetValue(result);
+                    else
+                        promise.SetError(error);
+                }
+
+                /**
+                 * Get result promise.
+                 *
+                 * @return Reference to result promise.
+                 */
+                common::Promise< std::vector<ResultType> >& GetPromise()
+                {
+                    return promise;
+                }
+
+            private:
+                /**
+                 * Process result.
+                 *
+                 * @param res Result.
+                 */
+                void ProcessResult(const ComputeJobResult<ResultType>& res)
+                {
+                    const IgniteError& err = res.GetError();
+
+                    if (err.GetCode() == IgniteError::IGNITE_SUCCESS)
+                        result->push_back(res.GetResult());
+                    else
+                        error = err;
+                }
+
+                /** Result. */
+                std::auto_ptr< std::vector<ResultType> > result;
+
+                /** Error. */
+                IgniteError error;
+
+                /** Task result promise. */
+                common::Promise< std::vector<ResultType> > promise;
+            };
+
+            /**
+             * Compute task holder type-specific implementation.
+             */
+            template<typename F>
+            class MultipleJobComputeTaskHolder<F, void> : public 
ComputeTaskHolder
+            {
+            public:
+                typedef F JobType;
+
+                /**
+                 * Constructor.
+                 *
+                 * @param handle Job handle.
+                 */
+                MultipleJobComputeTaskHolder(int64_t handle) :
+                    ComputeTaskHolder(handle)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~MultipleJobComputeTaskHolder()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Process local job result.
+                 *
+                 * @param job Job.
+                 * @return Policy.
+                 */
+                virtual int32_t JobResultLocal(ComputeJobHolder& job)
+                {
+                    typedef ComputeJobHolderImpl<JobType, void> 
ActualComputeJobHolder;
+
+                    ActualComputeJobHolder& job0 = 
static_cast<ActualComputeJobHolder&>(job);
+
+                    ProcessResult(job0.GetResult());
+
+                    return ComputeJobResultPolicy::WAIT;
+                }
+
+                /**
+                 * Process remote job result.
+                 *
+                 * @param job Job.
+                 * @param reader Reader for stream with result.
+                 * @return Policy.
+                 */
+                virtual int32_t JobResultRemote(ComputeJobHolder& job, 
binary::BinaryReaderImpl& reader)
+                {
+                    ComputeJobResult<void> res;
+
+                    res.Read(reader);
+
+                    ProcessResult(res);
+
+                    return ComputeJobResultPolicy::WAIT;
+                }
+
+                /**
+                 * Reduce results of related jobs.
+                 */
+                virtual void Reduce()
+                {
+                    if (error.GetCode() == IgniteError::IGNITE_SUCCESS)
+                        promise.SetValue();
+                    else
+                        promise.SetError(error);
+                }
+
+                /**
+                 * Get result promise.
+                 *
+                 * @return Reference to result promise.
+                 */
+                common::Promise<void>& GetPromise()
+                {
+                    return promise;
+                }
+
+            private:
+                /**
+                 * Process result.
+                 *
+                 * @param res Result.
+                 */
+                void ProcessResult(const ComputeJobResult<void>& res)
+                {
+                    const IgniteError& err = res.GetError();
+
+                    if (err.GetCode() != IgniteError::IGNITE_SUCCESS)
+                        error = err;
+                }
+
+                /** Error. */
+                IgniteError error;
+
+                /** Task result promise. */
+                common::Promise<void> promise;
+            };
+        }
+    }
+}
+
+#endif //_IGNITE_IMPL_COMPUTE_MULTIPLE_JOB_COMPUTE_TASK

http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h
----------------------------------------------------------------------
diff --git 
a/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h
 
b/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h
new file mode 100644
index 0000000..9b0506a
--- /dev/null
+++ 
b/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::impl::compute::SingleJobComputeTaskHolder class template.
+ */
+
+#ifndef _IGNITE_IMPL_COMPUTE_SINGLE_JOB_COMPUTE_TASK
+#define _IGNITE_IMPL_COMPUTE_SINGLE_JOB_COMPUTE_TASK
+
+#include <stdint.h>
+
+#include <ignite/common/promise.h>
+#include <ignite/impl/compute/compute_job_result.h>
+#include <ignite/impl/compute/compute_task_holder.h>
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace compute
+        {
+            /**
+             * Compute task holder type-specific implementation.
+             */
+            template<typename F, typename R>
+            class SingleJobComputeTaskHolder : public ComputeTaskHolder
+            {
+            public:
+                typedef F JobType;
+                typedef R ResultType;
+
+                /**
+                 * Constructor.
+                 *
+                 * @param handle Job handle.
+                 */
+                SingleJobComputeTaskHolder(int64_t handle) :
+                    ComputeTaskHolder(handle)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~SingleJobComputeTaskHolder()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Process local job result.
+                 *
+                 * @param job Job.
+                 * @return Policy.
+                 */
+                virtual int32_t JobResultLocal(ComputeJobHolder& job)
+                {
+                    typedef ComputeJobHolderImpl<JobType, ResultType> 
ActualComputeJobHolder;
+
+                    ActualComputeJobHolder& job0 = 
static_cast<ActualComputeJobHolder&>(job);
+
+                    res = job0.GetResult();
+
+                    return ComputeJobResultPolicy::WAIT;
+                }
+
+                /**
+                 * Process remote job result.
+                 *
+                 * @param job Job.
+                 * @param reader Reader for stream with result.
+                 * @return Policy.
+                 */
+                virtual int32_t JobResultRemote(ComputeJobHolder& job, 
binary::BinaryReaderImpl& reader)
+                {
+                    res.Read(reader);
+
+                    return ComputeJobResultPolicy::WAIT;
+                }
+
+                /**
+                 * Reduce results of related jobs.
+                 */
+                virtual void Reduce()
+                {
+                    res.SetPromise(promise);
+                }
+
+                /**
+                 * Get result promise.
+                 *
+                 * @return Reference to result promise.
+                 */
+                common::Promise<ResultType>& GetPromise()
+                {
+                    return promise;
+                }
+
+            private:
+                /** Result. */
+                ComputeJobResult<ResultType> res;
+
+                /** Task result promise. */
+                common::Promise<ResultType> promise;
+            };
+
+            /**
+             * Compute task holder type-specific implementation.
+             */
+            template<typename F>
+            class SingleJobComputeTaskHolder<F, void> : public 
ComputeTaskHolder
+            {
+            public:
+                typedef F JobType;
+
+                /**
+                 * Constructor.
+                 *
+                 * @param handle Job handle.
+                 */
+                SingleJobComputeTaskHolder(int64_t handle) :
+                    ComputeTaskHolder(handle)
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~SingleJobComputeTaskHolder()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Process local job result.
+                 *
+                 * @param job Job.
+                 * @return Policy.
+                 */
+                virtual int32_t JobResultLocal(ComputeJobHolder& job)
+                {
+                    typedef ComputeJobHolderImpl<JobType, void> 
ActualComputeJobHolder;
+
+                    ActualComputeJobHolder& job0 = 
static_cast<ActualComputeJobHolder&>(job);
+
+                    res = job0.GetResult();
+
+                    return ComputeJobResultPolicy::WAIT;
+                }
+
+                /**
+                 * Process remote job result.
+                 *
+                 * @param job Job.
+                 * @param reader Reader for stream with result.
+                 * @return Policy.
+                 */
+                virtual int32_t JobResultRemote(ComputeJobHolder& job, 
binary::BinaryReaderImpl& reader)
+                {
+                    res.Read(reader);
+
+                    return ComputeJobResultPolicy::WAIT;
+                }
+
+                /**
+                 * Reduce results of related jobs.
+                 */
+                virtual void Reduce()
+                {
+                    res.SetPromise(promise);
+                }
+
+                /**
+                 * Get result promise.
+                 *
+                 * @return Reference to result promise.
+                 */
+                common::Promise<void>& GetPromise()
+                {
+                    return promise;
+                }
+
+            private:
+                /** Result. */
+                ComputeJobResult<void> res;
+
+                /** Task result promise. */
+                common::Promise<void> promise;
+            };
+        }
+    }
+}
+
+#endif //_IGNITE_IMPL_COMPUTE_SINGLE_JOB_COMPUTE_TASK

http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/project/vs/core.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj 
b/modules/platforms/cpp/core/project/vs/core.vcxproj
index 9911ffe..3c3489c 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj
@@ -232,6 +232,8 @@
     <ClInclude 
Include="..\..\include\ignite\impl\compute\compute_job_holder.h" />
     <ClInclude 
Include="..\..\include\ignite\impl\compute\compute_job_result.h" />
     <ClInclude 
Include="..\..\include\ignite\impl\compute\compute_task_holder.h" />
+    <ClInclude 
Include="..\..\include\ignite\impl\compute\multiple_job_compute_task_holder.h" 
/>
+    <ClInclude 
Include="..\..\include\ignite\impl\compute\single_job_compute_task_holder.h" />
     <ClInclude Include="..\..\include\ignite\impl\helpers.h" />
     <ClInclude Include="..\..\include\ignite\impl\ignite_environment.h" />
     <ClInclude Include="..\..\include\ignite\impl\ignite_impl.h" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters 
b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
index 7b84494..27f3944 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
@@ -237,6 +237,12 @@
     <ClInclude 
Include="..\..\include\ignite\impl\compute\compute_task_holder.h">
       <Filter>Code\impl\compute</Filter>
     </ClInclude>
+    <ClInclude 
Include="..\..\include\ignite\impl\compute\single_job_compute_task_holder.h">
+      <Filter>Code\impl\compute</Filter>
+    </ClInclude>
+    <ClInclude 
Include="..\..\include\ignite\impl\compute\multiple_job_compute_task_holder.h">
+      <Filter>Code\impl\compute</Filter>
+    </ClInclude>
   </ItemGroup>
   <ItemGroup>
     <Filter Include="Code">

Reply via email to