This is an automated email from the ASF dual-hosted git repository.
stegemr pushed a commit to branch feature/solveAsanIssuePushStreams
in repository https://gitbox.apache.org/repos/asf/celix.git
The following commit(s) were added to
refs/heads/feature/solveAsanIssuePushStreams by this push:
new d5818b88 Adding async source and remove asan issue in close of source
d5818b88 is described below
commit d5818b886688a73433669fcc1c3fecd14c98aef1
Author: stegemanr <[email protected]>
AuthorDate: Fri May 13 19:40:36 2022 +0200
Adding async source and remove asan issue in close of source
---
libs/pushstreams/api/celix/PushStreamProvider.h | 17 +++-
libs/pushstreams/api/celix/SimplePushEventSource.h | 69 ----------------
.../api/celix/impl/AbstractPushEventSource.h | 33 +++++---
libs/pushstreams/gtest/src/PushStreamTestSuite.cc | 92 ++++++++++++++++------
4 files changed, 107 insertions(+), 104 deletions(-)
diff --git a/libs/pushstreams/api/celix/PushStreamProvider.h
b/libs/pushstreams/api/celix/PushStreamProvider.h
index 6afbf068..134b6d0d 100644
--- a/libs/pushstreams/api/celix/PushStreamProvider.h
+++ b/libs/pushstreams/api/celix/PushStreamProvider.h
@@ -19,7 +19,7 @@
#pragma once
-#include "celix/SimplePushEventSource.h"
+#include "celix/AsynchronousPushEventSource.h"
#include "celix/SynchronousPushEventSource.h"
#include "celix/IPushEventSource.h"
#include "celix/impl/StreamPushEventConsumer.h"
@@ -48,6 +48,16 @@ namespace celix {
template <typename T>
[[nodiscard]] std::shared_ptr<celix::SynchronousPushEventSource<T>>
createSynchronousEventSource(std::shared_ptr<PromiseFactory>& promiseFactory);
+ /**
+ * @brief creates an event source for type T. The eventsource is
asynchronous, events are sent downstream in
+ * executor of the promise factory.
+ * @param promiseFactory the used promiseFactory
+ * @tparam T The type of the events
+ * @return the event source, the caller needs to hold the shared_ptr.
+ */
+ template <typename T>
+ [[nodiscard]] std::shared_ptr<celix::AsynchronousPushEventSource<T>>
createAsynchronousEventSource(std::shared_ptr<PromiseFactory>& promiseFactory);
+
/**
* @brief creates a stream of for event type T. Stream is unbuffered
thus on reception of event, the event
* will be sent downstream on the eventsource's thread.
@@ -84,6 +94,11 @@ inline std::shared_ptr<celix::SynchronousPushEventSource<T>>
celix::PushStreamPr
return
std::make_shared<celix::SynchronousPushEventSource<T>>(promiseFactory);
}
+template <typename T>
+inline std::shared_ptr<celix::AsynchronousPushEventSource<T>>
celix::PushStreamProvider::createAsynchronousEventSource(std::shared_ptr<PromiseFactory>&
promiseFactory) {
+ return
std::make_shared<celix::AsynchronousPushEventSource<T>>(promiseFactory);
+}
+
template <typename T>
std::shared_ptr<celix::PushStream<T>>
celix::PushStreamProvider::createUnbufferedStream(std::shared_ptr<celix::IPushEventSource<T>>
eventSource, std::shared_ptr<PromiseFactory>& promiseFactory) {
auto stream = std::make_shared<UnbufferedPushStream<T>>(promiseFactory);
diff --git a/libs/pushstreams/api/celix/SimplePushEventSource.h
b/libs/pushstreams/api/celix/SimplePushEventSource.h
deleted file mode 100644
index f283d56b..00000000
--- a/libs/pushstreams/api/celix/SimplePushEventSource.h
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- *Licensed to the Apache Software Foundation (ASF) under one
- *or more contributor license agreements. See the NOTICE file
- *distributed with this work for additional information
- *regarding copyright ownership. The ASF licenses this file
- *to you under the Apache License, Version 2.0 (the
- *"License"); you may not use this file except in compliance
- *with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- *Unless required by applicable law or agreed to in writing,
- *software distributed under the License is distributed on an
- *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- *specific language governing permissions and limitations
- *under the License.
- */
-
-
-#pragma once
-
-#include <queue>
-
-#include "celix/impl/AbstractPushEventSource.h"
-#include "celix/IAutoCloseable.h"
-
-#include "IllegalStateException.h"
-#include "celix/PromiseFactory.h"
-#include "celix/Promise.h"
-#include "celix/DefaultExecutor.h"
-#include "celix/PushEvent.h"
-
-namespace celix {
- template <typename T>
- class SimplePushEventSource: public AbstractPushEventSource<T> {
- public:
- explicit SimplePushEventSource(PromiseFactory& promiseFactory);
-
- protected:
- void execute(std::function<void()> task) override;
-
- private:
- std::shared_ptr<IExecutor> executor {};
- std::queue<std::function<void()>> queue {};
- };
-}
-
-/*********************************************************************************
- Implementation
-*********************************************************************************/
-
-template <typename T>
-celix::SimplePushEventSource<T>::SimplePushEventSource(PromiseFactory&
promiseFactory): AbstractPushEventSource<T>{promiseFactory},
- executor{promiseFactory.getExecutor()} {
-
-// executor->execute([]() {
-// for(;;) {
-//
-// }
-// });
-}
-
-template <typename T>
-void celix::SimplePushEventSource<T>::execute(std::function<void()> task) {
- task();
-}
-
-
diff --git a/libs/pushstreams/api/celix/impl/AbstractPushEventSource.h
b/libs/pushstreams/api/celix/impl/AbstractPushEventSource.h
index 053de270..fcf2fca7 100644
--- a/libs/pushstreams/api/celix/impl/AbstractPushEventSource.h
+++ b/libs/pushstreams/api/celix/impl/AbstractPushEventSource.h
@@ -88,7 +88,7 @@
celix::AbstractPushEventSource<T>::AbstractPushEventSource(std::shared_ptr<Promi
template <typename T>
void
celix::AbstractPushEventSource<T>::open(std::shared_ptr<celix::IPushEventConsumer<T>>
_eventConsumer) {
- std::lock_guard lck{mutex};
+ std::unique_lock lck{mutex};
if (closed) {
_eventConsumer->accept(celix::ClosePushEvent<T>());
} else {
@@ -102,7 +102,7 @@ void
celix::AbstractPushEventSource<T>::open(std::shared_ptr<celix::IPushEventCo
template <typename T>
[[nodiscard]] celix::Promise<void>
celix::AbstractPushEventSource<T>::connectPromise() {
- std::lock_guard lck{mutex};
+ std::unique_lock lck{mutex};
auto connect = promiseFactory->deferred<void>();
connected.push_back(connect);
return connect.getPromise();
@@ -110,7 +110,7 @@ template <typename T>
template <typename T>
void celix::AbstractPushEventSource<T>::publish(const T& event) {
- std::lock_guard lck{mutex};
+ std::unique_lock lck{mutex};
if (closed) {
throw IllegalStateException("AbstractPushEventSource closed");
@@ -125,28 +125,37 @@ void celix::AbstractPushEventSource<T>::publish(const T&
event) {
template <typename T>
bool celix::AbstractPushEventSource<T>::isConnected() {
- std::lock_guard lck{mutex};
+ std::unique_lock lck{mutex};
return !eventConsumers.empty();
}
template <typename T>
void celix::AbstractPushEventSource<T>::close() {
+ std::condition_variable cv;
+
+
{
- std::lock_guard lck{mutex};
+ std::unique_lock lck{mutex};
+
if (closed) {
return;
- } else {
- closed = true;
}
- }
- for(auto& eventConsumer : eventConsumers) {
- execute([eventConsumer]() {
- eventConsumer->accept(celix::ClosePushEvent<T>());
- });
+ for (auto &eventConsumer : eventConsumers) {
+ execute([eventConsumer]() {
+ eventConsumer->accept(celix::ClosePushEvent<T>());
+ });
+ }
}
execute([&]() {
+ std::unique_lock lck{mutex};
eventConsumers.clear();
+ closed = true;
+ cv.notify_one();
});
+
+ //wait upon closed
+ std::unique_lock lck{mutex};
+ cv.wait(lck, [&]() {return closed;});
}
\ No newline at end of file
diff --git a/libs/pushstreams/gtest/src/PushStreamTestSuite.cc
b/libs/pushstreams/gtest/src/PushStreamTestSuite.cc
index f794eb82..7e390c47 100644
--- a/libs/pushstreams/gtest/src/PushStreamTestSuite.cc
+++ b/libs/pushstreams/gtest/src/PushStreamTestSuite.cc
@@ -75,14 +75,18 @@ public:
std::unique_ptr<std::thread> t{};
std::shared_ptr<celix::PromiseFactory> promiseFactory
{std::make_shared<celix::PromiseFactory>()};
- celix::Deferred<void> done{promiseFactory->deferred<void>()};
- celix::Promise<void> donepromise = done.getPromise();
+ std::mutex mutex{};
+ std::condition_variable done{};
template <typename T>
- std::shared_ptr<celix::SynchronousPushEventSource<T>> createEventSource(T
event, int publishCount, bool autoinc = false) {
- auto ses = psp.template
createSynchronousEventSource<T>(promiseFactory);
-
- auto successLambda = [this, weakses =
std::weak_ptr<celix::SynchronousPushEventSource<T>>(ses), event, publishCount,
autoinc](celix::Promise<void> p) -> celix::Promise<void> {
+ std::shared_ptr<celix::AbstractPushEventSource<T>> createEventSource(T
event, int publishCount, bool autoinc = false, bool syncSource = true) {
+ std::shared_ptr<celix::AbstractPushEventSource<T>> ses;
+ if (syncSource)
+ ses = psp.template createSynchronousEventSource<T>(promiseFactory);
+ else
+ ses = psp.template
createAsynchronousEventSource<T>(promiseFactory);
+
+ auto successLambda = [this, weakses =
std::weak_ptr<celix::AbstractPushEventSource<T>>(ses), event, publishCount,
autoinc](celix::Promise<void> p) -> celix::Promise<void> {
t = std::make_unique<std::thread>([&, event, publishCount,
autoinc]() {
int counter = 0;
T data {event};
@@ -104,7 +108,8 @@ public:
if (ses) {
ses->close();
}
- done.resolve();
+ std::unique_lock lk(mutex);
+ done.notify_one();
return p;
};
@@ -310,6 +315,8 @@ TEST_F(PushStreamTestSuite, ForEachTestBasicType) {
int consumeCount{0};
int consumeSum{0};
int lastConsumed{-1};
+ std::unique_lock lk(mutex);
+
auto ses = createEventSource<int>(0, 10'000, true);
auto stream = psp.createUnbufferedStream<int>(ses, promiseFactory);
@@ -322,8 +329,8 @@ TEST_F(PushStreamTestSuite, ForEachTestBasicType) {
consumeSum = consumeSum + event;
});
+ done.wait(lk);
streamEnded.wait();
- donepromise.wait();
GTEST_ASSERT_EQ(10'000, consumeCount);
GTEST_ASSERT_EQ(49'995'000, consumeSum);
@@ -339,6 +346,8 @@ TEST_F(PushStreamTestSuite, ForEachTestBasicType_Buffered) {
int consumeCount{0};
int consumeSum{0};
int lastConsumed{-1};
+ std::unique_lock lk(mutex);
+
auto ses = createEventSource<int>(0, 10'000, true);
auto stream = psp.createStream<int>(ses, promiseFactory);
@@ -351,8 +360,8 @@ TEST_F(PushStreamTestSuite, ForEachTestBasicType_Buffered) {
consumeSum = consumeSum + event;
});
+ done.wait(lk);
streamEnded.wait();
- donepromise.wait();
GTEST_ASSERT_EQ(10'000, consumeCount);
GTEST_ASSERT_EQ(49'995'000, consumeSum);
@@ -363,6 +372,8 @@ TEST_F(PushStreamTestSuite, ForEachTestBasicType_Buffered) {
TEST_F(PushStreamTestSuite, ForEachTestObjectType) {
int consumeCount{0};
int consumeSum{0};
+ std::unique_lock lk(mutex);
+
auto ses = createEventSource<EventObject>(EventObject{2}, 10);
auto stream = psp.createUnbufferedStream<EventObject>(ses, promiseFactory);
@@ -372,8 +383,8 @@ TEST_F(PushStreamTestSuite, ForEachTestObjectType) {
consumeSum = consumeSum + event;
});
+ done.wait(lk);
streamEnded.wait();
- donepromise.wait();
GTEST_ASSERT_EQ(10, consumeCount);
GTEST_ASSERT_EQ(20, consumeSum);
@@ -384,6 +395,8 @@ TEST_F(PushStreamTestSuite, ForEachTestObjectType) {
TEST_F(PushStreamTestSuite, FilterTestObjectType_true) {
int consumeCount{0};
int consumeSum{0};
+ std::unique_lock lk(mutex);
+
auto ses = createEventSource<EventObject>(EventObject{2}, 10);
auto stream = psp.createUnbufferedStream<EventObject>(ses, promiseFactory);
@@ -396,8 +409,8 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_true) {
consumeSum = consumeSum + event;
});
+ done.wait(lk);
streamEnded.wait();
- donepromise.wait();
GTEST_ASSERT_EQ(10, consumeCount);
GTEST_ASSERT_EQ(20, consumeSum);
@@ -407,6 +420,8 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_true) {
TEST_F(PushStreamTestSuite, FilterTestObjectType_false) {
int consumeCount{0};
int consumeSum{0};
+ std::unique_lock lk(mutex);
+
auto ses = createEventSource<EventObject>(EventObject{2}, 10);
auto stream = psp.createUnbufferedStream<EventObject>(ses, promiseFactory);
@@ -419,8 +434,8 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_false) {
consumeSum = consumeSum + event;
});
+ done.wait(lk);
streamEnded.wait();
- donepromise.wait();
GTEST_ASSERT_EQ(0, consumeCount);
GTEST_ASSERT_EQ(0, consumeSum);
@@ -429,6 +444,9 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_false) {
TEST_F(PushStreamTestSuite, FilterTestObjectType_simple) {
int consumeCount{0};
int consumeSum{0};
+
+ std::unique_lock lk(mutex);
+
auto ses = createEventSource<EventObject>(EventObject{0}, 10, true);
auto stream = psp.createUnbufferedStream<EventObject>(ses, promiseFactory);
@@ -441,8 +459,8 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_simple) {
consumeSum = consumeSum + event;
});
+ done.wait(lk);
streamEnded.wait();
- donepromise.wait();
GTEST_ASSERT_EQ(5, consumeCount);
GTEST_ASSERT_EQ( 0 + 1 + 2 + 3 + 4, consumeSum);
@@ -451,6 +469,8 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_simple) {
TEST_F(PushStreamTestSuite, FilterTestObjectType_and) {
int consumeCount{0};
int consumeSum{0};
+ std::unique_lock lk(mutex);
+
auto ses = createEventSource<EventObject>(EventObject{0}, 10, true);
auto stream = psp.createUnbufferedStream<EventObject>(ses, promiseFactory);
@@ -466,8 +486,8 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_and) {
consumeSum = consumeSum + event;
});
+ done.wait(lk);
streamEnded.wait();
- donepromise.wait();
GTEST_ASSERT_EQ(2, consumeCount);
GTEST_ASSERT_EQ(6 + 7, consumeSum); // 0 + 1 + 2 + 3 + 4
@@ -476,6 +496,8 @@ TEST_F(PushStreamTestSuite, FilterTestObjectType_and) {
TEST_F(PushStreamTestSuite, MapTestObjectType) {
int consumeCount{0};
int consumeSum{0};
+ std::unique_lock lk(mutex);
+
auto ses = createEventSource<EventObject>(EventObject{0}, 10, true);
auto stream = psp.createUnbufferedStream<EventObject>(ses, promiseFactory);
auto streamEnded = stream->
@@ -487,13 +509,39 @@ TEST_F(PushStreamTestSuite, MapTestObjectType) {
consumeSum = consumeSum + event;
});
+ done.wait(lk);
streamEnded.wait();
- donepromise.wait();
GTEST_ASSERT_EQ(10, consumeCount);
GTEST_ASSERT_EQ(45, consumeSum);
}
+TEST_F(PushStreamTestSuite, MapTestObjectType_async) {
+ for(int i = 0; i < 1000; i++) {
+ int consumeCount{0};
+ int consumeSum{0};
+ std::unique_lock lk(mutex);
+
+ auto ses = createEventSource<EventObject>(EventObject{0}, 10, true,
false);
+ auto stream = psp.createUnbufferedStream<EventObject>(ses,
promiseFactory);
+ auto streamEnded = stream->
+ map<int>([](const EventObject &event) -> int {
+ return event.val;
+ }).
+ forEach([&](const int &event) {
+ consumeCount++;
+ consumeSum = consumeSum + event;
+ });
+
+ done.wait(lk);
+ streamEnded.wait();
+
+ GTEST_ASSERT_EQ(10, consumeCount);
+ GTEST_ASSERT_EQ(45, consumeSum);
+ }
+}
+
+
TEST_F(PushStreamTestSuite, MultipleStreamsTest_CloseSource) {
int onEventStream1{0};
int onEventStream2{0};
@@ -502,6 +550,8 @@ TEST_F(PushStreamTestSuite,
MultipleStreamsTest_CloseSource) {
int onErrorReceived1{0};
int onErrorReceived2{0};
+ std::unique_lock lk(mutex);
+
auto psp = PushStreamProvider();
std::unique_ptr<std::thread> t{};
auto ses = createEventSource<int>(0, 20, true);
@@ -535,17 +585,15 @@ TEST_F(PushStreamTestSuite,
MultipleStreamsTest_CloseSource) {
});
streamEnded1.onSuccess([]() {
- std::cout << " ended" << std::endl;
});
streamEnded2.onSuccess([]() {
- std::cout << "Stream2 ended" << std::endl;
});
streamEnded1.wait();
streamEnded2.wait();
- donepromise.wait();
+ done.wait(lk);
GTEST_ASSERT_EQ(4, onEventStream1);
//The first stream will start the source, thus the number of receives in
second is not guaranteed
@@ -593,11 +641,9 @@ TEST_F(PushStreamTestSuite,
MultipleStreamsTest_CloseStream) {
});
streamEnded1.onSuccess([]() {
- std::cout << "Stream ended" << std::endl;
});
streamEnded2.onSuccess([]() {
- std::cout << "Stream2 ended" << std::endl;
});
stream1->close();
@@ -612,12 +658,15 @@ TEST_F(PushStreamTestSuite,
MultipleStreamsTest_CloseStream) {
GTEST_ASSERT_EQ(1, onEventStream2);
}
+
TEST_F(PushStreamTestSuite, SplitStreamsTest) {
std::map<int,int> counts{};
counts[0] = 0;
counts[1] = 0;
int onClosedReceived{0};
+ std::unique_lock lk(mutex);
+
auto psp = PushStreamProvider();
std::unique_ptr<std::thread> t{};
auto ses = createEventSource<int>(0, 20, true);
@@ -644,11 +693,10 @@ TEST_F(PushStreamTestSuite, SplitStreamsTest) {
}));
streamEndeds[i].onSuccess([]() {
- std::cout << "Stream ended" << std::endl;
});
}
- donepromise.wait();
+ done.wait(lk);
GTEST_ASSERT_EQ(2, onClosedReceived);
GTEST_ASSERT_EQ(9, counts[0]);