This is an automated email from the ASF dual-hosted git repository.

stegemr pushed a commit to branch feature/solveAsanIssuePushStreams
in repository https://gitbox.apache.org/repos/asf/celix.git


The following commit(s) were added to 
refs/heads/feature/solveAsanIssuePushStreams by this push:
     new d5818b88 Adding async source and remove asan issue in close of source
d5818b88 is described below

commit d5818b886688a73433669fcc1c3fecd14c98aef1
Author: stegemanr <[email protected]>
AuthorDate: Fri May 13 19:40:36 2022 +0200

    Adding async source and remove asan issue in close of source
---
 libs/pushstreams/api/celix/PushStreamProvider.h    | 17 +++-
 libs/pushstreams/api/celix/SimplePushEventSource.h | 69 ----------------
 .../api/celix/impl/AbstractPushEventSource.h       | 33 +++++---
 libs/pushstreams/gtest/src/PushStreamTestSuite.cc  | 92 ++++++++++++++++------
 4 files changed, 107 insertions(+), 104 deletions(-)

diff --git a/libs/pushstreams/api/celix/PushStreamProvider.h 
b/libs/pushstreams/api/celix/PushStreamProvider.h
index 6afbf068..134b6d0d 100644
--- a/libs/pushstreams/api/celix/PushStreamProvider.h
+++ b/libs/pushstreams/api/celix/PushStreamProvider.h
@@ -19,7 +19,7 @@
 
 #pragma once
 
-#include "celix/SimplePushEventSource.h"
+#include "celix/AsynchronousPushEventSource.h"
 #include "celix/SynchronousPushEventSource.h"
 #include "celix/IPushEventSource.h"
 #include "celix/impl/StreamPushEventConsumer.h"
