merlimat commented on a change in pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586#discussion_r741607586



##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -31,121 +32,138 @@ typedef std::unique_lock<std::mutex> Lock;
 namespace pulsar {
 
 template <typename Result, typename Type>
-struct InternalState {
-    std::mutex mutex;
-    std::condition_variable condition;
-    Result result;
-    Type value;
-    bool complete;
-
-    std::list<typename std::function<void(Result, const Type&)> > listeners;
-};
-
-template <typename Result, typename Type>
-class Future {
+class InternalState {
    public:
-    typedef std::function<void(Result, const Type&)> ListenerCallback;
+    using ListenerCallback = std::function<void(Result, const Type&)>;
 
-    Future& addListener(ListenerCallback callback) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    // There's a bug about the defaulted default constructor for GCC < 4.9.1, 
so we cannot use
+    // `InternalState() = default` here.
+    InternalState() {}
+    InternalState(const InternalState&) = delete;
+    InternalState& operator=(const InternalState&) = delete;
 
-        if (state->complete) {
+    static Result defaultResult() {
+        static Result result;
+        return result;
+    }
+
+    static Type defaultValue() {
+        static Type value;
+        return value;
+    }
+
+    bool completed() const noexcept { return completed_; }
+
+    void addListener(const ListenerCallback& callback) {
+        Lock lock(mutex_);
+        if (completed_) {
+            const auto result = result_;
+            const auto value = value_;
             lock.unlock();
-            callback(state->result, state->value);
+            callback(result, value);
         } else {
-            state->listeners.push_back(callback);
+            listeners_.emplace_back(callback);
         }
+    }
 
-        return *this;
+    Result wait(Type& value) {
+        Lock lock(mutex_);
+        while (!completed_) {
+            condition_.wait(lock);
+        }
+        value = value_;
+        return result_;
     }
 
-    Result get(Type& result) {
-        InternalState<Result, Type>* state = state_.get();
-        Lock lock(state->mutex);
+    bool complete(const Type& value) {
+        if (completed_) {
+            return false;
+        }
+
+        Lock lock(mutex_);
+        value_ = value;
+        completed_ = true;
+        auto listeners = std::move(listeners_);
+        lock.unlock();
 
-        if (!state->complete) {
-            // Wait for result
-            while (!state->complete) {
-                state->condition.wait(lock);
-            }
+        for (auto& callback : listeners) {

Review comment:
       Ok, we just need to be careful to make sure that when a listener is 
added and the future is already completed, the listener is triggered 
immediately.

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); 
++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       The `state->listeners.clear()` would have to be done while holding the 
mutex. It is probably not needed anymore, though? In what state is the 
`std::vector` left after the listeners are moved out?

##########
File path: pulsar-client-cpp/lib/Future.h
##########
@@ -99,21 +100,24 @@ class Promise {
         }
 
         state->value = value;
-        state->result = Result();
+        state->result = DEFAULT_RESULT;
         state->complete = true;
 
-        typename std::list<ListenerCallback>::iterator it;
-        for (it = state->listeners.begin(); it != state->listeners.end(); 
++it) {
-            ListenerCallback& callback = *it;
-            callback(state->result, state->value);
+        const auto listeners = std::move(state->listeners);
+
+        lock.unlock();
+
+        for (auto& callback : listeners) {
+            callback(DEFAULT_RESULT, value);
         }
 
         state->listeners.clear();

Review comment:
       The only thing I’m not 100% sure about is whether it’s safe to keep using the 
listener vector after it's moved out. E.g., if a new listener is added after the 
future is completed, it will be added again into the moved-from vector, which is in an 
undefined state. As I understand, the only guarantee is that the vector will be empty, but 
we have no idea whether we can add to it or not. 
   
   Instead of the move, it might actually be safer to do a swap with an empty 
std::vector.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to