This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new e6b9d09 [refactor] Handle responses in methods instead of switch
cases (#197)
e6b9d09 is described below
commit e6b9d091892da0d911e89b54b36f643cf8d4efa7
Author: Yunze Xu <[email protected]>
AuthorDate: Mon Feb 20 22:07:43 2023 +0800
[refactor] Handle responses in methods instead of switch cases (#197)
### Motivation
The C++ client handles responses in a huge switch block in
`ClientConnection::handleIncomingCommand`, which currently has 553 lines.
This is hard to maintain, especially since more responses will need to
be handled in the future.
### Modifications
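Add a series of `handleXXX` methods to `ClientConnection` and move the
response-handling logic out of the switch cases into them, so that the
switch only dispatches each command to its handler.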
---
lib/ClientConnection.cc | 952 ++++++++++++++++++++++++------------------------
lib/ClientConnection.h | 28 ++
2 files changed, 503 insertions(+), 477 deletions(-)
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index d908537..7abd295 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -767,6 +767,8 @@ bool ClientConnection::verifyChecksum(SharedBuffer&
incomingBuffer_, uint32_t& r
}
void ClientConnection::handleActiveConsumerChange(const
proto::CommandActiveConsumerChange& change) {
+ LOG_DEBUG(cnxString_ << "Received notification about active consumer
change, consumer_id: "
+ << change.consumer_id() << " isActive: " <<
change.is_active());
Lock lock(mutex_);
ConsumersMap::iterator it = consumers_.find(change.consumer_id());
if (it != consumers_.end()) {
@@ -843,524 +845,80 @@ void
ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
// Handle normal commands
switch (incomingCmd.type()) {
- case BaseCommand::SEND_RECEIPT: {
- const auto& sendReceipt = incomingCmd.send_receipt();
- int producerId = sendReceipt.producer_id();
- uint64_t sequenceId = sendReceipt.sequence_id();
- const proto::MessageIdData& messageIdData =
sendReceipt.message_id();
- auto messageId = toMessageId(messageIdData);
-
- LOG_DEBUG(cnxString_ << "Got receipt for producer: " <<
producerId
- << " -- msg: " << sequenceId << "--
message id: " << messageId);
-
- Lock lock(mutex_);
- ProducersMap::iterator it = producers_.find(producerId);
- if (it != producers_.end()) {
- ProducerImplPtr producer = it->second.lock();
- lock.unlock();
-
- if (producer) {
- if (!producer->ackReceived(sequenceId, messageId))
{
- // If the producer fails to process the ack,
we need to close the connection
- // to give it a chance to recover from there
- close();
- }
- }
- } else {
- LOG_ERROR(cnxString_ << "Got invalid producer Id in
SendReceipt: " //
- << producerId << " -- msg: " <<
sequenceId);
- }
-
+ case BaseCommand::SEND_RECEIPT:
+ handleSendReceipt(incomingCmd.send_receipt());
break;
- }
- case BaseCommand::SEND_ERROR: {
- const auto& error = incomingCmd.send_error();
- LOG_WARN(cnxString_ << "Received send error from server: "
<< error.message());
- if (ChecksumError == error.error()) {
- long producerId = error.producer_id();
- long sequenceId = error.sequence_id();
- Lock lock(mutex_);
- ProducersMap::iterator it =
producers_.find(producerId);
- if (it != producers_.end()) {
- ProducerImplPtr producer = it->second.lock();
- lock.unlock();
-
- if (producer) {
- if
(!producer->removeCorruptMessage(sequenceId)) {
- // If the producer fails to remove corrupt
msg, we need to close the
- // connection to give it a chance to
recover from there
- close();
- }
- }
- }
- } else {
- close();
- }
+ case BaseCommand::SEND_ERROR:
+ handleSendError(incomingCmd.send_error());
break;
- }
- case BaseCommand::SUCCESS: {
- const auto& success = incomingCmd.success();
- LOG_DEBUG(cnxString_ << "Received success response from
server. req_id: "
- << success.request_id());
-
- Lock lock(mutex_);
- PendingRequestsMap::iterator it =
pendingRequests_.find(success.request_id());
- if (it != pendingRequests_.end()) {
- PendingRequestData requestData = it->second;
- pendingRequests_.erase(it);
- lock.unlock();
-
- requestData.promise.setValue({});
- requestData.timer->cancel();
- }
+ case BaseCommand::SUCCESS:
+ handleSuccess(incomingCmd.success());
break;
- }
- case BaseCommand::PARTITIONED_METADATA_RESPONSE: {
- const auto& partitionMetadataResponse =
incomingCmd.partitionmetadataresponse();
- LOG_DEBUG(cnxString_ << "Received partition-metadata
response from server. req_id: "
- <<
partitionMetadataResponse.request_id());
-
- Lock lock(mutex_);
- PendingLookupRequestsMap::iterator it =
-
pendingLookupRequests_.find(partitionMetadataResponse.request_id());
- if (it != pendingLookupRequests_.end()) {
- it->second.timer->cancel();
- LookupDataResultPromisePtr lookupDataPromise =
it->second.promise;
- pendingLookupRequests_.erase(it);
- numOfPendingLookupRequest_--;
- lock.unlock();
-
- if (!partitionMetadataResponse.has_response() ||
- (partitionMetadataResponse.response() ==
-
proto::CommandPartitionedTopicMetadataResponse::Failed)) {
- if (partitionMetadataResponse.has_error()) {
- LOG_ERROR(cnxString_ << "Failed
partition-metadata lookup req_id: "
- <<
partitionMetadataResponse.request_id()
- << " error: " <<
partitionMetadataResponse.error()
- << " msg: " <<
partitionMetadataResponse.message());
-
checkServerError(partitionMetadataResponse.error());
-
lookupDataPromise->setFailed(getResult(partitionMetadataResponse.error(),
-
partitionMetadataResponse.message()));
- } else {
- LOG_ERROR(cnxString_ << "Failed
partition-metadata lookup req_id: "
- <<
partitionMetadataResponse.request_id()
- << " with empty response:
");
-
lookupDataPromise->setFailed(ResultConnectError);
- }
- } else {
- LookupDataResultPtr lookupResultPtr =
std::make_shared<LookupDataResult>();
-
lookupResultPtr->setPartitions(partitionMetadataResponse.partitions());
- lookupDataPromise->setValue(lookupResultPtr);
- }
-
- } else {
- LOG_WARN("Received unknown request id from server: "
- << partitionMetadataResponse.request_id());
- }
+ case BaseCommand::PARTITIONED_METADATA_RESPONSE:
+
handlePartitionedMetadataResponse(incomingCmd.partitionmetadataresponse());
break;
- }
- case BaseCommand::CONSUMER_STATS_RESPONSE: {
- const auto& consumerStatsResponse =
incomingCmd.consumerstatsresponse();
- LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command -
Received consumer stats "
- "response from server. req_id: "
- <<
consumerStatsResponse.request_id());
- Lock lock(mutex_);
- PendingConsumerStatsMap::iterator it =
-
pendingConsumerStatsMap_.find(consumerStatsResponse.request_id());
- if (it != pendingConsumerStatsMap_.end()) {
- Promise<Result, BrokerConsumerStatsImpl>
consumerStatsPromise = it->second;
- pendingConsumerStatsMap_.erase(it);
- lock.unlock();
-
- if (consumerStatsResponse.has_error_code()) {
- if (consumerStatsResponse.has_error_message()) {
- LOG_ERROR(cnxString_ << " Failed to get
consumer stats - "
- <<
consumerStatsResponse.error_message());
- }
-
consumerStatsPromise.setFailed(getResult(consumerStatsResponse.error_code(),
-
consumerStatsResponse.error_message()));
- } else {
- LOG_DEBUG(cnxString_ << "ConsumerStatsResponse
command - Received consumer stats "
- "response from server.
req_id: "
- <<
consumerStatsResponse.request_id() << " Stats: ");
- BrokerConsumerStatsImpl brokerStats(
- consumerStatsResponse.msgrateout(),
consumerStatsResponse.msgthroughputout(),
- consumerStatsResponse.msgrateredeliver(),
- consumerStatsResponse.consumername(),
- consumerStatsResponse.availablepermits(),
- consumerStatsResponse.unackedmessages(),
-
consumerStatsResponse.blockedconsumeronunackedmsgs(),
- consumerStatsResponse.address(),
consumerStatsResponse.connectedsince(),
- consumerStatsResponse.type(),
consumerStatsResponse.msgrateexpired(),
- consumerStatsResponse.msgbacklog());
- consumerStatsPromise.setValue(brokerStats);
- }
- } else {
- LOG_WARN("ConsumerStatsResponse command - Received
unknown request id from server: "
- << consumerStatsResponse.request_id());
- }
+ case BaseCommand::CONSUMER_STATS_RESPONSE:
+
handleConsumerStatsResponse(incomingCmd.consumerstatsresponse());
break;
- }
- case BaseCommand::LOOKUP_RESPONSE: {
- const auto& lookupTopicResponse =
incomingCmd.lookuptopicresponse();
- LOG_DEBUG(cnxString_ << "Received lookup response from
server. req_id: "
- << lookupTopicResponse.request_id());
-
- Lock lock(mutex_);
- PendingLookupRequestsMap::iterator it =
-
pendingLookupRequests_.find(lookupTopicResponse.request_id());
- if (it != pendingLookupRequests_.end()) {
- it->second.timer->cancel();
- LookupDataResultPromisePtr lookupDataPromise =
it->second.promise;
- pendingLookupRequests_.erase(it);
- numOfPendingLookupRequest_--;
- lock.unlock();
-
- if (!lookupTopicResponse.has_response() ||
- (lookupTopicResponse.response() ==
proto::CommandLookupTopicResponse::Failed)) {
- if (lookupTopicResponse.has_error()) {
- LOG_ERROR(cnxString_
- << "Failed lookup req_id: " <<
lookupTopicResponse.request_id()
- << " error: " <<
lookupTopicResponse.error()
- << " msg: " <<
lookupTopicResponse.message());
- checkServerError(lookupTopicResponse.error());
- lookupDataPromise->setFailed(
- getResult(lookupTopicResponse.error(),
lookupTopicResponse.message()));
- } else {
- LOG_ERROR(cnxString_
- << "Failed lookup req_id: " <<
lookupTopicResponse.request_id()
- << " with empty response: ");
-
lookupDataPromise->setFailed(ResultConnectError);
- }
- } else {
- LOG_DEBUG(cnxString_
- << "Received lookup response from
server. req_id: "
- << lookupTopicResponse.request_id() //
- << " -- broker-url: " <<
lookupTopicResponse.brokerserviceurl()
- << " -- broker-tls-url: " //
- <<
lookupTopicResponse.brokerserviceurltls()
- << " authoritative: " <<
lookupTopicResponse.authoritative() //
- << " redirect: " <<
lookupTopicResponse.response());
- LookupDataResultPtr lookupResultPtr =
std::make_shared<LookupDataResult>();
-
- if (tlsSocket_) {
-
lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurltls());
- } else {
-
lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurl());
- }
-
-
lookupResultPtr->setBrokerUrlTls(lookupTopicResponse.brokerserviceurltls());
-
lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative());
-
lookupResultPtr->setRedirect(lookupTopicResponse.response() ==
-
proto::CommandLookupTopicResponse::Redirect);
- lookupResultPtr->setShouldProxyThroughServiceUrl(
-
lookupTopicResponse.proxy_through_service_url());
- lookupDataPromise->setValue(lookupResultPtr);
- }
-
- } else {
- LOG_WARN(
- "Received unknown request id from server: " <<
lookupTopicResponse.request_id());
- }
+ case BaseCommand::LOOKUP_RESPONSE:
+
handleLookupTopicRespose(incomingCmd.lookuptopicresponse());
break;
- }
- case BaseCommand::PRODUCER_SUCCESS: {
- const auto& producerSuccess =
incomingCmd.producer_success();
- LOG_DEBUG(cnxString_ << "Received success producer
response from server. req_id: "
- << producerSuccess.request_id() //
- << " -- producer name: " <<
producerSuccess.producer_name());
-
- Lock lock(mutex_);
- PendingRequestsMap::iterator it =
pendingRequests_.find(producerSuccess.request_id());
- if (it != pendingRequests_.end()) {
- PendingRequestData requestData = it->second;
- if (!producerSuccess.producer_ready()) {
- LOG_INFO(cnxString_ << " Producer " <<
producerSuccess.producer_name()
- << " has been queued up at
broker. req_id: "
- <<
producerSuccess.request_id());
- requestData.hasGotResponse->store(true);
- lock.unlock();
- } else {
- pendingRequests_.erase(it);
- lock.unlock();
- ResponseData data;
- data.producerName =
producerSuccess.producer_name();
- data.lastSequenceId =
producerSuccess.last_sequence_id();
- if (producerSuccess.has_schema_version()) {
- data.schemaVersion =
producerSuccess.schema_version();
- }
- if (producerSuccess.has_topic_epoch()) {
- data.topicEpoch =
boost::make_optional(producerSuccess.topic_epoch());
- } else {
- data.topicEpoch = boost::none;
- }
- requestData.promise.setValue(data);
- requestData.timer->cancel();
- }
- }
+ case BaseCommand::PRODUCER_SUCCESS:
+ handleProducerSuccess(incomingCmd.producer_success());
break;
- }
- case BaseCommand::ERROR: {
- const auto& error = incomingCmd.error();
- Result result = getResult(error.error(), error.message());
- LOG_WARN(cnxString_ << "Received error response from
server: " << result
- << (error.has_message() ? (" (" +
error.message() + ")") : "")
- << " -- req_id: " <<
error.request_id());
-
- Lock lock(mutex_);
-
- PendingRequestsMap::iterator it =
pendingRequests_.find(error.request_id());
- if (it != pendingRequests_.end()) {
- PendingRequestData requestData = it->second;
- pendingRequests_.erase(it);
- lock.unlock();
-
- requestData.promise.setFailed(result);
- requestData.timer->cancel();
- } else {
- PendingGetLastMessageIdRequestsMap::iterator it =
-
pendingGetLastMessageIdRequests_.find(error.request_id());
- if (it != pendingGetLastMessageIdRequests_.end()) {
- auto getLastMessageIdPromise = it->second;
- pendingGetLastMessageIdRequests_.erase(it);
- lock.unlock();
-
- getLastMessageIdPromise.setFailed(result);
- } else {
- PendingGetNamespaceTopicsMap::iterator it =
-
pendingGetNamespaceTopicsRequests_.find(error.request_id());
- if (it !=
pendingGetNamespaceTopicsRequests_.end()) {
- Promise<Result, NamespaceTopicsPtr>
getNamespaceTopicsPromise = it->second;
- pendingGetNamespaceTopicsRequests_.erase(it);
- lock.unlock();
-
- getNamespaceTopicsPromise.setFailed(result);
- } else {
- lock.unlock();
- }
- }
- }
+ case BaseCommand::ERROR:
+ handleError(incomingCmd.error());
break;
- }
-
- case BaseCommand::CLOSE_PRODUCER: {
- const auto& closeProducer = incomingCmd.close_producer();
- int producerId = closeProducer.producer_id();
-
- LOG_DEBUG("Broker notification of Closed producer: " <<
producerId);
-
- Lock lock(mutex_);
- ProducersMap::iterator it = producers_.find(producerId);
- if (it != producers_.end()) {
- ProducerImplPtr producer = it->second.lock();
- producers_.erase(it);
- lock.unlock();
-
- if (producer) {
- producer->disconnectProducer();
- }
- } else {
- LOG_ERROR(cnxString_ << "Got invalid producer Id in
closeProducer command: "
- << producerId);
- }
+ case BaseCommand::CLOSE_PRODUCER:
+ handleCloseProducer(incomingCmd.close_producer());
break;
- }
-
- case BaseCommand::CLOSE_CONSUMER: {
- const auto& closeconsumer = incomingCmd.close_consumer();
- int consumerId = closeconsumer.consumer_id();
-
- LOG_DEBUG("Broker notification of Closed consumer: " <<
consumerId);
-
- Lock lock(mutex_);
- ConsumersMap::iterator it = consumers_.find(consumerId);
- if (it != consumers_.end()) {
- ConsumerImplPtr consumer = it->second.lock();
- consumers_.erase(it);
- lock.unlock();
-
- if (consumer) {
- consumer->disconnectConsumer();
- }
- } else {
- LOG_ERROR(cnxString_ << "Got invalid consumer Id in
closeConsumer command: "
- << consumerId);
- }
+ case BaseCommand::CLOSE_CONSUMER:
+ handleCloseConsumer(incomingCmd.close_consumer());
break;
- }
- case BaseCommand::PING: {
+ case BaseCommand::PING:
// Respond to ping request
LOG_DEBUG(cnxString_ << "Replying to ping command");
sendCommand(Commands::newPong());
break;
- }
- case BaseCommand::PONG: {
+ case BaseCommand::PONG:
LOG_DEBUG(cnxString_ << "Received response to ping
message");
break;
- }
- case BaseCommand::AUTH_CHALLENGE: {
- LOG_DEBUG(cnxString_ << "Received auth challenge from
broker");
-
- Result result;
- SharedBuffer buffer =
Commands::newAuthResponse(authentication_, result);
- if (result != ResultOk) {
- LOG_ERROR(cnxString_ << "Failed to send auth response:
" << result);
- close(result);
- break;
- }
- asyncWrite(buffer.const_asio_buffer(),
-
std::bind(&ClientConnection::handleSentAuthResponse, shared_from_this(),
- std::placeholders::_1, buffer));
+ case BaseCommand::AUTH_CHALLENGE:
+ handleAuthChallenge();
break;
- }
- case BaseCommand::ACTIVE_CONSUMER_CHANGE: {
- const auto& change = incomingCmd.active_consumer_change();
- LOG_DEBUG(cnxString_
- << "Received notification about active consumer
change, consumer_id: "
- << change.consumer_id() << " isActive: " <<
change.is_active());
- handleActiveConsumerChange(change);
+ case BaseCommand::ACTIVE_CONSUMER_CHANGE:
+
handleActiveConsumerChange(incomingCmd.active_consumer_change());
break;
- }
- case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE: {
- const auto& getLastMessageIdResponse =
incomingCmd.getlastmessageidresponse();
- LOG_DEBUG(cnxString_ << "Received getLastMessageIdResponse
from server. req_id: "
- <<
getLastMessageIdResponse.request_id());
-
- Lock lock(mutex_);
- PendingGetLastMessageIdRequestsMap::iterator it =
-
pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id());
-
- if (it != pendingGetLastMessageIdRequests_.end()) {
- auto getLastMessageIdPromise = it->second;
- pendingGetLastMessageIdRequests_.erase(it);
- lock.unlock();
-
- if
(getLastMessageIdResponse.has_consumer_mark_delete_position()) {
- getLastMessageIdPromise.setValue(
-
{toMessageId(getLastMessageIdResponse.last_message_id()),
-
toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())});
- } else {
- getLastMessageIdPromise.setValue(
-
{toMessageId(getLastMessageIdResponse.last_message_id())});
- }
- } else {
- lock.unlock();
- LOG_WARN(
- "getLastMessageIdResponse command - Received
unknown request id from server: "
- << getLastMessageIdResponse.request_id());
- }
+ case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE:
+
handleGetLastMessageIdResponse(incomingCmd.getlastmessageidresponse());
break;
- }
- case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE: {
- const auto& response =
incomingCmd.gettopicsofnamespaceresponse();
-
- LOG_DEBUG(cnxString_ << "Received
GetTopicsOfNamespaceResponse from server. req_id: "
- << response.request_id() << "
topicsSize" << response.topics_size());
-
- Lock lock(mutex_);
- PendingGetNamespaceTopicsMap::iterator it =
-
pendingGetNamespaceTopicsRequests_.find(response.request_id());
-
- if (it != pendingGetNamespaceTopicsRequests_.end()) {
- Promise<Result, NamespaceTopicsPtr> getTopicsPromise =
it->second;
- pendingGetNamespaceTopicsRequests_.erase(it);
- lock.unlock();
-
- int numTopics = response.topics_size();
- std::set<std::string> topicSet;
- // get all topics
- for (int i = 0; i < numTopics; i++) {
- // remove partition part
- const std::string& topicName = response.topics(i);
- int pos = topicName.find("-partition-");
- std::string filteredName = topicName.substr(0,
pos);
-
- // filter duped topic name
- if (topicSet.find(filteredName) == topicSet.end())
{
- topicSet.insert(filteredName);
- }
- }
-
- NamespaceTopicsPtr topicsPtr =
-
std::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());
-
- getTopicsPromise.setValue(topicsPtr);
- } else {
- lock.unlock();
- LOG_WARN(
- "GetTopicsOfNamespaceResponse command - Received
unknown request id from "
- "server: "
- << response.request_id());
- }
+ case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE:
+
handleGetTopicOfNamespaceResponse(incomingCmd.gettopicsofnamespaceresponse());
break;
- }
- case BaseCommand::GET_SCHEMA_RESPONSE: {
- const auto& response = incomingCmd.getschemaresponse();
- LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from
server. req_id: "
- << response.request_id());
- Lock lock(mutex_);
- auto it =
pendingGetSchemaRequests_.find(response.request_id());
- if (it != pendingGetSchemaRequests_.end()) {
- Promise<Result, boost::optional<SchemaInfo>>
getSchemaPromise = it->second;
- pendingGetSchemaRequests_.erase(it);
- lock.unlock();
-
- if (response.has_error_code()) {
- if (response.error_code() == proto::TopicNotFound)
{
- getSchemaPromise.setValue(boost::none);
- } else {
- Result result =
getResult(response.error_code(), response.error_message());
- LOG_WARN(cnxString_ << "Received error
GetSchemaResponse from server "
- << result
- <<
(response.has_error_message()
- ? (" (" +
response.error_message() + ")")
- : "")
- << " -- req_id: " <<
response.request_id());
- getSchemaPromise.setFailed(result);
- }
- return;
- }
-
- const auto& schema = response.schema();
- const auto& properMap = schema.properties();
- StringMap properties;
- for (auto kv = properMap.begin(); kv !=
properMap.end(); ++kv) {
- properties[kv->key()] = kv->value();
- }
- SchemaInfo
schemaInfo(static_cast<SchemaType>(schema.type()), "",
- schema.schema_data(),
properties);
- getSchemaPromise.setValue(schemaInfo);
- } else {
- lock.unlock();
- LOG_WARN(
- "GetSchemaResponse command - Received unknown
request id from "
- "server: "
- << response.request_id());
- }
+ case BaseCommand::GET_SCHEMA_RESPONSE:
+ handleGetSchemaResponse(incomingCmd.getschemaresponse());
break;
- }
- default: {
+ default:
LOG_WARN(cnxString_ << "Received invalid message from
server");
close();
break;
- }
}
}
}
@@ -1798,4 +1356,444 @@ void ClientConnection::checkServerError(ServerError
error) {
}
}
+void ClientConnection::handleSendReceipt(const proto::CommandSendReceipt&
sendReceipt) {
+ int producerId = sendReceipt.producer_id();
+ uint64_t sequenceId = sendReceipt.sequence_id();
+ const proto::MessageIdData& messageIdData = sendReceipt.message_id();
+ auto messageId = toMessageId(messageIdData);
+
+ LOG_DEBUG(cnxString_ << "Got receipt for producer: " << producerId << " --
msg: " << sequenceId
+ << "-- message id: " << messageId);
+
+ Lock lock(mutex_);
+ auto it = producers_.find(producerId);
+ if (it != producers_.end()) {
+ ProducerImplPtr producer = it->second.lock();
+ lock.unlock();
+
+ if (producer) {
+ if (!producer->ackReceived(sequenceId, messageId)) {
+ // If the producer fails to process the ack, we need to close
the connection
+ // to give it a chance to recover from there
+ close();
+ }
+ }
+ } else {
+ LOG_ERROR(cnxString_ << "Got invalid producer Id in SendReceipt: " //
+ << producerId << " -- msg: " << sequenceId);
+ }
+}
+
+void ClientConnection::handleSendError(const proto::CommandSendError& error) {
+ LOG_WARN(cnxString_ << "Received send error from server: " <<
error.message());
+ if (ChecksumError == error.error()) {
+ long producerId = error.producer_id();
+ long sequenceId = error.sequence_id();
+ Lock lock(mutex_);
+ auto it = producers_.find(producerId);
+ if (it != producers_.end()) {
+ ProducerImplPtr producer = it->second.lock();
+ lock.unlock();
+
+ if (producer) {
+ if (!producer->removeCorruptMessage(sequenceId)) {
+ // If the producer fails to remove corrupt msg, we need to
close the
+ // connection to give it a chance to recover from there
+ close();
+ }
+ }
+ }
+ } else {
+ close();
+ }
+}
+
+void ClientConnection::handleSuccess(const proto::CommandSuccess& success) {
+ LOG_DEBUG(cnxString_ << "Received success response from server. req_id: "
<< success.request_id());
+
+ Lock lock(mutex_);
+ auto it = pendingRequests_.find(success.request_id());
+ if (it != pendingRequests_.end()) {
+ PendingRequestData requestData = it->second;
+ pendingRequests_.erase(it);
+ lock.unlock();
+
+ requestData.promise.setValue({});
+ requestData.timer->cancel();
+ }
+}
+
+void ClientConnection::handlePartitionedMetadataResponse(
+ const proto::CommandPartitionedTopicMetadataResponse&
partitionMetadataResponse) {
+ LOG_DEBUG(cnxString_ << "Received partition-metadata response from server.
req_id: "
+ << partitionMetadataResponse.request_id());
+
+ Lock lock(mutex_);
+ auto it =
pendingLookupRequests_.find(partitionMetadataResponse.request_id());
+ if (it != pendingLookupRequests_.end()) {
+ it->second.timer->cancel();
+ LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
+ pendingLookupRequests_.erase(it);
+ numOfPendingLookupRequest_--;
+ lock.unlock();
+
+ if (!partitionMetadataResponse.has_response() ||
+ (partitionMetadataResponse.response() ==
+ proto::CommandPartitionedTopicMetadataResponse::Failed)) {
+ if (partitionMetadataResponse.has_error()) {
+ LOG_ERROR(cnxString_ << "Failed partition-metadata lookup
req_id: "
+ << partitionMetadataResponse.request_id()
+ << " error: " <<
partitionMetadataResponse.error()
+ << " msg: " <<
partitionMetadataResponse.message());
+ checkServerError(partitionMetadataResponse.error());
+ lookupDataPromise->setFailed(
+ getResult(partitionMetadataResponse.error(),
partitionMetadataResponse.message()));
+ } else {
+ LOG_ERROR(cnxString_ << "Failed partition-metadata lookup
req_id: "
+ << partitionMetadataResponse.request_id()
<< " with empty response: ");
+ lookupDataPromise->setFailed(ResultConnectError);
+ }
+ } else {
+ LookupDataResultPtr lookupResultPtr =
std::make_shared<LookupDataResult>();
+
lookupResultPtr->setPartitions(partitionMetadataResponse.partitions());
+ lookupDataPromise->setValue(lookupResultPtr);
+ }
+
+ } else {
+ LOG_WARN("Received unknown request id from server: " <<
partitionMetadataResponse.request_id());
+ }
+}
+
+void ClientConnection::handleConsumerStatsResponse(
+ const proto::CommandConsumerStatsResponse& consumerStatsResponse) {
+ LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command - Received consumer
stats "
+ "response from server. req_id: "
+ << consumerStatsResponse.request_id());
+ Lock lock(mutex_);
+ auto it =
pendingConsumerStatsMap_.find(consumerStatsResponse.request_id());
+ if (it != pendingConsumerStatsMap_.end()) {
+ Promise<Result, BrokerConsumerStatsImpl> consumerStatsPromise =
it->second;
+ pendingConsumerStatsMap_.erase(it);
+ lock.unlock();
+
+ if (consumerStatsResponse.has_error_code()) {
+ if (consumerStatsResponse.has_error_message()) {
+ LOG_ERROR(cnxString_ << " Failed to get consumer stats - "
+ << consumerStatsResponse.error_message());
+ }
+ consumerStatsPromise.setFailed(
+ getResult(consumerStatsResponse.error_code(),
consumerStatsResponse.error_message()));
+ } else {
+ LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command - Received
consumer stats "
+ "response from server. req_id: "
+ << consumerStatsResponse.request_id() << "
Stats: ");
+ BrokerConsumerStatsImpl brokerStats(
+ consumerStatsResponse.msgrateout(),
consumerStatsResponse.msgthroughputout(),
+ consumerStatsResponse.msgrateredeliver(),
consumerStatsResponse.consumername(),
+ consumerStatsResponse.availablepermits(),
consumerStatsResponse.unackedmessages(),
+ consumerStatsResponse.blockedconsumeronunackedmsgs(),
consumerStatsResponse.address(),
+ consumerStatsResponse.connectedsince(),
consumerStatsResponse.type(),
+ consumerStatsResponse.msgrateexpired(),
consumerStatsResponse.msgbacklog());
+ consumerStatsPromise.setValue(brokerStats);
+ }
+ } else {
+ LOG_WARN("ConsumerStatsResponse command - Received unknown request id
from server: "
+ << consumerStatsResponse.request_id());
+ }
+}
+
+void ClientConnection::handleLookupTopicRespose(
+ const proto::CommandLookupTopicResponse& lookupTopicResponse) {
+ LOG_DEBUG(cnxString_ << "Received lookup response from server. req_id: "
+ << lookupTopicResponse.request_id());
+
+ Lock lock(mutex_);
+ auto it = pendingLookupRequests_.find(lookupTopicResponse.request_id());
+ if (it != pendingLookupRequests_.end()) {
+ it->second.timer->cancel();
+ LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
+ pendingLookupRequests_.erase(it);
+ numOfPendingLookupRequest_--;
+ lock.unlock();
+
+ if (!lookupTopicResponse.has_response() ||
+ (lookupTopicResponse.response() ==
proto::CommandLookupTopicResponse::Failed)) {
+ if (lookupTopicResponse.has_error()) {
+ LOG_ERROR(cnxString_ << "Failed lookup req_id: " <<
lookupTopicResponse.request_id()
+ << " error: " <<
lookupTopicResponse.error()
+ << " msg: " <<
lookupTopicResponse.message());
+ checkServerError(lookupTopicResponse.error());
+ lookupDataPromise->setFailed(
+ getResult(lookupTopicResponse.error(),
lookupTopicResponse.message()));
+ } else {
+ LOG_ERROR(cnxString_ << "Failed lookup req_id: " <<
lookupTopicResponse.request_id()
+ << " with empty response: ");
+ lookupDataPromise->setFailed(ResultConnectError);
+ }
+ } else {
+ LOG_DEBUG(cnxString_ << "Received lookup response from server.
req_id: "
+ << lookupTopicResponse.request_id() //
+ << " -- broker-url: " <<
lookupTopicResponse.brokerserviceurl()
+ << " -- broker-tls-url: " //
+ << lookupTopicResponse.brokerserviceurltls()
+ << " authoritative: " <<
lookupTopicResponse.authoritative() //
+ << " redirect: " <<
lookupTopicResponse.response());
+ LookupDataResultPtr lookupResultPtr =
std::make_shared<LookupDataResult>();
+
+ if (tlsSocket_) {
+
lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurltls());
+ } else {
+
lookupResultPtr->setBrokerUrl(lookupTopicResponse.brokerserviceurl());
+ }
+
+
lookupResultPtr->setBrokerUrlTls(lookupTopicResponse.brokerserviceurltls());
+
lookupResultPtr->setAuthoritative(lookupTopicResponse.authoritative());
+ lookupResultPtr->setRedirect(lookupTopicResponse.response() ==
+
proto::CommandLookupTopicResponse::Redirect);
+
lookupResultPtr->setShouldProxyThroughServiceUrl(lookupTopicResponse.proxy_through_service_url());
+ lookupDataPromise->setValue(lookupResultPtr);
+ }
+
+ } else {
+ LOG_WARN("Received unknown request id from server: " <<
lookupTopicResponse.request_id());
+ }
+}
+
+void ClientConnection::handleProducerSuccess(const
proto::CommandProducerSuccess& producerSuccess) {
+ LOG_DEBUG(cnxString_ << "Received success producer response from server.
req_id: "
+ << producerSuccess.request_id() //
+ << " -- producer name: " <<
producerSuccess.producer_name());
+
+ Lock lock(mutex_);
+ auto it = pendingRequests_.find(producerSuccess.request_id());
+ if (it != pendingRequests_.end()) {
+ PendingRequestData requestData = it->second;
+ if (!producerSuccess.producer_ready()) {
+ LOG_INFO(cnxString_ << " Producer " <<
producerSuccess.producer_name()
+ << " has been queued up at broker. req_id: "
<< producerSuccess.request_id());
+ requestData.hasGotResponse->store(true);
+ lock.unlock();
+ } else {
+ pendingRequests_.erase(it);
+ lock.unlock();
+ ResponseData data;
+ data.producerName = producerSuccess.producer_name();
+ data.lastSequenceId = producerSuccess.last_sequence_id();
+ if (producerSuccess.has_schema_version()) {
+ data.schemaVersion = producerSuccess.schema_version();
+ }
+ if (producerSuccess.has_topic_epoch()) {
+ data.topicEpoch =
boost::make_optional(producerSuccess.topic_epoch());
+ } else {
+ data.topicEpoch = boost::none;
+ }
+ requestData.promise.setValue(data);
+ requestData.timer->cancel();
+ }
+ }
+}
+
+void ClientConnection::handleError(const proto::CommandError& error) {
+ Result result = getResult(error.error(), error.message());
+ LOG_WARN(cnxString_ << "Received error response from server: " << result
+ << (error.has_message() ? (" (" + error.message() +
")") : "")
+ << " -- req_id: " << error.request_id());
+
+ Lock lock(mutex_);
+
+ auto it = pendingRequests_.find(error.request_id());
+ if (it != pendingRequests_.end()) {
+ PendingRequestData requestData = it->second;
+ pendingRequests_.erase(it);
+ lock.unlock();
+
+ requestData.promise.setFailed(result);
+ requestData.timer->cancel();
+ } else {
+ PendingGetLastMessageIdRequestsMap::iterator it =
+ pendingGetLastMessageIdRequests_.find(error.request_id());
+ if (it != pendingGetLastMessageIdRequests_.end()) {
+ auto getLastMessageIdPromise = it->second;
+ pendingGetLastMessageIdRequests_.erase(it);
+ lock.unlock();
+
+ getLastMessageIdPromise.setFailed(result);
+ } else {
+ PendingGetNamespaceTopicsMap::iterator it =
+ pendingGetNamespaceTopicsRequests_.find(error.request_id());
+ if (it != pendingGetNamespaceTopicsRequests_.end()) {
+ Promise<Result, NamespaceTopicsPtr> getNamespaceTopicsPromise
= it->second;
+ pendingGetNamespaceTopicsRequests_.erase(it);
+ lock.unlock();
+
+ getNamespaceTopicsPromise.setFailed(result);
+ } else {
+ lock.unlock();
+ }
+ }
+ }
+}
+
+// Reacts to a broker-initiated CommandCloseProducer.
+//
+// The entry registered under the command's producer id is removed from producers_ while mutex_ is
+// held. If it->second.lock() still yields a producer, disconnectProducer() is called on it after
+// the lock has been released. An id that is not registered is reported with LOG_ERROR.
+void ClientConnection::handleCloseProducer(const proto::CommandCloseProducer& closeProducer) {
+    // Keep the id in whatever type the generated accessor returns instead of forcing it into an
+    // `int`, so a large id is not narrowed before the map lookup.
+    // NOTE(review): the Pulsar wire protocol declares producer ids as uint64 - confirm against
+    // PulsarApi.proto.
+    const auto producerId = closeProducer.producer_id();
+
+    // Prefixed with cnxString_ like the error log below, so both lines name the connection.
+    LOG_DEBUG(cnxString_ << "Broker notification of Closed producer: " << producerId);
+
+    Lock lock(mutex_);
+    auto it = producers_.find(producerId);
+    if (it != producers_.end()) {
+        ProducerImplPtr producer = it->second.lock();
+        producers_.erase(it);
+        lock.unlock();
+
+        // Called without holding mutex_.
+        if (producer) {
+            producer->disconnectProducer();
+        }
+    } else {
+        LOG_ERROR(cnxString_ << "Got invalid producer Id in closeProducer command: " << producerId);
+    }
+}
+
+// Reacts to a broker-initiated CommandCloseConsumer.
+//
+// The entry registered under the command's consumer id is removed from consumers_ while mutex_ is
+// held. If it->second.lock() still yields a consumer, disconnectConsumer() is called on it after
+// the lock has been released. An id that is not registered is reported with LOG_ERROR.
+void ClientConnection::handleCloseConsumer(const proto::CommandCloseConsumer& closeconsumer) {
+    // Keep the id in whatever type the generated accessor returns instead of forcing it into an
+    // `int`, so a large id is not narrowed before the map lookup.
+    // NOTE(review): the Pulsar wire protocol declares consumer ids as uint64 - confirm against
+    // PulsarApi.proto.
+    const auto consumerId = closeconsumer.consumer_id();
+
+    // Prefixed with cnxString_ like the error log below, so both lines name the connection.
+    LOG_DEBUG(cnxString_ << "Broker notification of Closed consumer: " << consumerId);
+
+    Lock lock(mutex_);
+    auto it = consumers_.find(consumerId);
+    if (it != consumers_.end()) {
+        ConsumerImplPtr consumer = it->second.lock();
+        consumers_.erase(it);
+        lock.unlock();
+
+        // Called without holding mutex_.
+        if (consumer) {
+            consumer->disconnectConsumer();
+        }
+    } else {
+        LOG_ERROR(cnxString_ << "Got invalid consumer Id in closeConsumer command: " << consumerId);
+    }
+}
+
+// Answers an auth challenge from the broker.
+//
+// An auth response is built from authentication_ via Commands::newAuthResponse(). On success it
+// is written with asyncWrite(), with handleSentAuthResponse bound as the completion handler; the
+// buffer is bound into the handler as well. If building the response fails, the error is logged
+// and the connection is closed with that result.
+void ClientConnection::handleAuthChallenge() {
+    LOG_DEBUG(cnxString_ << "Received auth challenge from broker");
+
+    Result authResult;
+    SharedBuffer response = Commands::newAuthResponse(authentication_, authResult);
+    if (authResult == ResultOk) {
+        asyncWrite(response.const_asio_buffer(),
+                   std::bind(&ClientConnection::handleSentAuthResponse, shared_from_this(),
+                             std::placeholders::_1, response));
+        return;
+    }
+
+    LOG_ERROR(cnxString_ << "Failed to send auth response: " << authResult);
+    close(authResult);
+}
+
+// Completes the pending GetLastMessageId request that matches the response's request id.
+//
+// The promise is taken out of pendingGetLastMessageIdRequests_ under mutex_ and fulfilled after
+// the lock has been released. When the response carries a consumer mark-delete position, it is
+// passed along with the last message id; otherwise only the last message id is set. A response
+// whose request id is not pending is logged with LOG_WARN and dropped.
+void ClientConnection::handleGetLastMessageIdResponse(
+    const proto::CommandGetLastMessageIdResponse& getLastMessageIdResponse) {
+    LOG_DEBUG(cnxString_ << "Received getLastMessageIdResponse from server. req_id: "
+                         << getLastMessageIdResponse.request_id());
+
+    Lock lock(mutex_);
+    auto pendingIt = pendingGetLastMessageIdRequests_.find(getLastMessageIdResponse.request_id());
+
+    // Unknown request id: nothing to complete.
+    if (pendingIt == pendingGetLastMessageIdRequests_.end()) {
+        lock.unlock();
+        LOG_WARN("getLastMessageIdResponse command - Received unknown request id from server: "
+                 << getLastMessageIdResponse.request_id());
+        return;
+    }
+
+    auto pendingPromise = pendingIt->second;
+    pendingGetLastMessageIdRequests_.erase(pendingIt);
+    lock.unlock();
+
+    if (!getLastMessageIdResponse.has_consumer_mark_delete_position()) {
+        pendingPromise.setValue({toMessageId(getLastMessageIdResponse.last_message_id())});
+    } else {
+        pendingPromise.setValue(
+            {toMessageId(getLastMessageIdResponse.last_message_id()),
+             toMessageId(getLastMessageIdResponse.consumer_mark_delete_position())});
+    }
+}
+
+// Completes the pending GetTopicsOfNamespace request that matches the response's request id.
+//
+// The promise is removed from pendingGetNamespaceTopicsRequests_ under mutex_ and fulfilled after
+// the lock has been released. Each returned topic name is cut at its first "-partition-" marker,
+// the results are de-duplicated through a std::set, and the set's contents are handed to the
+// promise as a vector. A response whose request id is not pending is logged with LOG_WARN and
+// dropped.
+void ClientConnection::handleGetTopicOfNamespaceResponse(
+    const proto::CommandGetTopicsOfNamespaceResponse& response) {
+    LOG_DEBUG(cnxString_ << "Received GetTopicsOfNamespaceResponse from server. req_id: "
+                         << response.request_id() << " topicsSize" << response.topics_size());
+
+    Lock lock(mutex_);
+    auto it = pendingGetNamespaceTopicsRequests_.find(response.request_id());
+
+    if (it != pendingGetNamespaceTopicsRequests_.end()) {
+        Promise<Result, NamespaceTopicsPtr> getTopicsPromise = it->second;
+        pendingGetNamespaceTopicsRequests_.erase(it);
+        lock.unlock();
+
+        int numTopics = response.topics_size();
+        std::set<std::string> topicSet;
+        // get all topics
+        for (int i = 0; i < numTopics; i++) {
+            // remove partition part
+            const std::string& topicName = response.topics(i);
+            // Keep the position as size_type: when the marker is absent find() returns
+            // std::string::npos, and substr(0, npos) is the whole name. Storing it in an `int`
+            // only worked through an implicit narrowing round-trip.
+            const std::string::size_type pos = topicName.find("-partition-");
+
+            // std::set::insert ignores a key that is already present, so no prior find() is
+            // needed to filter duplicated topic names.
+            topicSet.insert(topicName.substr(0, pos));
+        }
+
+        NamespaceTopicsPtr topicsPtr =
+            std::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());
+
+        getTopicsPromise.setValue(topicsPtr);
+    } else {
+        lock.unlock();
+        LOG_WARN(
+            "GetTopicsOfNamespaceResponse command - Received unknown request id from "
+            "server: "
+            << response.request_id());
+    }
+}
+
+// Completes the pending GetSchema request that matches the response's request id.
+//
+// The promise is removed from pendingGetSchemaRequests_ under mutex_ and completed after the lock
+// has been released:
+//  - error_code present and equal to proto::TopicNotFound: fulfilled with boost::none;
+//  - any other error_code: failed with the Result produced by getResult(), after a warning;
+//  - no error_code: fulfilled with a SchemaInfo built from the schema's type, schema data and
+//    properties (the second constructor argument is an empty string).
+// A response whose request id is not pending is logged with LOG_WARN and dropped.
+void ClientConnection::handleGetSchemaResponse(const proto::CommandGetSchemaResponse& response) {
+    LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from server. req_id: " << response.request_id());
+    Lock lock(mutex_);
+    auto pendingIt = pendingGetSchemaRequests_.find(response.request_id());
+
+    // Unknown request id: nothing to complete.
+    if (pendingIt == pendingGetSchemaRequests_.end()) {
+        lock.unlock();
+        LOG_WARN(
+            "GetSchemaResponse command - Received unknown request id from "
+            "server: "
+            << response.request_id());
+        return;
+    }
+
+    Promise<Result, boost::optional<SchemaInfo>> schemaPromise = pendingIt->second;
+    pendingGetSchemaRequests_.erase(pendingIt);
+    lock.unlock();
+
+    if (response.has_error_code()) {
+        if (response.error_code() != proto::TopicNotFound) {
+            Result result = getResult(response.error_code(), response.error_message());
+            LOG_WARN(cnxString_ << "Received error GetSchemaResponse from server " << result
+                                << (response.has_error_message() ? (" (" + response.error_message() + ")")
+                                                                 : "")
+                                << " -- req_id: " << response.request_id());
+            schemaPromise.setFailed(result);
+        } else {
+            // A missing topic is reported as "no schema" rather than as a failure.
+            schemaPromise.setValue(boost::none);
+        }
+        return;
+    }
+
+    const auto& schema = response.schema();
+    StringMap properties;
+    for (const auto& entry : schema.properties()) {
+        properties[entry.key()] = entry.value();
+    }
+    SchemaInfo schemaInfo(static_cast<SchemaType>(schema.type()), "", schema.schema_data(), properties);
+    schemaPromise.setValue(schemaInfo);
+}
+
} // namespace pulsar
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index c77ca78..eae18f9 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -76,7 +76,20 @@ namespace proto {
class BaseCommand;
class CommandActiveConsumerChange;
class CommandMessage;
+class CommandCloseConsumer;
+class CommandCloseProducer;
class CommandConnected;
+class CommandConsumerStatsResponse;
+class CommandGetSchemaResponse;
+class CommandGetTopicsOfNamespaceResponse;
+class CommandError;
+class CommandGetLastMessageIdResponse;
+class CommandLookupTopicResponse;
+class CommandPartitionedTopicMetadataResponse;
+class CommandProducerSuccess;
+class CommandSendReceipt;
+class CommandSendError;
+class CommandSuccess;
} // namespace proto
// Data returned on the request operation. Mostly used on create-producer
command
@@ -362,6 +375,21 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
void closeSocket();
void checkServerError(ServerError error);
+
+ // Handlers for individual broker commands. Each one receives the protobuf message named in
+ // its parameter type; handleAuthChallenge() takes no payload.
+ // NOTE(review): presumably called from handleIncomingCommand() - confirm in
+ // ClientConnection.cc.
+ void handleSendReceipt(const proto::CommandSendReceipt&);
+ void handleSendError(const proto::CommandSendError&);
+ void handleSuccess(const proto::CommandSuccess&);
+ void handlePartitionedMetadataResponse(const
proto::CommandPartitionedTopicMetadataResponse&);
+ void handleConsumerStatsResponse(const
proto::CommandConsumerStatsResponse&);
+ // NOTE(review): "Respose" is a misspelling of "Response"; renaming it means updating the
+ // definition and every call site in the same change.
+ void handleLookupTopicRespose(const proto::CommandLookupTopicResponse&);
+ void handleProducerSuccess(const proto::CommandProducerSuccess&);
+ void handleError(const proto::CommandError&);
+ void handleCloseProducer(const proto::CommandCloseProducer&);
+ void handleCloseConsumer(const proto::CommandCloseConsumer&);
+ void handleAuthChallenge();
+ void handleGetLastMessageIdResponse(const
proto::CommandGetLastMessageIdResponse&);
+ void handleGetTopicOfNamespaceResponse(const
proto::CommandGetTopicsOfNamespaceResponse&);
+ void handleGetSchemaResponse(const proto::CommandGetSchemaResponse&);
};
} // namespace pulsar