IGNITE-5582: Implemented Compute::Broadcast for C++ (cherry picked from commit fa974286e8f066a8d6aa57519edf5ec7761be095)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3c887378 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3c887378 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3c887378 Branch: refs/heads/master Commit: 3c887378eb64d2d236073410070082e5699e8334 Parents: 99713fe Author: Igor Sapego <isap...@gridgain.com> Authored: Fri Jul 7 16:52:31 2017 +0300 Committer: Igor Sapego <isap...@gridgain.com> Committed: Fri Jul 7 16:52:31 2017 +0300 ---------------------------------------------------------------------- .../cpp/core-test/src/compute_test.cpp | 91 ++++++- modules/platforms/cpp/core/include/Makefile.am | 2 + .../cpp/core/include/ignite/compute/compute.h | 66 +++++ .../include/ignite/impl/compute/compute_impl.h | 161 +++++++---- .../ignite/impl/compute/compute_job_result.h | 54 +++- .../ignite/impl/compute/compute_task_holder.h | 204 +------------- .../compute/multiple_job_compute_task_holder.h | 265 +++++++++++++++++++ .../compute/single_job_compute_task_holder.h | 212 +++++++++++++++ .../platforms/cpp/core/project/vs/core.vcxproj | 2 + .../cpp/core/project/vs/core.vcxproj.filters | 6 + 10 files changed, 811 insertions(+), 252 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core-test/src/compute_test.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core-test/src/compute_test.cpp b/modules/platforms/cpp/core-test/src/compute_test.cpp index 8c57ef1..1fd7670 100644 --- a/modules/platforms/cpp/core-test/src/compute_test.cpp +++ b/modules/platforms/cpp/core-test/src/compute_test.cpp @@ -476,7 +476,7 @@ BOOST_AUTO_TEST_CASE(IgniteRunAsyncLocalError) BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError); } -BOOST_AUTO_TEST_CASE(IgniteRunTestRemote) +BOOST_AUTO_TEST_CASE(IgniteRunRemote) { Ignite node2 = MakeNode("ComputeNode2"); Compute compute = node.GetCompute(); @@ -489,7 +489,7 @@ BOOST_AUTO_TEST_CASE(IgniteRunTestRemote) BOOST_CHECK_EQUAL(Func3::res, "42.24"); } -BOOST_AUTO_TEST_CASE(IgniteRunTestRemoteError) +BOOST_AUTO_TEST_CASE(IgniteRunRemoteError) { Ignite node2 = MakeNode("ComputeNode2"); Compute compute = node.GetCompute(); @@ -509,5 +509,92 @@ BOOST_AUTO_TEST_CASE(IgniteRunTestRemoteError) BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError); } +BOOST_AUTO_TEST_CASE(IgniteBroadcastLocalSync) +{ + Compute compute = node.GetCompute(); + + BOOST_CHECKPOINT("Broadcasting");; + std::vector<std::string> res = compute.Broadcast<std::string>(Func2(8, 5)); + + BOOST_CHECK_EQUAL(res.size(), 1); + BOOST_CHECK_EQUAL(res[0], "8.5"); +} + +BOOST_AUTO_TEST_CASE(IgniteBroadcastLocalAsync) +{ + Compute compute = node.GetCompute(); + + BOOST_CHECKPOINT("Broadcasting");; + Future< std::vector<std::string> > res = compute.BroadcastAsync<std::string>(Func2(312, 245)); + + BOOST_CHECK(!res.IsReady()); + + BOOST_CHECKPOINT("Waiting with timeout"); + res.WaitFor(100); + + BOOST_CHECK(!res.IsReady()); + + std::vector<std::string> value = res.GetValue(); + + BOOST_CHECK_EQUAL(value.size(), 1); + BOOST_CHECK_EQUAL(value[0], "312.245"); +} + +BOOST_AUTO_TEST_CASE(IgniteBroadcastSyncLocalError) +{ + Compute compute = node.GetCompute(); + + BOOST_CHECKPOINT("Broadcasting"); + + BOOST_CHECK_EXCEPTION(compute.Broadcast(Func2(MakeTestError())), IgniteError, IsTestError); +} + +BOOST_AUTO_TEST_CASE(IgniteBroadcastAsyncLocalError) +{ + Compute compute = node.GetCompute(); + + BOOST_CHECKPOINT("Broadcasting"); + Future<void> res = compute.BroadcastAsync(Func2(MakeTestError())); + + BOOST_CHECK(!res.IsReady()); + + BOOST_CHECKPOINT("Waiting with timeout"); + res.WaitFor(100); + + BOOST_CHECK(!res.IsReady()); + + BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError); +} + +BOOST_AUTO_TEST_CASE(IgniteBroadcastRemote) +{ + Ignite node2 = MakeNode("ComputeNode2"); + Compute compute = node.GetCompute(); + + BOOST_CHECKPOINT("Broadcasting"); + std::vector<std::string> res = compute.Broadcast<std::string>(Func2(8, 5)); + + BOOST_CHECK_EQUAL(res.size(), 2); + BOOST_CHECK_EQUAL(res[0], "8.5"); + BOOST_CHECK_EQUAL(res[1], "8.5"); +} + +BOOST_AUTO_TEST_CASE(IgniteBroadcastRemoteError) +{ + Ignite node2 = MakeNode("ComputeNode2"); + Compute compute = node.GetCompute(); + + BOOST_CHECKPOINT("Broadcasting"); + Future< std::vector<std::string> > res = compute.BroadcastAsync<std::string>(Func2(MakeTestError())); + + BOOST_CHECK(!res.IsReady()); + + BOOST_CHECKPOINT("Waiting with timeout"); + res.WaitFor(100); + + BOOST_CHECK(!res.IsReady()); + + BOOST_CHECK_EXCEPTION(res.GetValue(), IgniteError, IsTestError); +} BOOST_AUTO_TEST_SUITE_END() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/Makefile.am ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/Makefile.am b/modules/platforms/cpp/core/include/Makefile.am index 50772cb..1e9369f 100644 --- a/modules/platforms/cpp/core/include/Makefile.am +++ b/modules/platforms/cpp/core/include/Makefile.am @@ -61,6 +61,8 @@ nobase_include_HEADERS = \ ignite/impl/compute/compute_job_holder.h \ ignite/impl/compute/compute_job_result.h \ ignite/impl/compute/compute_task_holder.h \ + ignite/impl/compute/single_job_compute_task_holder.h \ + ignite/impl/compute/multiple_job_compute_task_holder.h \ ignite/impl/handle_registry.h \ ignite/impl/ignite_binding_impl.h \ ignite/impl/ignite_environment.h \ http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/compute/compute.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/compute/compute.h b/modules/platforms/cpp/core/include/ignite/compute/compute.h index 75c8c85..9b4c9b9 100644 --- a/modules/platforms/cpp/core/include/ignite/compute/compute.h +++ b/modules/platforms/cpp/core/include/ignite/compute/compute.h @@ -157,6 +157,72 @@ namespace ignite return impl.Get()->RunAsync<F>(action); } + /** + * Broadcasts provided ComputeFunc to all nodes in the cluster group. + * + * @tparam R Function return type. BinaryType should be specialized + * for the type if it is not primitive. + * @tparam F Compute function type. Should implement ComputeFunc<R> + * class. + * @param func Compute function to call. + * @return Vector containing computation results. + * @throw IgniteError in case of error. + */ + template<typename R, typename F> + std::vector<R> Broadcast(const F& func) + { + return impl.Get()->BroadcastAsync<R, F>(func).GetValue(); + } + + /** + * Broadcasts provided ComputeFunc to all nodes in the cluster group. + * + * @tparam F Compute function type. Should implement ComputeFunc<R> + * class. + * @param func Compute function to call. + * @throw IgniteError in case of error. + */ + template<typename F> + void Broadcast(const F& func) + { + impl.Get()->BroadcastAsync<F, false>(func).GetValue(); + } + + /** + * Asyncronuously broadcasts provided ComputeFunc to all nodes in the + * cluster group. + * + * @tparam R Function return type. BinaryType should be specialized + * for the type if it is not primitive. + * @tparam F Compute function type. Should implement ComputeFunc<R> + * class. + * @param func Compute function to call. + * @return Future that can be used to access computation results once + * they are ready. + * @throw IgniteError in case of error. + */ + template<typename R, typename F> + Future< std::vector<R> > BroadcastAsync(const F& func) + { + return impl.Get()->BroadcastAsync<R, F>(func); + } + + /** + * Asyncronuously broadcasts provided ComputeFunc to all nodes in the + * cluster group. + * + * @tparam F Compute function type. Should implement ComputeFunc<R> + * class. + * @param func Compute function to call. + * @return Future that can be used to wait for action to complete. + * @throw IgniteError in case of error. + */ + template<typename F> + Future<void> BroadcastAsync(const F& func) + { + return impl.Get()->BroadcastAsync<F, false>(func); + } + private: /** Implementation. */ common::concurrent::SharedPointer<impl::compute::ComputeImpl> impl; http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h index 63f9a46..4ba1c1c 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_impl.h @@ -26,11 +26,10 @@ #include <ignite/common/common.h> #include <ignite/common/promise.h> #include <ignite/impl/interop/interop_target.h> -#include <ignite/impl/compute/compute_task_holder.h> +#include <ignite/impl/compute/single_job_compute_task_holder.h> +#include <ignite/impl/compute/multiple_job_compute_task_holder.h> #include <ignite/impl/compute/cancelable_impl.h> -#include <ignite/ignite_error.h> - namespace ignite { namespace impl @@ -50,7 +49,9 @@ namespace ignite { enum Type { - Unicast = 5 + BROADCAST = 2, + + UNICAST = 5, }; }; @@ -66,41 +67,113 @@ namespace ignite * Asyncronuously calls provided ComputeFunc on a node within * the underlying cluster group. * - * @tparam F Compute function type. Should implement ComputeFunc - * class. - * @tparam R Call return type. BinaryType should be specialized for - * the type if it is not primitive. Should not be void. For + * @tparam F Compute function type. Should implement + * ComputeFunc<R> class. + * @tparam R Call return type. BinaryType should be specialized + * for the type if it is not primitive. Should not be void. For * non-returning methods see Compute::Run(). * @param func Compute function to call. - * @return Future that can be used to acess computation result once - * it's ready. - * @throw IgniteError in case of error. + * @return Future that can be used to acess computation result + * once it's ready. */ template<typename R, typename F> Future<R> CallAsync(const F& func) { - common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory(); - interop::InteropOutputStream out(mem.Get()); - binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager()); + typedef ComputeJobHolderImpl<F, R> JobType; + typedef SingleJobComputeTaskHolder<F, R> TaskType; + + return PerformTask<R, F, JobType, TaskType>(Operation::UNICAST, func); + } + + /** + * Asyncronuously runs provided ComputeFunc on a node within + * the underlying cluster group. + * + * @tparam F Compute action type. Should implement + * ComputeFunc<R> class. + * @param action Compute action to call. + * @return Future that can be used to wait for action + * to complete. + */ + template<typename F> + Future<void> RunAsync(const F& action) + { + typedef ComputeJobHolderImpl<F, void> JobType; + typedef SingleJobComputeTaskHolder<F, void> TaskType; + + return PerformTask<void, F, JobType, TaskType>(Operation::UNICAST, action); + } + + /** + * Asyncronuously broadcasts provided ComputeFunc to all nodes + * in the underlying cluster group. + * + * @tparam F Compute function type. Should implement + * ComputeFunc<R> class. + * @tparam R Call return type. BinaryType should be specialized + * for the type if it is not primitive. Should not be void. For + * non-returning methods see Compute::Run(). + * @param func Compute function to call. + * @return Future that can be used to acess computation result + * once it's ready. + */ + template<typename R, typename F> + Future< std::vector<R> > BroadcastAsync(const F& func) + { + typedef ComputeJobHolderImpl<F, R> JobType; + typedef MultipleJobComputeTaskHolder<F, R> TaskType; + + return PerformTask<std::vector<R>, F, JobType, TaskType>(Operation::BROADCAST, func); + } + + /** + * Asyncronuously broadcasts provided ComputeFunc to all nodes + * in the underlying cluster group. + * + * @tparam F Compute function type. Should implement + * ComputeFunc<R> class. + * @param func Compute function to call. + * @return Future that can be used to acess computation result + * once it's ready. + */ + template<typename F, bool> + Future<void> BroadcastAsync(const F& func) + { + typedef ComputeJobHolderImpl<F, void> JobType; + typedef MultipleJobComputeTaskHolder<F, void> TaskType; + + return PerformTask<void, F, JobType, TaskType>(Operation::BROADCAST, func); + } - common::concurrent::SharedPointer<ComputeJobHolder> job(new ComputeJobHolderImpl<F, R>(func)); + private: + /** + * Perform job. + * + * @tparam F Compute function type. Should implement + * ComputeFunc<R> class. + * @tparam R Call return type. BinaryType should be specialized + * for the type if it is not primitive. + * @tparam J Job type. + * @tparam T Task type. + * + * @param operation Operation type. + * @param func Function. + * @return Future that can be used to acess computation result + * once it's ready. + */ + template<typename R, typename F, typename J, typename T> + Future<R> PerformTask(Operation::Type operation, const F& func) + { + common::concurrent::SharedPointer<ComputeJobHolder> job(new J(func)); int64_t jobHandle = GetEnvironment().GetHandleRegistry().Allocate(job); - ComputeTaskHolderImpl<F, R>* taskPtr = new ComputeTaskHolderImpl<F, R>(jobHandle); + T* taskPtr = new T(jobHandle); common::concurrent::SharedPointer<ComputeTaskHolder> task(taskPtr); int64_t taskHandle = GetEnvironment().GetHandleRegistry().Allocate(task); - writer.WriteInt64(taskHandle); - writer.WriteInt32(1); - writer.WriteInt64(jobHandle); - writer.WriteObject<F>(func); - - out.Synchronize(); - - jobject target = InStreamOutObject(Operation::Unicast, *mem.Get()); - std::auto_ptr<common::Cancelable> cancelable(new CancelableImpl(GetEnvironmentPointer(), target)); + std::auto_ptr<common::Cancelable> cancelable = PerformTask(operation, jobHandle, taskHandle, func); common::Promise<R>& promise = taskPtr->GetPromise(); promise.SetCancelTarget(cancelable); @@ -109,48 +182,38 @@ namespace ignite } /** - * Asyncronuously runs provided ComputeFunc on a node within - * the underlying cluster group. + * Perform job. * - * @tparam F Compute action type. Should implement ComputeAction - * class. - * @param action Compute action to call. - * @return Future that can be used to wait for action to complete. - * @throw IgniteError in case of error. + * @tparam F Compute function type. Should implement + * ComputeFunc<R> class. + * + * @param operation Operation type. + * @param jobHandle Job Handle. + * @param taskHandle Task Handle. + * @param func Function. + * @return Cancelable auto pointer. */ template<typename F> - Future<void> RunAsync(const F& action) + std::auto_ptr<common::Cancelable> PerformTask(Operation::Type operation, int64_t jobHandle, + int64_t taskHandle, const F& func) { common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory(); interop::InteropOutputStream out(mem.Get()); binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager()); - common::concurrent::SharedPointer<ComputeJobHolder> job(new ComputeJobHolderImpl<F, void>(action)); - - int64_t jobHandle = GetEnvironment().GetHandleRegistry().Allocate(job); - - ComputeTaskHolderImpl<F, void>* taskPtr = new ComputeTaskHolderImpl<F, void>(jobHandle); - common::concurrent::SharedPointer<ComputeTaskHolder> task(taskPtr); - - int64_t taskHandle = GetEnvironment().GetHandleRegistry().Allocate(task); - writer.WriteInt64(taskHandle); writer.WriteInt32(1); writer.WriteInt64(jobHandle); - writer.WriteObject<F>(action); + writer.WriteObject<F>(func); out.Synchronize(); - jobject target = InStreamOutObject(Operation::Unicast, *mem.Get()); + jobject target = InStreamOutObject(operation, *mem.Get()); std::auto_ptr<common::Cancelable> cancelable(new CancelableImpl(GetEnvironmentPointer(), target)); - common::Promise<void>& promise = taskPtr->GetPromise(); - promise.SetCancelTarget(cancelable); - - return promise.GetFuture(); + return cancelable; } - private: IGNITE_NO_COPY_ASSIGNMENT(ComputeImpl); }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h index 0874522..9d3dfea 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_job_result.h @@ -36,6 +36,28 @@ namespace ignite { namespace compute { + struct ComputeJobResultPolicy + { + enum Type + { + /** + * Wait for results if any are still expected. If all results have been received - + * it will start reducing results. + */ + WAIT = 0, + + /** + * Ignore all not yet received results and start reducing results. + */ + REDUCE = 1, + + /** + * Fail-over job to execute on another node. + */ + FAILOVER = 2 + }; + }; + /** * Used to hold compute job result. */ @@ -65,16 +87,36 @@ namespace ignite } /** + * Get result value. + * + * @return Result. + */ + const ResultType& GetResult() const + { + return res; + } + + /** * Set error. * * @param error Error to set. */ - void SetError(const IgniteError error) + void SetError(const IgniteError& error) { err = error; } /** + * Get error. + * + * @return Error. + */ + const IgniteError& GetError() const + { + return err; + } + + /** * Set promise to a state which corresponds to result. * * @param promise Promise, which state to set. @@ -192,6 +234,16 @@ namespace ignite } /** + * Get error. + * + * @return Error. + */ + const IgniteError& GetError() const + { + return err; + } + + /** * Set promise to a state which corresponds to result. * * @param promise Promise, which state to set. http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h index f627f27..66276d1 100644 --- a/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/compute_task_holder.h @@ -17,17 +17,14 @@ /** * @file - * Declares ignite::impl::compute::ComputeTaskHolder class and - * ignite::impl::compute::ComputeTaskHolderImpl class template. + * Declares ignite::impl::compute::ComputeTaskHolder. */ -#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL -#define _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL +#ifndef _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_HOLDER +#define _IGNITE_IMPL_COMPUTE_COMPUTE_TASK_HOLDER #include <stdint.h> -#include <ignite/common/promise.h> -#include <ignite/impl/compute/compute_job_result.h> #include <ignite/impl/compute/compute_job_holder.h> namespace ignite @@ -36,28 +33,6 @@ namespace ignite { namespace compute { - struct ComputeJobResultPolicy - { - enum Type - { - /** - * Wait for results if any are still expected. If all results have been received - - * it will start reducing results. - */ - WAIT = 0, - - /** - * Ignore all not yet received results and start reducing results. - */ - REDUCE = 1, - - /** - * Fail-over job to execute on another node. - */ - FAILOVER = 2 - }; - }; - /** * Compute task holder. Internal helper class. * Used to handle tasks in general way, without specific types. @@ -120,179 +95,8 @@ namespace ignite /** Related job handle. */ int64_t handle; }; - - /** - * Compute task holder type-specific implementation. - */ - template<typename F, typename R> - class ComputeTaskHolderImpl : public ComputeTaskHolder - { - public: - typedef F JobType; - typedef R ResultType; - - /** - * Constructor. - * - * @param handle Job handle. - */ - ComputeTaskHolderImpl(int64_t handle) : - ComputeTaskHolder(handle) - { - // No-op. - } - - /** - * Destructor. - */ - virtual ~ComputeTaskHolderImpl() - { - // No-op. - } - - /** - * Process local job result. - * - * @param job Job. - * @return Policy. - */ - virtual int32_t JobResultLocal(ComputeJobHolder& job) - { - typedef ComputeJobHolderImpl<JobType, ResultType> ActualComputeJobHolder; - - ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job); - - res = job0.GetResult(); - - return ComputeJobResultPolicy::WAIT; - } - - /** - * Process remote job result. - * - * @param job Job. - * @param reader Reader for stream with result. - * @return Policy. - */ - virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader) - { - res.Read(reader); - - return ComputeJobResultPolicy::WAIT; - } - - /** - * Reduce results of related jobs. - */ - virtual void Reduce() - { - res.SetPromise(promise); - } - - /** - * Get result promise. - * - * @return Reference to result promise. - */ - common::Promise<ResultType>& GetPromise() - { - return promise; - } - - private: - /** Result. */ - ComputeJobResult<ResultType> res; - - /** Task result promise. */ - common::Promise<ResultType> promise; - }; - - /** - * Compute task holder type-specific implementation. - */ - template<typename F> - class ComputeTaskHolderImpl<F, void> : public ComputeTaskHolder - { - public: - typedef F JobType; - - /** - * Constructor. - * - * @param handle Job handle. - */ - ComputeTaskHolderImpl(int64_t handle) : - ComputeTaskHolder(handle) - { - // No-op. - } - - /** - * Destructor. - */ - virtual ~ComputeTaskHolderImpl() - { - // No-op. - } - - /** - * Process local job result. - * - * @param job Job. - * @return Policy. - */ - virtual int32_t JobResultLocal(ComputeJobHolder& job) - { - typedef ComputeJobHolderImpl<JobType, void> ActualComputeJobHolder; - - ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job); - - res = job0.GetResult(); - - return ComputeJobResultPolicy::WAIT; - } - - /** - * Process remote job result. - * - * @param job Job. - * @param reader Reader for stream with result. - * @return Policy. - */ - virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader) - { - res.Read(reader); - - return ComputeJobResultPolicy::WAIT; - } - - /** - * Reduce results of related jobs. - */ - virtual void Reduce() - { - res.SetPromise(promise); - } - - /** - * Get result promise. - * - * @return Reference to result promise. - */ - common::Promise<void>& GetPromise() - { - return promise; - } - - private: - /** Result. */ - ComputeJobResult<void> res; - - /** Task result promise. */ - common::Promise<void> promise; - }; } } } -#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_TASK_IMPL +#endif //_IGNITE_IMPL_COMPUTE_COMPUTE_TASK_HOLDER http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h new file mode 100644 index 0000000..9fb13f1 --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/multiple_job_compute_task_holder.h @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::impl::compute::MultipleJobComputeTaskHolder class template. + */ + +#ifndef _IGNITE_IMPL_COMPUTE_MULTIPLE_JOB_COMPUTE_TASK +#define _IGNITE_IMPL_COMPUTE_MULTIPLE_JOB_COMPUTE_TASK + +#include <stdint.h> +#include <vector> + +#include <ignite/common/promise.h> +#include <ignite/impl/compute/compute_job_result.h> +#include <ignite/impl/compute/compute_task_holder.h> + +namespace ignite +{ + namespace impl + { + namespace compute + { + /** + * Multiple Job Compute task holder type-specific implementation. + * Used for broadcast. + * + * @tparam F Function type. + * @tparam R Function result type. + */ + template<typename F, typename R> + class MultipleJobComputeTaskHolder : public ComputeTaskHolder + { + public: + typedef F JobType; + typedef R ResultType; + + /** + * Constructor. + * + * @param handle Job handle. + */ + MultipleJobComputeTaskHolder(int64_t handle) : + ComputeTaskHolder(handle), + result(new std::vector<ResultType>()), + error(), + promise() + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~MultipleJobComputeTaskHolder() + { + // No-op. + } + + /** + * Process local job result. + * + * @param job Job. + * @return Policy. + */ + virtual int32_t JobResultLocal(ComputeJobHolder& job) + { + typedef ComputeJobHolderImpl<JobType, ResultType> ActualComputeJobHolder; + + ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job); + + ProcessResult(job0.GetResult()); + + return ComputeJobResultPolicy::WAIT; + } + + /** + * Process remote job result. + * + * @param job Job. + * @param reader Reader for stream with result. + * @return Policy. + */ + virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader) + { + ComputeJobResult<ResultType> res; + + res.Read(reader); + + ProcessResult(res); + + return ComputeJobResultPolicy::WAIT; + } + + /** + * Reduce results of related jobs. + */ + virtual void Reduce() + { + if (error.GetCode() == IgniteError::IGNITE_SUCCESS) + promise.SetValue(result); + else + promise.SetError(error); + } + + /** + * Get result promise. + * + * @return Reference to result promise. + */ + common::Promise< std::vector<ResultType> >& GetPromise() + { + return promise; + } + + private: + /** + * Process result. + * + * @param res Result. + */ + void ProcessResult(const ComputeJobResult<ResultType>& res) + { + const IgniteError& err = res.GetError(); + + if (err.GetCode() == IgniteError::IGNITE_SUCCESS) + result->push_back(res.GetResult()); + else + error = err; + } + + /** Result. */ + std::auto_ptr< std::vector<ResultType> > result; + + /** Error. */ + IgniteError error; + + /** Task result promise. */ + common::Promise< std::vector<ResultType> > promise; + }; + + /** + * Compute task holder type-specific implementation. + */ + template<typename F> + class MultipleJobComputeTaskHolder<F, void> : public ComputeTaskHolder + { + public: + typedef F JobType; + + /** + * Constructor. + * + * @param handle Job handle. + */ + MultipleJobComputeTaskHolder(int64_t handle) : + ComputeTaskHolder(handle) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~MultipleJobComputeTaskHolder() + { + // No-op. + } + + /** + * Process local job result. + * + * @param job Job. + * @return Policy. + */ + virtual int32_t JobResultLocal(ComputeJobHolder& job) + { + typedef ComputeJobHolderImpl<JobType, void> ActualComputeJobHolder; + + ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job); + + ProcessResult(job0.GetResult()); + + return ComputeJobResultPolicy::WAIT; + } + + /** + * Process remote job result. + * + * @param job Job. + * @param reader Reader for stream with result. + * @return Policy. + */ + virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader) + { + ComputeJobResult<void> res; + + res.Read(reader); + + ProcessResult(res); + + return ComputeJobResultPolicy::WAIT; + } + + /** + * Reduce results of related jobs. + */ + virtual void Reduce() + { + if (error.GetCode() == IgniteError::IGNITE_SUCCESS) + promise.SetValue(); + else + promise.SetError(error); + } + + /** + * Get result promise. + * + * @return Reference to result promise. + */ + common::Promise<void>& GetPromise() + { + return promise; + } + + private: + /** + * Process result. + * + * @param res Result. + */ + void ProcessResult(const ComputeJobResult<void>& res) + { + const IgniteError& err = res.GetError(); + + if (err.GetCode() != IgniteError::IGNITE_SUCCESS) + error = err; + } + + /** Error. */ + IgniteError error; + + /** Task result promise. */ + common::Promise<void> promise; + }; + } + } +} + +#endif //_IGNITE_IMPL_COMPUTE_MULTIPLE_JOB_COMPUTE_TASK http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h b/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h new file mode 100644 index 0000000..9b0506a --- /dev/null +++ b/modules/platforms/cpp/core/include/ignite/impl/compute/single_job_compute_task_holder.h @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::impl::compute::SingleJobComputeTaskHolder class template. + */ + +#ifndef _IGNITE_IMPL_COMPUTE_SINGLE_JOB_COMPUTE_TASK +#define _IGNITE_IMPL_COMPUTE_SINGLE_JOB_COMPUTE_TASK + +#include <stdint.h> + +#include <ignite/common/promise.h> +#include <ignite/impl/compute/compute_job_result.h> +#include <ignite/impl/compute/compute_task_holder.h> + +namespace ignite +{ + namespace impl + { + namespace compute + { + /** + * Compute task holder type-specific implementation. + */ + template<typename F, typename R> + class SingleJobComputeTaskHolder : public ComputeTaskHolder + { + public: + typedef F JobType; + typedef R ResultType; + + /** + * Constructor. + * + * @param handle Job handle. + */ + SingleJobComputeTaskHolder(int64_t handle) : + ComputeTaskHolder(handle) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~SingleJobComputeTaskHolder() + { + // No-op. + } + + /** + * Process local job result. + * + * @param job Job. + * @return Policy. + */ + virtual int32_t JobResultLocal(ComputeJobHolder& job) + { + typedef ComputeJobHolderImpl<JobType, ResultType> ActualComputeJobHolder; + + ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job); + + res = job0.GetResult(); + + return ComputeJobResultPolicy::WAIT; + } + + /** + * Process remote job result. + * + * @param job Job. + * @param reader Reader for stream with result. + * @return Policy. + */ + virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader) + { + res.Read(reader); + + return ComputeJobResultPolicy::WAIT; + } + + /** + * Reduce results of related jobs. + */ + virtual void Reduce() + { + res.SetPromise(promise); + } + + /** + * Get result promise. + * + * @return Reference to result promise. + */ + common::Promise<ResultType>& GetPromise() + { + return promise; + } + + private: + /** Result. */ + ComputeJobResult<ResultType> res; + + /** Task result promise. */ + common::Promise<ResultType> promise; + }; + + /** + * Compute task holder type-specific implementation. + */ + template<typename F> + class SingleJobComputeTaskHolder<F, void> : public ComputeTaskHolder + { + public: + typedef F JobType; + + /** + * Constructor. + * + * @param handle Job handle. + */ + SingleJobComputeTaskHolder(int64_t handle) : + ComputeTaskHolder(handle) + { + // No-op. + } + + /** + * Destructor. + */ + virtual ~SingleJobComputeTaskHolder() + { + // No-op. + } + + /** + * Process local job result. + * + * @param job Job. + * @return Policy. + */ + virtual int32_t JobResultLocal(ComputeJobHolder& job) + { + typedef ComputeJobHolderImpl<JobType, void> ActualComputeJobHolder; + + ActualComputeJobHolder& job0 = static_cast<ActualComputeJobHolder&>(job); + + res = job0.GetResult(); + + return ComputeJobResultPolicy::WAIT; + } + + /** + * Process remote job result. + * + * @param job Job. + * @param reader Reader for stream with result. + * @return Policy. + */ + virtual int32_t JobResultRemote(ComputeJobHolder& job, binary::BinaryReaderImpl& reader) + { + res.Read(reader); + + return ComputeJobResultPolicy::WAIT; + } + + /** + * Reduce results of related jobs. + */ + virtual void Reduce() + { + res.SetPromise(promise); + } + + /** + * Get result promise. + * + * @return Reference to result promise. + */ + common::Promise<void>& GetPromise() + { + return promise; + } + + private: + /** Result. */ + ComputeJobResult<void> res; + + /** Task result promise. */ + common::Promise<void> promise; + }; + } + } +} + +#endif //_IGNITE_IMPL_COMPUTE_SINGLE_JOB_COMPUTE_TASK http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/project/vs/core.vcxproj ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj b/modules/platforms/cpp/core/project/vs/core.vcxproj index 9911ffe..3c3489c 100644 --- a/modules/platforms/cpp/core/project/vs/core.vcxproj +++ b/modules/platforms/cpp/core/project/vs/core.vcxproj @@ -232,6 +232,8 @@ <ClInclude Include="..\..\include\ignite\impl\compute\compute_job_holder.h" /> <ClInclude Include="..\..\include\ignite\impl\compute\compute_job_result.h" /> <ClInclude Include="..\..\include\ignite\impl\compute\compute_task_holder.h" /> + <ClInclude Include="..\..\include\ignite\impl\compute\multiple_job_compute_task_holder.h" /> + <ClInclude Include="..\..\include\ignite\impl\compute\single_job_compute_task_holder.h" /> <ClInclude Include="..\..\include\ignite\impl\helpers.h" /> <ClInclude Include="..\..\include\ignite\impl\ignite_environment.h" /> <ClInclude Include="..\..\include\ignite\impl\ignite_impl.h" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/3c887378/modules/platforms/cpp/core/project/vs/core.vcxproj.filters ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters index 7b84494..27f3944 100644 --- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters +++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters @@ -237,6 +237,12 @@ <ClInclude Include="..\..\include\ignite\impl\compute\compute_task_holder.h"> <Filter>Code\impl\compute</Filter> </ClInclude> + <ClInclude Include="..\..\include\ignite\impl\compute\single_job_compute_task_holder.h"> + <Filter>Code\impl\compute</Filter> + </ClInclude> + <ClInclude Include="..\..\include\ignite\impl\compute\multiple_job_compute_task_holder.h"> + <Filter>Code\impl\compute</Filter> + </ClInclude> </ItemGroup> <ItemGroup> <Filter Include="Code">