This is an automated email from the ASF dual-hosted git repository.
shoothzj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 0ab478a [consumer] Support parse broker metadata (#276)
0ab478a is described below
commit 0ab478a3d4c1e61ce2cfa3ab0e5285bb61874a6b
Author: ZhangJian He <[email protected]>
AuthorDate: Tue May 30 21:21:41 2023 +0800
[consumer] Support parse broker metadata (#276)
### Motivation
pulsar-client-cpp doesn't support parsing broker entry metadata, which causes
errors when connecting to a Pulsar broker that has broker entry metadata enabled.
This PR lets pulsar-client-cpp consume messages correctly from brokers with
broker entry metadata enabled, but it does not yet expose the index interface.
See also
https://github.com/apache/pulsar-client-go/pull/745
https://github.com/apache/pulsar/blob/e38091044c428af002b16110531497e2abc897d2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1289
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc-required`
(Your PR needs to update docs and you will update later)
- [x] `doc-not-needed`
(Please explain why)
- [ ] `doc`
(Your PR contains doc changes)
- [ ] `doc-complete`
(Docs have been already added)
---
lib/ClientConnection.cc | 27 +++++++++++++++--
lib/ClientConnection.h | 2 ++
lib/Commands.h | 3 ++
run-unit-tests.sh | 6 ++++
tests/CMakeLists.txt | 3 ++
tests/brokermetadata/BrokerMetadataTest.cc | 47 ++++++++++++++++++++++++++++++
tests/brokermetadata/docker-compose.yml | 43 +++++++++++++++++++++++++++
tests/oauth2/Oauth2Test.cc | 1 -
8 files changed, 129 insertions(+), 3 deletions(-)
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 72b9c8e..d955b32 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -679,11 +679,32 @@ void ClientConnection::processIncomingBuffer() {
if (incomingCmd.type() == BaseCommand::MESSAGE) {
// Parse message metadata and extract payload
proto::MessageMetadata msgMetadata;
+ proto::BrokerEntryMetadata brokerEntryMetadata;
// read checksum
uint32_t remainingBytes = frameSize - (cmdSize + 4);
bool isChecksumValid = verifyChecksum(incomingBuffer_,
remainingBytes, incomingCmd);
+ auto readerIndex = incomingBuffer_.readerIndex();
+ if (incomingBuffer_.readUnsignedShort() ==
Commands::magicBrokerEntryMetadata) {
+ // broker entry metadata is present
+ uint32_t brokerEntryMetadataSize =
incomingBuffer_.readUnsignedInt();
+ if
(!brokerEntryMetadata.ParseFromArray(incomingBuffer_.data(),
brokerEntryMetadataSize)) {
+ LOG_ERROR(cnxString_ << "[consumer id " <<
incomingCmd.message().consumer_id()
+ << ", message ledger id "
+ <<
incomingCmd.message().message_id().ledgerid() << ", entry id "
+ <<
incomingCmd.message().message_id().entryid()
+ << "] Error parsing broker entry
metadata");
+ close();
+ return;
+ }
+
+ incomingBuffer_.consume(brokerEntryMetadataSize);
+ remainingBytes -= (2 + 4 + brokerEntryMetadataSize);
+ } else {
+ incomingBuffer_.setReaderIndex(readerIndex);
+ }
+
uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
if (!msgMetadata.ParseFromArray(incomingBuffer_.data(),
metadataSize)) {
LOG_ERROR(cnxString_ << "[consumer id " <<
incomingCmd.message().consumer_id() //
@@ -701,7 +722,8 @@ void ClientConnection::processIncomingBuffer() {
uint32_t payloadSize = remainingBytes;
SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(),
payloadSize);
incomingBuffer_.consume(payloadSize);
- handleIncomingMessage(incomingCmd.message(), isChecksumValid,
msgMetadata, payload);
+ handleIncomingMessage(incomingCmd.message(), isChecksumValid,
brokerEntryMetadata, msgMetadata,
+ payload);
} else {
handleIncomingCommand(incomingCmd);
}
@@ -710,7 +732,7 @@ void ClientConnection::processIncomingBuffer() {
// We still have 1 to 3 bytes from the next frame
assert(incomingBuffer_.readableBytes() < sizeof(uint32_t));
- // Restart with a new buffer and copy the the few bytes at the
beginning
+ // Restart with a new buffer and copy the few bytes at the beginning
incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_,
DefaultBufferSize);
// At least we need to read 4 bytes to have the complete frame size
@@ -782,6 +804,7 @@ void ClientConnection::handleActiveConsumerChange(const
proto::CommandActiveCons
}
void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg,
bool isChecksumValid,
+ proto::BrokerEntryMetadata&
brokerEntryMetadata,
proto::MessageMetadata&
msgMetadata, SharedBuffer& payload) {
LOG_DEBUG(cnxString_ << "Received a message from the server for consumer:
" << msg.consumer_id());
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 24a43d5..7de9089 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -74,6 +74,7 @@ struct OpSendMsg;
namespace proto {
class BaseCommand;
+class BrokerEntryMetadata;
class CommandActiveConsumerChange;
class CommandAckResponse;
class CommandMessage;
@@ -225,6 +226,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
void handleActiveConsumerChange(const proto::CommandActiveConsumerChange&
change);
void handleIncomingCommand(proto::BaseCommand& incomingCmd);
void handleIncomingMessage(const proto::CommandMessage& msg, bool
isChecksumValid,
+ proto::BrokerEntryMetadata& brokerEntryMetadata,
proto::MessageMetadata& msgMetadata,
SharedBuffer& payload);
void handlePulsarConnected(const proto::CommandConnected& cmdConnected);
diff --git a/lib/Commands.h b/lib/Commands.h
index 65a6406..c21adb4 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -79,6 +79,9 @@ class Commands {
};
const static uint16_t magicCrc32c = 0x0e01;
+
+ const static uint16_t magicBrokerEntryMetadata = 0x0e02;
+
const static int checksumSize = 4;
static SharedBuffer newConnect(const AuthenticationPtr& authentication,
const std::string& logicalAddress,
diff --git a/run-unit-tests.sh b/run-unit-tests.sh
index 8be29f0..8ea834c 100755
--- a/run-unit-tests.sh
+++ b/run-unit-tests.sh
@@ -38,6 +38,12 @@ sleep 15
$CMAKE_BUILD_DIRECTORY/tests/Oauth2Test
docker compose -f tests/oauth2/docker-compose.yml down
+# Run BrokerMetadata tests
+docker compose -f tests/brokermetadata/docker-compose.yml up -d
+sleep 15
+$CMAKE_BUILD_DIRECTORY/tests/BrokerMetadataTest
+docker compose -f tests/brokermetadata/docker-compose.yml down
+
./pulsar-test-service-start.sh
pushd $CMAKE_BUILD_DIRECTORY/tests
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index cfc9e27..f5c8662 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -65,6 +65,9 @@ if (UNIX)
target_link_libraries(ConnectionFailTest ${CLIENT_LIBS} pulsarStatic
${GTEST_LIBRARY_PATH})
endif ()
+add_executable(BrokerMetadataTest brokermetadata/BrokerMetadataTest.cc)
+target_link_libraries(BrokerMetadataTest ${CLIENT_LIBS} pulsarStatic
${GTEST_LIBRARY_PATH})
+
add_executable(Oauth2Test oauth2/Oauth2Test.cc)
target_compile_options(Oauth2Test PRIVATE
"-DTEST_ROOT_PATH=\"${CMAKE_CURRENT_SOURCE_DIR}\"")
target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic
${GTEST_LIBRARY_PATH})
diff --git a/tests/brokermetadata/BrokerMetadataTest.cc
b/tests/brokermetadata/BrokerMetadataTest.cc
new file mode 100644
index 0000000..f012734
--- /dev/null
+++ b/tests/brokermetadata/BrokerMetadataTest.cc
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// Run `docker-compose up -d` to set up the test environment for this test.
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+using namespace pulsar;
+
+TEST(BrokerMetadataTest, testConsumeSuccess) {
+ Client client{"pulsar://localhost:6650"};
+ Producer producer;
+ Result producerResult =
client.createProducer("persistent://public/default/testConsumeSuccess",
producer);
+ ASSERT_EQ(producerResult, ResultOk);
+ Consumer consumer;
+ Result consumerResult =
+ client.subscribe("persistent://public/default/testConsumeSuccess",
"testConsumeSuccess", consumer);
+ ASSERT_EQ(consumerResult, ResultOk);
+ const auto msg = MessageBuilder().setContent("testConsumeSuccess").build();
+ Result sendResult = producer.send(msg);
+ ASSERT_EQ(sendResult, ResultOk);
+ Message receivedMsg;
+ Result receiveResult = consumer.receive(receivedMsg);
+ ASSERT_EQ(receiveResult, ResultOk);
+ ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess");
+ client.close();
+}
+
+int main(int argc, char* argv[]) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/tests/brokermetadata/docker-compose.yml
b/tests/brokermetadata/docker-compose.yml
new file mode 100644
index 0000000..bb719e0
--- /dev/null
+++ b/tests/brokermetadata/docker-compose.yml
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+version: '3'
+networks:
+ pulsar:
+ driver: bridge
+services:
+ standalone:
+ image: apachepulsar/pulsar:latest
+ container_name: standalone
+ hostname: local
+ restart: "no"
+ networks:
+ - pulsar
+ environment:
+ - metadataStoreUrl=zk:localhost:2181
+ - clusterName=standalone-broker-metadata
+ - advertisedAddress=localhost
+ - advertisedListeners=external:pulsar://localhost:6650
+ - PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
+ -
PULSAR_PREFIX_BROKER_ENTRY_METADATA_INTERCEPTORS=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
+ - PULSAR_PREFIX_EXPOSING_BROKER_ENTRY_METADATA_TO_CLIENT_ENABLED=true
+ ports:
+ - "6650:6650"
+ - "8080:8080"
+ command: bash -c "bin/apply-config-from-env.py conf/standalone.conf &&
exec bin/pulsar standalone -nss -nfw"
diff --git a/tests/oauth2/Oauth2Test.cc b/tests/oauth2/Oauth2Test.cc
index 6264620..5f273d7 100644
--- a/tests/oauth2/Oauth2Test.cc
+++ b/tests/oauth2/Oauth2Test.cc
@@ -75,7 +75,6 @@ int main(int argc, char* argv[]) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
- return 0;
}
static Result testCreateProducer(const std::string& privateKey) {