@@ -48,6 +48,16 @@ namespace celix {
         template <typename T>
         [[nodiscard]] std::shared_ptr<celix::SynchronousPushEventSource<T>> 
createSynchronousEventSource(std::shared_ptr<PromiseFactory>&  promiseFactory);
 
+        /**
+         * @brief Creates an event source for type T. The event source is asynchronous: events are sent downstream
+         * on the executor of the promise factory.
+         * @param promiseFactory the used promiseFactory
+         * @tparam T The type of the events
+         * @return the event source, the caller needs to hold the shared_ptr.
+         */
+        template <typename T>
+        [[nodiscard]] std::shared_ptr<celix::AsynchronousPushEventSource<T>> 
createAsynchronousEventSource(std::shared_ptr<PromiseFactory>&  promiseFactory);
+
         /**
          * @brief creates a stream of for event type T. Stream is unbuffered 
thus on reception of event, the event
          * will be sent downstream on the eventsource's thread.
@@ -84,6 +94,11 @@ inline std::shared_ptr<celix::SynchronousPushEventSource<T>> 
celix::PushStreamPr
     return 
std::make_shared<celix::SynchronousPushEventSource<T>>(promiseFactory);
 }
 
+template <typename T>
+inline std::shared_ptr<celix::AsynchronousPushEventSource<T>> 
celix::PushStreamProvider::createAsynchronousEventSource(std::shared_ptr<PromiseFactory>&
 promiseFactory) {
+    return 
std::make_shared<celix::AsynchronousPushEventSource<T>>(promiseFactory);
+}
+
 template <typename T>
 std::shared_ptr<celix::PushStream<T>> 
celix::PushStreamProvider::createUnbufferedStream(std::shared_ptr<celix::IPushEventSource<T>>
 eventSource, std::shared_ptr<PromiseFactory>& promiseFactory) {
     auto stream = std::make_shared<UnbufferedPushStream<T>>(promiseFactory);
diff --git a/libs/pushstreams/api/celix/SimplePushEventSource.h 
b/libs/pushstreams/api/celix/SimplePushEventSource.h
deleted file mode 100644
index f283d56b..00000000
--- a/libs/pushstreams/api/celix/SimplePushEventSource.h
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements.  See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership.  The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-
-#pragma once
-
-#include <queue>
-
-#include "celix/impl/AbstractPushEventSource.h"
-#include "celix/IAutoCloseable.h"
-
-#include "IllegalStateException.h"
-#include "celix/PromiseFactory.h"
-#include "celix/Promise.h"
-#include "celix/DefaultExecutor.h"
-#include "celix/PushEvent.h"
-
-namespace celix {
-    template <typename T>
-    class SimplePushEventSource: public AbstractPushEventSource<T> {
-    public:
-        explicit SimplePushEventSource(PromiseFactory& promiseFactory);
-
-    protected:
-        void execute(std::function<void()> task) override;
-
-    private:
-        std::shared_ptr<IExecutor> executor {};
-        std::queue<std::function<void()>> queue {};
-    };
-}
-
-/*********************************************************************************
- Implementation
-*********************************************************************************/
-
-template <typename T>
-celix::SimplePushEventSource<T>::SimplePushEventSource(PromiseFactory& 
promiseFactory): AbstractPushEventSource<T>{promiseFactory},
-    executor{promiseFactory.getExecutor()} {
-
-//    executor->execute([]() {
-//        for(;;) {
-//
-//        }
-//    });
-}
-
-template <typename T>
-void celix::SimplePushEventSource<T>::execute(std::function<void()> task) {
-    task();
-}
-
-
diff --git a/libs/pushstreams/api/celix/impl/AbstractPushEventSource.h 
b/libs/pushstreams/api/celix/impl/AbstractPushEventSource.h
index 053de270..fcf2fca7 100644
--- a/libs/pushstreams/api/celix/impl/AbstractPushEventSource.h
+++ b/libs/pushstreams/api/celix/impl/AbstractPushEventSource.h
@@ -88,7 +88,7 @@ 
celix::AbstractPushEventSource<T>::AbstractPushEventSource(std::shared_ptr<Promi
 
 template <typename T>
 void 
celix::AbstractPushEventSource<T>::open(std::shared_ptr<celix::IPushEventConsumer<T>>
 _eventConsumer) {
-    std::lock_guard lck{mutex};
+    std::unique_lock lck{mutex};
     if (closed) {
         _eventConsumer->accept(celix::ClosePushEvent<T>());
     } else {
@@ -102,7 +102,7 @@ void 
celix::AbstractPushEventSource<T>::open(std::shared_ptr<celix::IPushEventCo
 
 template <typename T>
 [[nodiscard]] celix::Promise<void> 
celix::AbstractPushEventSource<T>::connectPromise() {
-    std::lock_guard lck{mutex};
+    std::unique_lock lck{mutex};
     auto connect = promiseFactory->deferred<void>();
     connected.push_back(connect);
     return connect.getPromise();
@@ -110,7 +110,7 @@ template <typename T>
 
 template <typename T>
 void celix::AbstractPushEventSource<T>::publish(const T& event) {
-    std::lock_guard lck{mutex};
+    std::unique_lock lck{mutex};
 
     if (closed) {
         throw IllegalStateException("AbstractPushEventSource closed");
@@ -125,28 +125,37 @@ void celix::AbstractPushEventSource<T>::publish(const T& 
event) {
 
 template <typename T>
 bool celix::AbstractPushEventSource<T>::isConnected() {
-    std::lock_guard lck{mutex};
+    std::unique_lock lck{mutex};
     return !eventConsumers.empty();
 }
 
 template <typename T>
 void celix::AbstractPushEventSource<T>::close() {
+    std::condition_variable cv;
+
+
     {
-        std::lock_guard lck{mutex};
+        std::unique_lock lck{mutex};
+
         if (closed) {
             return;
-        } else {
-            closed = true;
         }
-    }
 
-    for(auto& eventConsumer : eventConsumers) {
-        execute([eventConsumer]() {
-            eventConsumer->accept(celix::ClosePushEvent<T>());
-        });
+        for (auto &eventConsumer : eventConsumers) {
+            execute([eventConsumer]() {
+                eventConsumer->accept(celix::ClosePushEvent<T>());
+            });
+        }
     }
 
     execute([&]() {
+        std::unique_lock lck{mutex};
         eventConsumers.clear();
+        closed = true;
+        cv.notify_one();
     });
+
+    // Block until the task submitted above has set `closed` and notified the condition variable.
+    std::unique_lock lck{mutex};
+    cv.wait(lck, [&]() {return closed;});
 }
\ No newline at end of file
diff --git a/libs/pushstreams/gtest/src/PushStreamTestSuite.cc 
b/libs/pushstreams/gtest/src/PushStreamTestSuite.cc
index f794eb82..7e390c47 100644
--- a/libs/pushstreams/gtest/src/PushStreamTestSuite.cc
+++ b/libs/pushstreams/gtest/src/PushStreamTestSuite.cc
@@ -75,14 +75,18 @@ public:
     std::unique_ptr<std::thread> t{};
 
     std::shared_ptr<celix::PromiseFactory> promiseFactory 
{std::make_shared<celix::PromiseFactory>()};
-    celix::Deferred<void> done{promiseFactory->deferred<void>()};
-    celix::Promise<void> donepromise = done.getPromise();
+    std::mutex mutex{};
+    std::condition_variable done{};
 
     template <typename T>
-    std::shared_ptr<celix::SynchronousPushEventSource<T>> createEventSource(T 
event, int publishCount, bool autoinc = false) {
-        auto ses = psp.template 
createSynchronousEventSource<T>(promiseFactory);
-
-        auto successLambda = [this, weakses = 
std::weak_ptr<celix::SynchronousPushEventSource<T>>(ses), event, publishCount, 
autoinc](celix::Promise<void> p) -> celix::Promise<void> {
+    std::shared_ptr<celix::AbstractPushEventSource<T>> createEventSource(T 
event, int publishCount, bool autoinc = false, bool syncSource = true) {
+        std::shared_ptr<celix::AbstractPushEventSource<T>> ses;
+        if (syncSource)
+            ses = psp.template createSynchronousEventSource<T>(promiseFactory);
+        else
+            ses = psp.template 
createAsynchronousEventSource<T>(promiseFactory);
+
+        auto successLambda = [this, weakses = 
std::weak_ptr<celix::AbstractPushEventSource<T>>(ses), event, publishCount, 
autoinc](celix::Promise<void> p) -> celix::Promise<void> {
             t = std::make_unique<std::thread>([&, event, publishCount, 
autoinc]() {
                 int counter = 0;
                 T data {event};
@@ -104,7 +108,8 @@ public:
             if (ses) {
                 ses->close();
             }
-            done.resolve();
+            std::unique_lock lk(mutex);
+            done.notify_one();
             return p;
         };
 
@@ -310,6 +315,8 @@ TEST_F(PushStreamTestSuite, ForEachTestBasicType) {
         int consumeCount{0};
         int consumeSum{0};
         int lastConsumed{-1};
+        std::unique_lock lk(mutex);
+
         auto ses = createEventSource<int>(0, 10'000, true);
 
         auto stream = psp.createUnbufferedStream<int>(ses, promiseFactory);
@@ -322,8 +329,8 @@ TEST_F(PushStreamTestSuite, ForEachTestBasicType) {
                     consumeSum = consumeSum + event;
                 });
 
+        done.wait(lk);
         streamEnded.wait();
-        donepromise.wait();
 
         GTEST_ASSERT_EQ(10'000, consumeCount);
         GTEST_ASSERT_EQ(49'995'000, consumeSum);
@@ -339,6 +346,8 @@ TEST_F(PushStreamTestSuite, ForEachTestBasicType_Buffered) {
         int consumeCount{0};
         int consumeSum{0};
         int lastConsumed{-1};
+        std::unique_lock lk(mutex);
+
         auto ses = createEventSource<int>(0, 10'000, true);
 
         auto stream = psp.createStream<int>(ses, promiseFactory);
@@ -351,8 +360,8 @@ TEST_F(PushStreamTestSuite, ForEachTestBasicType_Buffered) {
                     consumeSum = consumeSum + event;
                 });
 
+        done.wait(lk);
         streamEnded.wait();
-        donepromise.wait();
 
         GTEST_ASSERT_EQ(10'000, consumeCount);
         GTEST_ASSERT_EQ(49'995'000, consumeSum);
@@ -363,6 +372,8 @@ TEST_F(PushStreamTestSuite, ForEachTestBasicType_Buffered) {
 TEST_F(PushStreamTestSuite, ForEachTestObjectType) {
     int consumeCount{0};
     int consumeSum{0};
+    std::unique_lock lk(mutex);
+
     auto ses = createEventSource<EventObject>(EventObject{2}, 10);
 
     auto stream = psp.createUnbufferedStream<EventObject>(ses, promiseFactory);
@@ -372,8 +383,8 @@ TEST_F(PushStreamTestSuite, ForEachTestObjectType) {
                 consumeSum = consumeSum + event;
             });
 
+    done.wait(lk);
     streamEnded.wait();
-    donepromise.wait();
 
     GTEST_ASSERT_EQ(10, consumeCount);
     GTEST_ASSERT_EQ(20, consumeSum);
@@ -384,6 +395,8 @@ TEST_F(PushStreamTestSuite, ForEachTestObjectType) {
 TEST_F(PushStreamTestSuite, FilterTestObjectType_true) {
     int consumeCount{0};
     int consumeSum{0};
+    std::unique_lock lk(mutex);
+
     auto ses = createEventSource<EventObject>(EventObject{2}, 10);
 
     auto stream = psp.createUnbufferedStream<EventObject>(ses, promiseFactory);
@@ -396,8 +409,8 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_true) {
                 consumeSum = consumeSum + event;
             });
 
+    done.wait(lk);
     streamEnded.wait();
-    donepromise.wait();
 
     GTEST_ASSERT_EQ(10, consumeCount);
     GTEST_ASSERT_EQ(20, consumeSum);
@@ -407,6 +420,8 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_true) {
 TEST_F(PushStreamTestSuite, FilterTestObjectType_false) {
     int consumeCount{0};
     int consumeSum{0};
+    std::unique_lock lk(mutex);
+
     auto ses = createEventSource<EventObject>(EventObject{2}, 10);
 
     auto stream = psp.createUnbufferedStream<EventObject>(ses, promiseFactory);
@@ -419,8 +434,8 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_false) {
                 consumeSum = consumeSum + event;
             });
 
+    done.wait(lk);
     streamEnded.wait();
-    donepromise.wait();
 
     GTEST_ASSERT_EQ(0, consumeCount);
     GTEST_ASSERT_EQ(0, consumeSum);
@@ -429,6 +444,9 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_false) {
 TEST_F(PushStreamTestSuite, FilterTestObjectType_simple) {
     int consumeCount{0};
     int consumeSum{0};
+
+    std::unique_lock lk(mutex);
+
     auto ses = createEventSource<EventObject>(EventObject{0}, 10, true);
 
     auto stream = psp.createUnbufferedStream<EventObject>(ses, promiseFactory);
@@ -441,8 +459,8 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_simple) {
                 consumeSum = consumeSum + event;
             });
 
+    done.wait(lk);
     streamEnded.wait();
-    donepromise.wait();
 
     GTEST_ASSERT_EQ(5, consumeCount);
     GTEST_ASSERT_EQ( 0 + 1 + 2 + 3 + 4, consumeSum);
@@ -451,6 +469,8 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_simple) {
 TEST_F(PushStreamTestSuite, FilterTestObjectType_and) {
     int consumeCount{0};
     int consumeSum{0};
+    std::unique_lock lk(mutex);
+
     auto ses = createEventSource<EventObject>(EventObject{0}, 10, true);
 
     auto stream = psp.createUnbufferedStream<EventObject>(ses, promiseFactory);
@@ -466,8 +486,8 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_and) {
                 consumeSum = consumeSum + event;
             });
 
+    done.wait(lk);
     streamEnded.wait();
-    donepromise.wait();
 
     GTEST_ASSERT_EQ(2, consumeCount);
     GTEST_ASSERT_EQ(6 + 7, consumeSum); // 0 + 1 + 2 + 3 + 4
@@ -476,6 +496,8 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_and) {
 TEST_F(PushStreamTestSuite, MapTestObjectType) {
     int consumeCount{0};
     int consumeSum{0};
+    std::unique_lock lk(mutex);
+
     auto ses = createEventSource<EventObject>(EventObject{0}, 10, true);
     auto stream = psp.createUnbufferedStream<EventObject>(ses, promiseFactory);
     auto streamEnded = stream->
@@ -487,13 +509,39 @@ TEST_F(PushStreamTestSuite, MapTestObjectType) {
                 consumeSum = consumeSum + event;
             });
 
+    done.wait(lk);
     streamEnded.wait();
-    donepromise.wait();
 
     GTEST_ASSERT_EQ(10, consumeCount);
     GTEST_ASSERT_EQ(45, consumeSum);
 }
 
+TEST_F(PushStreamTestSuite, MapTestObjectType_async) {
+    for(int i = 0; i < 1000; i++) {
+        int consumeCount{0};
+        int consumeSum{0};
+        std::unique_lock lk(mutex);
+
+        auto ses = createEventSource<EventObject>(EventObject{0}, 10, true, 
false);
+        auto stream = psp.createUnbufferedStream<EventObject>(ses, 
promiseFactory);
+        auto streamEnded = stream->
+                map<int>([](const EventObject &event) -> int {
+            return event.val;
+        }).
+                forEach([&](const int &event) {
+            consumeCount++;
+            consumeSum = consumeSum + event;
+        });
+
+        done.wait(lk);
+        streamEnded.wait();
+
+        GTEST_ASSERT_EQ(10, consumeCount);
+        GTEST_ASSERT_EQ(45, consumeSum);
+    }
+}
+
+
 TEST_F(PushStreamTestSuite, MultipleStreamsTest_CloseSource) {
     int onEventStream1{0};
     int onEventStream2{0};
@@ -502,6 +550,8 @@ TEST_F(PushStreamTestSuite, 
MultipleStreamsTest_CloseSource) {
     int onErrorReceived1{0};
     int onErrorReceived2{0};
 
+    std::unique_lock lk(mutex);
+
     auto psp = PushStreamProvider();
     std::unique_ptr<std::thread> t{};
     auto ses = createEventSource<int>(0, 20, true);
@@ -535,17 +585,15 @@ TEST_F(PushStreamTestSuite, 
MultipleStreamsTest_CloseSource) {
     });
 
     streamEnded1.onSuccess([]() {
-        std::cout << " ended" << std::endl;
     });
 
     streamEnded2.onSuccess([]() {
-        std::cout << "Stream2 ended" << std::endl;
     });
 
     streamEnded1.wait();
     streamEnded2.wait();
 
-    donepromise.wait();
+    done.wait(lk);
 
     GTEST_ASSERT_EQ(4, onEventStream1);
     //The first stream will start the source, thus the number of receives in 
second is not guaranteed
@@ -593,11 +641,9 @@ TEST_F(PushStreamTestSuite, 
MultipleStreamsTest_CloseStream) {
             });
 
     streamEnded1.onSuccess([]() {
-        std::cout << "Stream ended" << std::endl;
     });
 
     streamEnded2.onSuccess([]() {
-        std::cout << "Stream2 ended" << std::endl;
     });
 
     stream1->close();
@@ -612,12 +658,15 @@ TEST_F(PushStreamTestSuite, 
MultipleStreamsTest_CloseStream) {
     GTEST_ASSERT_EQ(1, onEventStream2);
 }
 
+
 TEST_F(PushStreamTestSuite, SplitStreamsTest) {
     std::map<int,int> counts{};
     counts[0] = 0;
     counts[1] = 0;
     int onClosedReceived{0};
 
+    std::unique_lock lk(mutex);
+
     auto psp = PushStreamProvider();
     std::unique_ptr<std::thread> t{};
     auto ses = createEventSource<int>(0, 20, true);
@@ -644,11 +693,10 @@ TEST_F(PushStreamTestSuite, SplitStreamsTest) {
         }));
 
         streamEndeds[i].onSuccess([]() {
-            std::cout << "Stream ended" << std::endl;
         });
     }
 
-    donepromise.wait();
+    done.wait(lk);
 
     GTEST_ASSERT_EQ(2, onClosedReceived);
     GTEST_ASSERT_EQ(9, counts[0]);

Reply via email to