IGNITE-1439: Implemented Futures for C++.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f328c4ee Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f328c4ee Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f328c4ee Branch: refs/heads/ignite-5075 Commit: f328c4eeaef59a35bcc7de0ce03b612fe71ea408 Parents: 9ce62e6 Author: Igor Sapego <isap...@gridgain.com> Authored: Mon Apr 24 17:48:27 2017 +0300 Committer: Igor Sapego <isap...@gridgain.com> Committed: Mon Apr 24 17:48:27 2017 +0300 ---------------------------------------------------------------------- .../platforms/cpp/common/include/Makefile.am | 5 +- .../cpp/common/include/ignite/common/promise.h | 201 ++++++++ .../common/include/ignite/common/shared_state.h | 331 +++++++++++++ .../cpp/common/include/ignite/future.h | 236 +++++++++ .../cpp/common/include/ignite/ignite_error.h | 3 + .../linux/include/ignite/common/concurrent_os.h | 234 ++++++++- .../win/include/ignite/common/concurrent_os.h | 169 ++++++- .../common/os/win/src/common/concurrent_os.cpp | 26 +- .../cpp/common/project/vs/common.vcxproj | 3 + .../common/project/vs/common.vcxproj.filters | 9 + modules/platforms/cpp/core-test/Makefile.am | 1 + .../cpp/core-test/project/vs/core-test.vcxproj | 1 + .../project/vs/core-test.vcxproj.filters | 4 + .../cpp/core-test/src/concurrent_test.cpp | 47 ++ .../platforms/cpp/core-test/src/future_test.cpp | 474 +++++++++++++++++++ 15 files changed, 1720 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/common/include/Makefile.am ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/Makefile.am b/modules/platforms/cpp/common/include/Makefile.am index cba68e9..64f0c46 100644 --- a/modules/platforms/cpp/common/include/Makefile.am +++ b/modules/platforms/cpp/common/include/Makefile.am 
@@ -28,12 +28,15 @@ nobase_include_HEADERS = \ ignite/common/fixed_size_array.h \ ignite/common/utils.h \ ignite/common/platform_utils.h \ + ignite/common/shared_state.h \ + ignite/common/promise.h \ ignite/date.h \ ignite/guid.h \ ignite/ignite_error.h \ ignite/timestamp.h \ ignite/time.h \ - ignite/reference.h + ignite/reference.h \ + ignite/future.h uninstall-hook: if [ -d ${includedir}/ignite ]; then find ${includedir}/ignite -type d -empty -delete; fi http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/common/include/ignite/common/promise.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/ignite/common/promise.h b/modules/platforms/cpp/common/include/ignite/common/promise.h new file mode 100644 index 0000000..548b76b --- /dev/null +++ b/modules/platforms/cpp/common/include/ignite/common/promise.h @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::common::Promise class template. 
+ */ + + +#ifndef _IGNITE_PROMISE +#define _IGNITE_PROMISE + +#include <ignite/common/common.h> +#include <ignite/common/shared_state.h> + +#include <ignite/ignite_error.h> +#include <ignite/future.h> + +namespace ignite +{ + namespace common + { + /** + * Promise class template. Used to set result of the asynchronously + * started computation. + * + * @tparam T Promised value type. + */ + template<typename T> + class Promise + { + public: + /** Template value type */ + typedef T ValueType; + + /** + * Constructor. + */ + Promise() : + state(new SharedState<ValueType>()) + { + // No-op. + } + + /** + * Destructor. + */ + ~Promise() + { + SharedState<ValueType>* state0 = state.Get(); + + assert(state0 != 0); + + if (!state0->IsSet()) + state0->SetError(IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE, + "Broken promise. Value will never be set due to internal error.")); + } + + + /** + * Get future for this promise. + * + * @return New future instance. + */ + Future<ValueType> GetFuture() const + { + return Future<ValueType>(state); + } + + /** + * Set value. + * + * @throw IgniteError with IgniteError::IGNITE_ERR_FUTURE_STATE if error or value has been set already. + * @param val Value to set. + */ + void SetValue(std::auto_ptr<ValueType> val) + { + SharedState<ValueType>* state0 = state.Get(); + + assert(state0 != 0); + + return state.Get()->SetValue(val); + } + + /** + * Set error. + * + * @throw IgniteError with IgniteError::IGNITE_ERR_FUTURE_STATE if error or value has been set already. + * @param err Error to set. + */ + void SetError(const IgniteError& err) + { + SharedState<ValueType>* state0 = state.Get(); + + assert(state0 != 0); + + state.Get()->SetError(err); + } + + private: + IGNITE_NO_COPY_ASSIGNMENT(Promise); + + concurrent::SharedPointer< SharedState<ValueType> > state; + }; + + /** + * Specialization for void. + */ + template<> + class Promise<void> + { + public: + /** Template value type */ + typedef void ValueType; + + /** + * Constructor. 
+ */ + Promise() : + state(new SharedState<ValueType>()) + { + // No-op. + } + + /** + * Destructor. + */ + ~Promise() + { + SharedState<ValueType>* state0 = state.Get(); + + assert(state0 != 0); + + if (!state0->IsSet()) + state0->SetError(IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE, + "Broken promise. Value will never be set due to internal error.")); + } + + + /** + * Get future for this promise. + * + * @return New future instance. + */ + Future<ValueType> GetFuture() const + { + return Future<ValueType>(state); + } + + /** + * Mark as complete. + * + * @throw IgniteError with IgniteError::IGNITE_ERR_FUTURE_STATE if error or value has been set already. + */ + void SetValue() + { + SharedState<ValueType>* state0 = state.Get(); + + assert(state0 != 0); + + return state.Get()->SetValue(); + } + + /** + * Set error. + * + * @throw IgniteError with IgniteError::IGNITE_ERR_FUTURE_STATE if error or value has been set already. + * @param err Error to set. + */ + void SetError(const IgniteError& err) + { + SharedState<ValueType>* state0 = state.Get(); + + assert(state0 != 0); + + state.Get()->SetError(err); + } + + private: + IGNITE_NO_COPY_ASSIGNMENT(Promise); + + concurrent::SharedPointer< SharedState<ValueType> > state; + }; + } +} + +#endif //_IGNITE_PROMISE http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/common/include/ignite/common/shared_state.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/ignite/common/shared_state.h b/modules/platforms/cpp/common/include/ignite/common/shared_state.h new file mode 100644 index 0000000..8886532 --- /dev/null +++ b/modules/platforms/cpp/common/include/ignite/common/shared_state.h @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::common::SharedState class template. + */ + +#ifndef _IGNITE_SHARED_STATE +#define _IGNITE_SHARED_STATE + +#include <ignite/common/common.h> +#include <ignite/common/concurrent.h> +#include <ignite/ignite_error.h> + +namespace ignite +{ + namespace common + { + template<typename T> + class SharedState + { + public: + /** Template value type */ + typedef T ValueType; + + /** + * Default constructor. + * Constructs non-set SharedState instance. + */ + SharedState() : + value(), + error() + { + // No-op. + } + + /** + * Destructor. + */ + ~SharedState() + { + // No-op. + } + + /** + * Checks if the value or error set for the state. + * @return True if the value or error set for the state. + */ + bool IsSet() const + { + return value.get() || error.GetCode() != IgniteError::IGNITE_SUCCESS; + } + + /** + * Set value. + * + * @throw IgniteError with IgniteError::IGNITE_ERR_FUTURE_STATE if error or value has been set already. + * @param val Value to set. 
+ */ + void SetValue(std::auto_ptr<ValueType> val) + { + concurrent::CsLockGuard guard(mutex); + + if (IsSet()) + { + if (value.get()) + throw IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE, "Future value already set"); + + if (error.GetCode() != IgniteError::IGNITE_SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE, "Future error already set"); + } + + value = val; + + cond.NotifyAll(); + } + + /** + * Set error. + * + * @throw IgniteError with IgniteError::IGNITE_ERR_FUTURE_STATE if error or value has been set already. + * @param err Error to set. + */ + void SetError(const IgniteError& err) + { + concurrent::CsLockGuard guard(mutex); + + if (IsSet()) + { + if (value.get()) + throw IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE, "Future value already set"); + + if (error.GetCode() != IgniteError::IGNITE_SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE, "Future error already set"); + } + + error = err; + + cond.NotifyAll(); + } + + /** + * Wait for value to be set. + * Active thread will be blocked until value or error will be set. + */ + void Wait() const + { + concurrent::CsLockGuard guard(mutex); + + while (!IsSet()) + cond.Wait(mutex); + } + + /** + * Wait for value to be set for specified time. + * Active thread will be blocked until value or error will be set or timeout will end. + * + * @param msTimeout Timeout in milliseconds. + * @return True if the object has been triggered and false in case of timeout. + */ + bool WaitFor(int32_t msTimeout) const + { + concurrent::CsLockGuard guard(mutex); + + if (IsSet()) + return true; + + return cond.WaitFor(mutex, msTimeout); + } + + /** + * Get the set value. + * Active thread will be blocked until value or error will be set. + * + * @throw IgniteError if error has been set. + * @return Value that has been set on success. 
+ */ + const ValueType& GetValue() const + { + Wait(); + + if (value.get()) + return *value; + + assert(error.GetCode() != IgniteError::IGNITE_SUCCESS); + + throw error; + } + + private: + IGNITE_NO_COPY_ASSIGNMENT(SharedState); + + /** Value. */ + std::auto_ptr<ValueType> value; + + /** Error. */ + IgniteError error; + + /** Condition variable which serves to signal that value is set. */ + mutable concurrent::ConditionVariable cond; + + /** Lock that used to prevent double-set of the value. */ + mutable concurrent::CriticalSection mutex; + }; + + /** + * Specialization for void type. + */ + template<> + class SharedState<void> + { + public: + /** Template value type */ + typedef void ValueType; + + /** + * Default constructor. + * Constructs non-set SharedState instance. + */ + SharedState() : + done(false), + error() + { + // No-op. + } + + /** + * Destructor. + */ + ~SharedState() + { + // No-op. + } + + /** + * Checks if the value or error set for the state. + * @return True if the value or error set for the state. + */ + bool IsSet() const + { + return done || error.GetCode() != IgniteError::IGNITE_SUCCESS; + } + + /** + * Set value. + * + * @throw IgniteError with IgniteError::IGNITE_ERR_FUTURE_STATE if error or value has been set already. + */ + void SetValue() + { + concurrent::CsLockGuard guard(mutex); + + if (IsSet()) + { + if (done) + throw IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE, "Future value already set"); + + if (error.GetCode() != IgniteError::IGNITE_SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE, "Future error already set"); + } + + done = true; + + cond.NotifyAll(); + } + + /** + * Set error. + * + * @throw IgniteError with IgniteError::IGNITE_ERR_FUTURE_STATE if error or value has been set already. + * @param err Error to set. 
+ */ + void SetError(const IgniteError& err) + { + concurrent::CsLockGuard guard(mutex); + + if (IsSet()) + { + if (done) + throw IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE, "Future value already set"); + + if (error.GetCode() != IgniteError::IGNITE_SUCCESS) + throw IgniteError(IgniteError::IGNITE_ERR_FUTURE_STATE, "Future error already set"); + } + + error = err; + + cond.NotifyAll(); + } + + /** + * Wait for value to be set. + * Active thread will be blocked until value or error will be set. + */ + void Wait() const + { + concurrent::CsLockGuard guard(mutex); + + while (!IsSet()) + cond.Wait(mutex); + } + + /** + * Wait for value to be set for specified time. + * Active thread will be blocked until value or error will be set or timeout will end. + * + * @param msTimeout Timeout in milliseconds. + * @return True if the object has been triggered and false in case of timeout. + */ + bool WaitFor(int32_t msTimeout) const + { + concurrent::CsLockGuard guard(mutex); + + if (IsSet()) + return true; + + return cond.WaitFor(mutex, msTimeout); + } + + /** + * Get the set value. + * Active thread will be blocked until value or error will be set. + * + * @throw IgniteError if error has been set. + */ + void GetValue() const + { + Wait(); + + if (done) + return; + + assert(error.GetCode() != IgniteError::IGNITE_SUCCESS); + + throw error; + } + + private: + IGNITE_NO_COPY_ASSIGNMENT(SharedState); + + /** Marker. */ + bool done; + + /** Error. */ + IgniteError error; + + /** Condition variable which serves to signal that value is set. */ + mutable concurrent::ConditionVariable cond; + + /** Lock that used to prevent double-set of the value. 
*/ + mutable concurrent::CriticalSection mutex; + }; + } +} + +#endif //_IGNITE_SHARED_STATE http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/common/include/ignite/future.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/ignite/future.h b/modules/platforms/cpp/common/include/ignite/future.h new file mode 100644 index 0000000..5c42e55 --- /dev/null +++ b/modules/platforms/cpp/common/include/ignite/future.h @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file + * Declares ignite::Future class template. + */ + + +#ifndef _IGNITE_FUTURE +#define _IGNITE_FUTURE + +#include <ignite/common/shared_state.h> +#include <ignite/ignite_error.h> + +namespace ignite +{ + namespace common + { + // Forward declaration + template<typename T> + class Promise; + } + + /** + * Future class template. Used to get result of the asynchronously + * started computation. + * + * @tparam T Future value type. + */ + template<typename T> + class Future + { + friend class common::Promise<T>; + + public: + /** Template value type */ + typedef T ValueType; + + /** + * Copy constructor. 
+ * + * @param src Instance to copy. + */ + Future(const Future<ValueType>& src) : + state(src.state) + { + // No-op. + } + + /** + * Assignment operator. + * + * @param other Other instance. + * @return *this. + */ + Future& operator=(const Future<ValueType>& other) + { + state = other.state; + + return *this; + } + + /** + * Wait for value to be set. + * Active thread will be blocked until value or error will be set. + */ + void Wait() const + { + const common::SharedState<ValueType>* state0 = state.Get(); + + assert(state0 != 0); + + state.Get()->Wait(); + } + + /** + * Wait for value to be set for specified time. + * Active thread will be blocked until value or error will be set or timeout will end. + * + * @param msTimeout Timeout in milliseconds. + * @return True if the object has been triggered and false in case of timeout. + */ + bool WaitFor(int32_t msTimeout) const + { + const common::SharedState<ValueType>* state0 = state.Get(); + + assert(state0 != 0); + + return state.Get()->WaitFor(msTimeout); + } + + /** + * Get the set value. + * Active thread will be blocked until value or error will be set. + * + * @throw IgniteError if error has been set. + * @return Value that has been set on success. + */ + const ValueType& GetValue() const + { + const common::SharedState<ValueType>* state0 = state.Get(); + + assert(state0 != 0); + + return state.Get()->GetValue(); + } + + private: + /** + * Constructor. + * + * @param state0 Shared state instance. + */ + Future(common::concurrent::SharedPointer< common::SharedState<ValueType> > state0) : + state(state0) + { + // No-op. + } + + /** Shared state. */ + common::concurrent::SharedPointer< common::SharedState<ValueType> > state; + }; + + /** + * Specialization for void type. + */ + template<> + class Future<void> + { + friend class common::Promise<void>; + + public: + /** Template value type */ + typedef void ValueType; + + /** + * Copy constructor. + * + * @param src Instance to copy. 
+ */ + Future(const Future<ValueType>& src) : + state(src.state) + { + // No-op. + } + + /** + * Assignment operator. + * + * @param other Other instance. + * @return *this. + */ + Future& operator=(const Future<ValueType>& other) + { + state = other.state; + + return *this; + } + + /** + * Wait for value to be set. + * Active thread will be blocked until value or error will be set. + */ + void Wait() const + { + const common::SharedState<ValueType>* state0 = state.Get(); + + assert(state0 != 0); + + state.Get()->Wait(); + } + + /** + * Wait for value to be set for specified time. + * Active thread will be blocked until value or error will be set or timeout will end. + * + * @param msTimeout Timeout in milliseconds. + * @return True if the object has been triggered and false in case of timeout. + */ + bool WaitFor(int32_t msTimeout) const + { + const common::SharedState<ValueType>* state0 = state.Get(); + + assert(state0 != 0); + + return state.Get()->WaitFor(msTimeout); + } + + /** + * Wait for operation completion or error. + * Active thread will be blocked until value or error will be set. + * + * @throw IgniteError if error has been set. + */ + void GetValue() const + { + const common::SharedState<ValueType>* state0 = state.Get(); + + assert(state0 != 0); + + state.Get()->GetValue(); + } + + private: + /** + * Constructor. + * + * @param state0 Shared state instance. + */ + Future(common::concurrent::SharedPointer< common::SharedState<ValueType> > state0) : + state(state0) + { + // No-op. + } + + /** Shared state. 
*/ + common::concurrent::SharedPointer< common::SharedState<ValueType> > state; + }; +} + +#endif //_IGNITE_FUTURE http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/common/include/ignite/ignite_error.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/ignite/ignite_error.h b/modules/platforms/cpp/common/include/ignite/ignite_error.h index cecaf3f..7818dd2 100644 --- a/modules/platforms/cpp/common/include/ignite/ignite_error.h +++ b/modules/platforms/cpp/common/include/ignite/ignite_error.h @@ -195,6 +195,9 @@ namespace ignite /** Security error. */ static const int IGNITE_ERR_SECURITY = 2023; + + /** Future state error. */ + static const int IGNITE_ERR_FUTURE_STATE = 2024; /** Unknown error. */ static const int IGNITE_ERR_UNKNOWN = -1; http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h b/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h index 27ef998..84bc8e6 100644 --- a/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h +++ b/modules/platforms/cpp/common/os/linux/include/ignite/common/concurrent_os.h @@ -18,11 +18,16 @@ #ifndef _IGNITE_COMMON_CONCURRENT_OS #define _IGNITE_COMMON_CONCURRENT_OS -#include <map> -#include <stdint.h> #include <pthread.h> +#include <time.h> +#include <errno.h> + +#include <stdint.h> -#include "ignite/common/common.h" +#include <cassert> +#include <map> + +#include <ignite/common/common.h> namespace ignite { @@ -44,7 +49,9 @@ namespace ignite /** * Critical section. */ - class IGNITE_IMPORT_EXPORT CriticalSection { + class IGNITE_IMPORT_EXPORT CriticalSection + { + friend class ConditionVariable; public: /** * Constructor. 
@@ -387,6 +394,225 @@ namespace ignite /** Index. */ int32_t idx; }; + + /** + * Cross-platform wrapper for Condition Variable synchronization + * primitive concept. + */ + class ConditionVariable + { + public: + /** + * Constructor. + */ + ConditionVariable() + { + pthread_condattr_t attr; + int err = pthread_condattr_init(&attr); + assert(!err); + + err = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); + assert(!err); + + err = pthread_cond_init(&cond, &attr); + assert(!err); + } + + /** + * Destructor. + */ + ~ConditionVariable() + { + pthread_cond_destroy(&cond); + } + + /** + * Wait for Condition Variable to be notified. + * + * @param cs Critical section in which to wait. + */ + void Wait(CriticalSection& cs) + { + pthread_cond_wait(&cond, &cs.mux); + } + + /** + * Wait for Condition Variable to be notified for specified time. + * + * @param cs Critical section in which to wait. + * @param msTimeout Timeout in milliseconds. + * @return True if the object has been notified and false in case of timeout. + */ + bool WaitFor(CriticalSection& cs, int32_t msTimeout) + { + timespec ts; + int err = clock_gettime(CLOCK_MONOTONIC, &ts); + assert(!err); + + ts.tv_sec += msTimeout / 1000 + (ts.tv_nsec + (msTimeout % 1000) * 1000000) / 1000000000; + ts.tv_nsec = (ts.tv_nsec + (msTimeout % 1000) * 1000000) % 1000000000; + + int res = pthread_cond_timedwait(&cond, &cs.mux, &ts); + + return res == 0; + } + + /** + * Notify single thread waiting for the condition variable. + */ + void NotifyOne() + { + int err = pthread_cond_signal(&cond); + assert(!err); + } + + /** + * Notify all threads that are waiting on the variable. + */ + void NotifyAll() + { + int err = pthread_cond_broadcast(&cond); + assert(!err); + } + + private: + IGNITE_NO_COPY_ASSIGNMENT(ConditionVariable); + + /** OS-specific type. */ + pthread_cond_t cond; + }; + + /** + * Manually triggered event. + * Once triggered it stays in passing state until manually reset. 
+ */ + class ManualEvent + { + public: + /** + * Constructs manual event. + * Initial state is untriggered. + */ + ManualEvent() : + cond(), + mutex(), + state(false) + { + pthread_condattr_t attr; + int err = pthread_condattr_init(&attr); + assert(!err); + + err = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC); + assert(!err); + + err = pthread_cond_init(&cond, &attr); + assert(!err); + + err = pthread_mutex_init(&mutex, NULL); + assert(!err); + } + + /** + * Destructor. + */ + ~ManualEvent() + { + pthread_mutex_destroy(&mutex); + pthread_cond_destroy(&cond); + } + + /** + * Sets event into triggered state. + */ + void Set() + { + int err = pthread_mutex_lock(&mutex); + assert(!err); + + state = true; + + err = pthread_cond_broadcast(&cond); + assert(!err); + + err = pthread_mutex_unlock(&mutex); + assert(!err); + } + + /** + * Resets event into non-triggered state. + */ + void Reset() + { + int err = pthread_mutex_lock(&mutex); + assert(!err); + + state = false; + + err = pthread_mutex_unlock(&mutex); + assert(!err); + } + + /** + * Wait for event to be triggered. + */ + void Wait() + { + int err = pthread_mutex_lock(&mutex); + assert(!err); + + while (!state) + { + err = pthread_cond_wait(&cond, &mutex); + assert(!err); + } + + err = pthread_mutex_unlock(&mutex); + assert(!err); + } + + /** + * Wait for event to be triggered for specified time. + * + * @param msTimeout Timeout in milliseconds. + * @return True if the object has been triggered and false in case of timeout. 
+ */ + bool WaitFor(int32_t msTimeout) + { + int res = 0; + int err = pthread_mutex_lock(&mutex); + assert(!err); + + if (!state) + { + timespec ts; + err = clock_gettime(CLOCK_MONOTONIC, &ts); + assert(!err); + + ts.tv_sec += msTimeout / 1000 + (ts.tv_nsec + (msTimeout % 1000) * 1000000) / 1000000000; + ts.tv_nsec = (ts.tv_nsec + (msTimeout % 1000) * 1000000) % 1000000000; + + res = pthread_cond_timedwait(&cond, &mutex, &ts); + assert(res == 0 || res == ETIMEDOUT); + } + + err = pthread_mutex_unlock(&mutex); + assert(!err); + + return res == 0; + } + + private: + IGNITE_NO_COPY_ASSIGNMENT(ManualEvent); + + /** Condition variable. */ + pthread_cond_t cond; + + /** Mutex. */ + pthread_mutex_t mutex; + + /** State. */ + bool state; + }; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h b/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h index 77de4d8..54f611b 100644 --- a/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h +++ b/modules/platforms/cpp/common/os/win/include/ignite/common/concurrent_os.h @@ -18,12 +18,14 @@ #ifndef _IGNITE_COMMON_CONCURRENT_OS #define _IGNITE_COMMON_CONCURRENT_OS -#include <map> #include <stdint.h> + +#include <cassert> +#include <map> + #include <windows.h> #include "ignite/common/common.h" - namespace ignite { namespace common @@ -33,7 +35,8 @@ namespace ignite /** * Static class to manage memory visibility semantics. */ - class IGNITE_IMPORT_EXPORT Memory { + class Memory + { public: /** * Full fence. @@ -44,7 +47,9 @@ namespace ignite /** * Critical section. */ - class IGNITE_IMPORT_EXPORT CriticalSection { + class IGNITE_IMPORT_EXPORT CriticalSection + { + friend class ConditionVariable; public: /** * Constructor. 
@@ -67,7 +72,7 @@ namespace ignite
                 void Leave();
             private:
                 /** Handle. */
