BewareMyPower commented on code in PR #23:
URL: https://github.com/apache/pulsar-client-cpp/pull/23#discussion_r1010280775
##########
lib/MultiTopicsConsumerImpl.cc:
##########
@@ -647,6 +647,41 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const
MessageId& msgId, ResultCal
}
}
+void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList&
messageIdList, ResultCallback callback) {
+ if (state_ != Ready) {
+ callback(ResultAlreadyClosed);
+ return;
+ }
+
+ std::unordered_map<std::string, MessageIdList> topicToMessageId;
+ for (const MessageId& messageId : messageIdList) {
+ auto topicName = messageId.getTopicName();
+ topicToMessageId[topicName].emplace_back(messageId);
+ }
+
+ auto needCallBack =
std::make_shared<std::atomic<int>>(topicToMessageId.size());
+ Result res = ResultOk;
+ auto cb = [callback, needCallBack, &res](Result result) {
+ if (result != ResultOk) {
+ res = result;
+ }
+ needCallBack->fetch_sub(1);
+ if (needCallBack->load() == 0) {
+ callback(res);
+ }
Review Comment:
`res` should be defined inside the lambda, not outside. Because it's only
used in the lambda.
BTW, please keep it in mind that never capture a reference of a local
variable.
1. The lifetime of the local variable will end after going out of the scope,
while the lambda could be invoked in another thread.
2. Since multiple threads can access the shared local variable, it must be
atomic.
That's also why `std::shared_ptr<std::atomic<T>>` is used in other places.
1. Capturing a `std::shared_ptr<U>` will increase the reference count and
extend the lifetime of the `U` instance until all lambdas finished.
2. `U` is `std::atomic<T>`, which is thread safe to access among different
threads.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]