merlimat commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741607586



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, 
so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
       Ok, we just need to be careful in making sure that when a listener is 
added and the future is already completed, the listener needs to be triggered 
immediately.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to