-                CRITICAL_SECTION* hnd;
+                CRITICAL_SECTION hnd;
 
                 IGNITE_NO_COPY_ASSIGNMENT(CriticalSection)
             };
@@ -76,7 +81,7 @@ namespace ignite
              * Special latch with count = 1.
              */
             class IGNITE_IMPORT_EXPORT SingleLatch
-            {
+            {
             public:
                 /**
                  * Constructor.
@@ -99,7 +104,7 @@ namespace ignite
                 void Await();
             private:
                 /** Handle. */
-                void* hnd;
+                HANDLE hnd;
 
                 IGNITE_NO_COPY_ASSIGNMENT(SingleLatch)
             };
@@ -107,7 +112,7 @@ namespace ignite
             /**
              * Primitives for atomic access.
              */
-            class IGNITE_IMPORT_EXPORT Atomics
+            class Atomics
             {
             public:
                 /**
@@ -399,6 +404,154 @@ namespace ignite
                 /** Index. */
                 int32_t idx;
             };
+
+            /**
+             * Cross-platform wrapper for Condition Variable synchronization
+             * primitive concept.
+             */
+            class ConditionVariable
+            {
+            public:
+                /**
+                 * Constructor.
+                 */
+                ConditionVariable()
+                {
+                    InitializeConditionVariable(&cond);
+                }
+
+                /**
+                 * Destructor.
+                 */
+                ~ConditionVariable()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Wait for Condition Variable to be notified.
+                 *
+                 * @param cs Critical section in which to wait.
+                 */
+                void Wait(CriticalSection& cs)
+                {
+                    SleepConditionVariableCS(&cond, &cs.hnd, INFINITE);
+                }
+
+                /**
+                 * Wait for Condition Variable to be notified for specified time.
+                 *
+                 * @param cs Critical section in which to wait.
+                 * @param msTimeout Timeout in milliseconds.
+                 * @return True if the object has been notified and false in case of timeout.
+                 */
+                bool WaitFor(CriticalSection& cs, int32_t msTimeout)
+                {
+                    BOOL notified = SleepConditionVariableCS(&cond, &cs.hnd, msTimeout);
+
+                    return notified != FALSE;
+                }
+
+                /**
+                 * Notify single thread waiting for the condition variable.
+                 */
+                void NotifyOne()
+                {
+                    WakeConditionVariable(&cond);
+                }
+
+                /**
+                 * Notify all threads that are waiting on the variable.
+                 */
+                void NotifyAll()
+                {
+                    WakeAllConditionVariable(&cond);
+                }
+
+            private:
+                IGNITE_NO_COPY_ASSIGNMENT(ConditionVariable);
+
+                /** OS-specific type. */
+                CONDITION_VARIABLE cond;
+            };
+
+            /**
+             * Manually triggered event.
+             * Once triggered it stays in passing state until manually reset.
+             */
+            class ManualEvent
+            {
+            public:
+                /**
+                 * Constructs manual event.
+                 * Initial state is untriggered.
+                 */
+                ManualEvent()
+                {
+                    handle = CreateEvent(NULL, TRUE, FALSE, NULL);
+
+                    assert(handle != NULL);
+                }
+
+                /**
+                 * Destructor.
+                 */
+                ~ManualEvent()
+                {
+                    CloseHandle(handle);
+                }
+
+                /**
+                 * Sets event into triggered state.
+                 */
+                void Set()
+                {
+                    BOOL success = SetEvent(handle);
+
+                    assert(success);
+                }
+
+                /**
+                 * Resets event into non-triggered state.
+                 */
+                void Reset()
+                {
+                    BOOL success = ResetEvent(handle);
+
+                    assert(success);
+                }
+
+                /**
+                 * Wait for event to be triggered.
+                 */
+                void Wait()
+                {
+                    DWORD res = WaitForSingleObject(handle, INFINITE);
+
+                    assert(res == WAIT_OBJECT_0);
+                }
+
+                /**
+                 * Wait for event to be triggered for specified time.
+                 *
+                 * @param msTimeout Timeout in milliseconds.
+                 * @return True if the object has been triggered and false in case of timeout.
+                 */
+                bool WaitFor(int32_t msTimeout)
+                {
+                    DWORD res = WaitForSingleObject(handle, static_cast<DWORD>(msTimeout));
+
+                    assert(res == WAIT_OBJECT_0 || res == WAIT_TIMEOUT);
+
+                    return res == WAIT_OBJECT_0;
+                }
+
+            private:
+                IGNITE_NO_COPY_ASSIGNMENT(ManualEvent);
+
+                /** Event handle. */
+                HANDLE handle;
+            };
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp b/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp
index 676d8b6..7b42ba4 100644
--- a/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp
+++ b/modules/platforms/cpp/common/os/win/src/common/concurrent_os.cpp
@@ -32,31 +32,35 @@ namespace ignite
                 MemoryBarrier();
             }
 
-            CriticalSection::CriticalSection() : hnd(new CRITICAL_SECTION) {
-                InitializeCriticalSection(hnd);
+            CriticalSection::CriticalSection() :
+                hnd()
+            {
+                InitializeCriticalSection(&hnd);
 
                 Memory::Fence();
             }
 
-            CriticalSection::~CriticalSection() {
-                Memory::Fence();
-
-                delete hnd;
+            CriticalSection::~CriticalSection()
+            {
+                DeleteCriticalSection(&hnd); // Release OS resources acquired by InitializeCriticalSection.
             }
 
-            void CriticalSection::Enter() {
+            void CriticalSection::Enter()
+            {
                 Memory::Fence();
 
-                EnterCriticalSection(hnd);
+                EnterCriticalSection(&hnd);
             }
 
-            void CriticalSection::Leave() {
+            void CriticalSection::Leave()
+            {
                 Memory::Fence();
 
-                LeaveCriticalSection(hnd);
+                LeaveCriticalSection(&hnd);
             }
 
-            SingleLatch::SingleLatch() : hnd(CreateEvent(NULL, TRUE, FALSE, NULL))
+            SingleLatch::SingleLatch() :
+                hnd(CreateEvent(NULL, TRUE, FALSE, NULL))
             {
                 Memory::Fence();
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/common/project/vs/common.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/project/vs/common.vcxproj b/modules/platforms/cpp/common/project/vs/common.vcxproj
index 3062112..9f32461 100644
--- a/modules/platforms/cpp/common/project/vs/common.vcxproj
+++ b/modules/platforms/cpp/common/project/vs/common.vcxproj
@@ -173,9 +173,12 @@
     <ClInclude Include="..\..\include\ignite\common\fixed_size_array.h" />
     <ClInclude Include="..\..\include\ignite\common\bits.h" />
     <ClInclude Include="..\..\include\ignite\common\platform_utils.h" />
+    <ClInclude Include="..\..\include\ignite\common\promise.h" />
     <ClInclude Include="..\..\include\ignite\common\reference_impl.h" />
+    <ClInclude Include="..\..\include\ignite\common\shared_state.h" />
     <ClInclude Include="..\..\include\ignite\common\utils.h" />
     <ClInclude Include="..\..\include\ignite\date.h" />
+    <ClInclude Include="..\..\include\ignite\future.h" />
     <ClInclude Include="..\..\include\ignite\guid.h" />
     <ClInclude Include="..\..\include\ignite\ignite_error.h" />
     <ClInclude Include="..\..\include\ignite\reference.h" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/common/project/vs/common.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/project/vs/common.vcxproj.filters b/modules/platforms/cpp/common/project/vs/common.vcxproj.filters
index ed709f7..4daf3aa 100644
--- a/modules/platforms/cpp/common/project/vs/common.vcxproj.filters
+++ b/modules/platforms/cpp/common/project/vs/common.vcxproj.filters
@@ -67,6 +67,15 @@
     <ClInclude Include="..\..\include\ignite\time.h">
       <Filter>Code</Filter>
     </ClInclude>
+    <ClInclude Include="..\..\include\ignite\future.h">
+      <Filter>Code</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\common\promise.h">
+      <Filter>Code\common</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\common\shared_state.h">
+      <Filter>Code\common</Filter>
+    </ClInclude>
   </ItemGroup>
   <ItemGroup>
     <ClCompile Include="..\..\src\date.cpp">

http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/core-test/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/Makefile.am b/modules/platforms/cpp/core-test/Makefile.am
index 25dee58..11f4d1a 100644
--- a/modules/platforms/cpp/core-test/Makefile.am
+++ b/modules/platforms/cpp/core-test/Makefile.am
@@ -77,6 +77,7 @@ ignite_tests_SOURCES = \
     src/decimal_test.cpp \
     src/dynamic_size_array_test.cpp \
     src/fixed_size_array_test.cpp \
+    src/future_test.cpp \
     src/transactions_test.cpp \
     src/teamcity_messages.cpp \
     src/teamcity_boost.cpp \

http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
index 5d97d20..51ae5d41 100644
--- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
+++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
@@ -70,6 +70,7 @@
     <ClCompile Include="..\..\src\dynamic_size_array_test.cpp" />
     <ClCompile Include="..\..\src\fixed_size_array_test.cpp" />
     <ClCompile Include="..\..\src\continuous_query_test.cpp" />
+    <ClCompile Include="..\..\src\future_test.cpp" />
     <ClCompile Include="..\..\src\ignite_error_test.cpp" />
     <ClCompile Include="..\..\src\ignition_test.cpp" />
     <ClCompile Include="..\..\src\handle_registry_test.cpp" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
index 08652d9..ebccc7f 100644
--- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
+++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
@@ -85,6 +85,9 @@
     <ClCompile Include="..\..\src\cache_store_test.cpp">
       <Filter>Code</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\future_test.cpp">
+      <Filter>Code</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="..\..\include\teamcity_messages.h">
@@ -178,5 +181,6 @@
     <None Include="..\..\config\cache-query-continuous.xml">
       <Filter>Configs</Filter>
     </None>
+    <None Include="..\..\config\cache-query-continuous.xml" />
   </ItemGroup>
 </Project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/core-test/src/concurrent_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/concurrent_test.cpp b/modules/platforms/cpp/core-test/src/concurrent_test.cpp
index 173973d..bcd7ddf 100644
--- a/modules/platforms/cpp/core-test/src/concurrent_test.cpp
+++ b/modules/platforms/cpp/core-test/src/concurrent_test.cpp
@@ -23,6 +23,7 @@
 
 #include <ignite/common/concurrent.h>
 
+using namespace ignite;
 using namespace ignite::common::concurrent;
 
 BOOST_AUTO_TEST_SUITE(ConcurrentTestSuite)
@@ -276,4 +277,50 @@ BOOST_AUTO_TEST_CASE(TestEnableSharedFromThis)
     BOOST_CHECK(deleted);
 }
 
+BOOST_AUTO_TEST_CASE(ConditionVariableBasic)
+{
+    CriticalSection cs;
+    ConditionVariable cv;
+
+    CsLockGuard guard(cs);
+
+    bool notified = cv.WaitFor(cs, 100);
+
+    BOOST_REQUIRE(!notified);
+
+    cv.NotifyOne();
+
+    notified = cv.WaitFor(cs, 100);
+
+    BOOST_REQUIRE(!notified);
+
+    cv.NotifyAll();
+
+    notified = cv.WaitFor(cs, 100);
+
+    BOOST_REQUIRE(!notified);
+}
+
+BOOST_AUTO_TEST_CASE(ManualEventBasic)
+{
+    ManualEvent evt;
+
+    bool triggered = evt.WaitFor(100);
+    BOOST_CHECK(!triggered);
+
+    evt.Set();
+
+    triggered = evt.WaitFor(100);
+    BOOST_REQUIRE(triggered);
+
+    triggered = evt.WaitFor(100);
+    BOOST_REQUIRE(triggered);
+
+    evt.Wait();
+    evt.Reset();
+
+    triggered = evt.WaitFor(100);
+    BOOST_CHECK(!triggered);
+}
+
 BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/f328c4ee/modules/platforms/cpp/core-test/src/future_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/future_test.cpp b/modules/platforms/cpp/core-test/src/future_test.cpp
new file mode 100644
index 0000000..3d5c659
--- /dev/null
+++ b/modules/platforms/cpp/core-test/src/future_test.cpp
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _MSC_VER
+    #define BOOST_TEST_DYN_LINK
+#endif
+
+#include <boost/test/unit_test.hpp>
+
+#include <ignite/common/promise.h>
+#include <ignite/future.h>
+
+using namespace ignite;
+using namespace ignite::common;
+
+/**
+ * Utility to make auto pointer from value.
+ *
+ * @param val Value.
+ * @return Auto pointer.
+ */
+template<typename T>
+std::auto_ptr<T> MakeAuto(const T& val)
+{
+    return std::auto_ptr<T>(new T(val));
+}
+
+/**
+ * Checks if the error is of type IgniteError::IGNITE_ERR_FUTURE_STATE.
+ */
+inline bool IsFutureError(const IgniteError& err)
+{
+    return err.GetCode() == IgniteError::IGNITE_ERR_FUTURE_STATE;
+}
+
+/**
+* Checks if the error is of type IgniteError::IGNITE_ERR_UNKNOWN.
+*/
+inline bool IsUnknownError(const IgniteError& err)
+{
+    return err.GetCode() == IgniteError::IGNITE_ERR_UNKNOWN;
+}
+
+BOOST_AUTO_TEST_SUITE(FutureTestSuite)
+
+BOOST_AUTO_TEST_CASE(SharedStateIntValue)
+{
+    SharedState<int> sharedState;
+    int expected = 42;
+
+    bool set = sharedState.WaitFor(100);
+    BOOST_CHECK(!set);
+
+    sharedState.SetValue(MakeAuto(expected));
+
+    set = sharedState.WaitFor(100);
+    BOOST_REQUIRE(set);
+
+    set = sharedState.WaitFor(100);
+    BOOST_REQUIRE(set);
+
+    sharedState.Wait();
+    int val = sharedState.GetValue();
+
+    BOOST_CHECK_EQUAL(val, expected);
+
+    int val2 = sharedState.GetValue();
+
+    BOOST_CHECK_EQUAL(val2, expected);
+
+    BOOST_CHECK_EXCEPTION(sharedState.SetValue(MakeAuto(0)), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(sharedState.SetValue(MakeAuto(expected)), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(sharedState.SetError(IgniteError()), IgniteError, IsFutureError);
+}
+
+BOOST_AUTO_TEST_CASE(SharedStateStringValue)
+{
+    SharedState<std::string> sharedState;
+    std::string expected = "Lorem ipsum";
+
+    bool ready = sharedState.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    sharedState.SetValue(MakeAuto(expected));
+
+    ready = sharedState.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = sharedState.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    sharedState.Wait();
+    std::string val = sharedState.GetValue();
+
+    BOOST_CHECK_EQUAL(val, expected);
+
+    std::string val2 = sharedState.GetValue();
+
+    BOOST_CHECK_EQUAL(val2, expected);
+
+    BOOST_CHECK_EXCEPTION(sharedState.SetError(IgniteError()), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(sharedState.SetValue(MakeAuto(expected)), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(sharedState.SetValue(MakeAuto(std::string("Hello world"))), IgniteError, IsFutureError);
+}
+
+BOOST_AUTO_TEST_CASE(SharedStateVoidValue)
+{
+    SharedState<void> sharedState;
+
+    bool ready = sharedState.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    sharedState.SetValue();
+
+    ready = sharedState.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = sharedState.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    sharedState.Wait();
+    sharedState.GetValue();
+    sharedState.GetValue();
+
+    BOOST_CHECK_EXCEPTION(sharedState.SetError(IgniteError()), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(sharedState.SetValue(), IgniteError, IsFutureError);
+}
+
+BOOST_AUTO_TEST_CASE(SharedStateIntError)
+{
+    SharedState<int> sharedState;
+
+    bool ready = sharedState.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    sharedState.SetError(IgniteError(IgniteError::IGNITE_ERR_UNKNOWN, "Test"));
+
+    ready = sharedState.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = sharedState.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    sharedState.Wait();
+
+    BOOST_CHECK_EXCEPTION(sharedState.GetValue(), IgniteError, IsUnknownError);
+
+    BOOST_CHECK_EXCEPTION(sharedState.SetValue(MakeAuto(42)), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(sharedState.SetError(IgniteError()), IgniteError, IsFutureError);
+}
+
+BOOST_AUTO_TEST_CASE(SharedStateVoidError)
+{
+    SharedState<void> sharedState;
+
+    bool ready = sharedState.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    sharedState.SetError(IgniteError(IgniteError::IGNITE_ERR_UNKNOWN, "Test"));
+
+    ready = sharedState.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = sharedState.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    sharedState.Wait();
+
+    BOOST_CHECK_EXCEPTION(sharedState.GetValue(), IgniteError, IsUnknownError);
+    BOOST_CHECK_EXCEPTION(sharedState.SetError(IgniteError()), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(sharedState.SetValue(), IgniteError, IsFutureError);
+}
+
+BOOST_AUTO_TEST_CASE(FutureIntValue)
+{
+    Promise<int> promise;
+    int expected = 42;
+
+    Future<int> future1 = promise.GetFuture();
+    Future<int> future2 = promise.GetFuture();
+
+    bool ready = future1.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    promise.SetValue(MakeAuto(expected));
+
+    ready = future1.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future1.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    future2.Wait();
+    future1.Wait();
+
+    int val1 = future1.GetValue();
+
+    BOOST_CHECK_EQUAL(val1, expected);
+
+    int val2 = future1.GetValue();
+
+    BOOST_CHECK_EQUAL(val2, expected);
+
+    int val3 = future2.GetValue();
+
+    BOOST_CHECK_EQUAL(val3, expected);
+
+    int val4 = future2.GetValue();
+
+    BOOST_CHECK_EQUAL(val4, expected);
+
+    BOOST_CHECK_EXCEPTION(promise.SetValue(MakeAuto(0)), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(promise.SetValue(MakeAuto(expected)), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(promise.SetError(IgniteError()), IgniteError, IsFutureError);
+}
+
+BOOST_AUTO_TEST_CASE(FutureStringValue)
+{
+    Promise<std::string> promise;
+    std::string expected = "Lorem Ipsum";
+
+    Future<std::string> future1 = promise.GetFuture();
+    Future<std::string> future2 = promise.GetFuture();
+
+    bool ready = future1.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    promise.SetValue(MakeAuto(expected));
+
+    ready = future1.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future1.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    future2.Wait();
+    future1.Wait();
+
+    std::string val1 = future1.GetValue();
+
+    BOOST_CHECK_EQUAL(val1, expected);
+
+    std::string val2 = future1.GetValue();
+
+    BOOST_CHECK_EQUAL(val2, expected);
+
+    std::string val3 = future2.GetValue();
+
+    BOOST_CHECK_EQUAL(val3, expected);
+
+    std::string val4 = future2.GetValue();
+
+    BOOST_CHECK_EQUAL(val4, expected);
+
+    BOOST_CHECK_EXCEPTION(promise.SetValue(MakeAuto(std::string("Hello Ignite"))), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(promise.SetValue(MakeAuto(expected)), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(promise.SetError(IgniteError()), IgniteError, IsFutureError);
+}
+
+BOOST_AUTO_TEST_CASE(FutureVoidValue)
+{
+    Promise<void> promise;
+
+    Future<void> future1 = promise.GetFuture();
+    Future<void> future2 = promise.GetFuture();
+
+    bool ready = future1.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    promise.SetValue();
+
+    ready = future1.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future1.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    future2.Wait();
+    future1.Wait();
+
+    future1.GetValue();
+    future1.GetValue();
+    future2.GetValue();
+    future2.GetValue();
+
+    BOOST_CHECK_EXCEPTION(promise.SetValue(), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(promise.SetValue(), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(promise.SetError(IgniteError()), IgniteError, IsFutureError);
+}
+
+BOOST_AUTO_TEST_CASE(FutureIntError)
+{
+    Promise<int> promise;
+
+    Future<int> future1 = promise.GetFuture();
+    Future<int> future2 = promise.GetFuture();
+
+    bool ready = future1.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    promise.SetError(IgniteError(IgniteError::IGNITE_ERR_UNKNOWN, "Test"));
+
+    ready = future1.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future1.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    future2.Wait();
+    future1.Wait();
+
+    BOOST_CHECK_EXCEPTION(future1.GetValue(), IgniteError, IsUnknownError);
+    BOOST_CHECK_EXCEPTION(future2.GetValue(), IgniteError, IsUnknownError);
+
+    BOOST_CHECK_EXCEPTION(promise.SetValue(MakeAuto(42)), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(promise.SetError(IgniteError()), IgniteError, IsFutureError);
+}
+
+BOOST_AUTO_TEST_CASE(FutureVoidError)
+{
+    Promise<void> promise;
+
+    Future<void> future1 = promise.GetFuture();
+    Future<void> future2 = promise.GetFuture();
+
+    bool ready = future1.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    promise.SetError(IgniteError(IgniteError::IGNITE_ERR_UNKNOWN, "Test"));
+
+    ready = future1.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future1.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    future2.Wait();
+    future1.Wait();
+
+    BOOST_CHECK_EXCEPTION(future1.GetValue(), IgniteError, IsUnknownError);
+    BOOST_CHECK_EXCEPTION(future2.GetValue(), IgniteError, IsUnknownError);
+
+    BOOST_CHECK_EXCEPTION(promise.SetValue(), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(promise.SetError(IgniteError()), IgniteError, IsFutureError);
+}
+
+BOOST_AUTO_TEST_CASE(FutureIntBroken)
+{
+    Promise<int>* promise = new Promise<int>();
+
+    Future<int> future1 = promise->GetFuture();
+    Future<int> future2 = promise->GetFuture();
+
+    bool ready = future1.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    delete promise;
+
+    ready = future1.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future1.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    future2.Wait();
+    future1.Wait();
+
+    BOOST_CHECK_EXCEPTION(future1.GetValue(), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(future2.GetValue(), IgniteError, IsFutureError);
+}
+
+BOOST_AUTO_TEST_CASE(FutureVoidBroken)
+{
+    Promise<void>* promise = new Promise<void>();
+
+    Future<void> future1 = promise->GetFuture();
+    Future<void> future2 = promise->GetFuture();
+
+    bool ready = future1.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_CHECK(!ready);
+
+    delete promise;
+
+    ready = future1.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future1.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    ready = future2.WaitFor(100);
+    BOOST_REQUIRE(ready);
+
+    future2.Wait();
+    future1.Wait();
+
+    BOOST_CHECK_EXCEPTION(future1.GetValue(), IgniteError, IsFutureError);
+    BOOST_CHECK_EXCEPTION(future2.GetValue(), IgniteError, IsFutureError);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file