This is an automated email from the ASF dual-hosted git repository.

shoothzj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ab478a  [consumer] Support parse broker metadata (#276)
0ab478a is described below

commit 0ab478a3d4c1e61ce2cfa3ab0e5285bb61874a6b
Author: ZhangJian He <[email protected]>
AuthorDate: Tue May 30 21:21:41 2023 +0800

    [consumer] Support parse broker metadata (#276)
    
    ### Motivation
    pulsar-client-cpp doesn't support parsing broker entry metadata, which
    causes errors when connecting to a Pulsar broker that has broker entry
    metadata enabled.
    This PR lets pulsar-client-cpp consume messages correctly from such a
    broker, but it does not expose the index interface yet.
    
    See also
    
    https://github.com/apache/pulsar-client-go/pull/745
    
    https://github.com/apache/pulsar/blob/e38091044c428af002b16110531497e2abc897d2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1289
    
    ### Documentation
    
    <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
    
    - [ ] `doc-required`
    (Your PR needs to update docs and you will update later)
    
    - [x] `doc-not-needed`
    (Please explain why)
    
    - [ ] `doc`
    (Your PR contains doc changes)
    
    - [ ] `doc-complete`
    (Docs have been already added)
---
 lib/ClientConnection.cc                    | 27 +++++++++++++++--
 lib/ClientConnection.h                     |  2 ++
 lib/Commands.h                             |  3 ++
 run-unit-tests.sh                          |  6 ++++
 tests/CMakeLists.txt                       |  3 ++
 tests/brokermetadata/BrokerMetadataTest.cc | 47 ++++++++++++++++++++++++++++++
 tests/brokermetadata/docker-compose.yml    | 43 +++++++++++++++++++++++++++
 tests/oauth2/Oauth2Test.cc                 |  1 -
 8 files changed, 129 insertions(+), 3 deletions(-)

diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 72b9c8e..d955b32 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -679,11 +679,32 @@ void ClientConnection::processIncomingBuffer() {
         if (incomingCmd.type() == BaseCommand::MESSAGE) {
             // Parse message metadata and extract payload
             proto::MessageMetadata msgMetadata;
+            proto::BrokerEntryMetadata brokerEntryMetadata;
 
             // read checksum
             uint32_t remainingBytes = frameSize - (cmdSize + 4);
             bool isChecksumValid = verifyChecksum(incomingBuffer_, 
remainingBytes, incomingCmd);
 
+            auto readerIndex = incomingBuffer_.readerIndex();
+            if (incomingBuffer_.readUnsignedShort() == 
Commands::magicBrokerEntryMetadata) {
+                // broker entry metadata is present
+                uint32_t brokerEntryMetadataSize = 
incomingBuffer_.readUnsignedInt();
+                if 
(!brokerEntryMetadata.ParseFromArray(incomingBuffer_.data(), 
brokerEntryMetadataSize)) {
+                    LOG_ERROR(cnxString_ << "[consumer id " << 
incomingCmd.message().consumer_id()
+                                         << ", message ledger id "
+                                         << 
incomingCmd.message().message_id().ledgerid() << ", entry id "
+                                         << 
incomingCmd.message().message_id().entryid()
+                                         << "] Error parsing broker entry 
metadata");
+                    close();
+                    return;
+                }
+
+                incomingBuffer_.consume(brokerEntryMetadataSize);
+                remainingBytes -= (2 + 4 + brokerEntryMetadataSize);
+            } else {
+                incomingBuffer_.setReaderIndex(readerIndex);
+            }
+
             uint32_t metadataSize = incomingBuffer_.readUnsignedInt();
             if (!msgMetadata.ParseFromArray(incomingBuffer_.data(), 
metadataSize)) {
                 LOG_ERROR(cnxString_ << "[consumer id " << 
incomingCmd.message().consumer_id()  //
@@ -701,7 +722,8 @@ void ClientConnection::processIncomingBuffer() {
             uint32_t payloadSize = remainingBytes;
             SharedBuffer payload = SharedBuffer::copy(incomingBuffer_.data(), 
payloadSize);
             incomingBuffer_.consume(payloadSize);
-            handleIncomingMessage(incomingCmd.message(), isChecksumValid, 
msgMetadata, payload);
+            handleIncomingMessage(incomingCmd.message(), isChecksumValid, 
brokerEntryMetadata, msgMetadata,
+                                  payload);
         } else {
             handleIncomingCommand(incomingCmd);
         }
@@ -710,7 +732,7 @@ void ClientConnection::processIncomingBuffer() {
         // We still have 1 to 3 bytes from the next frame
         assert(incomingBuffer_.readableBytes() < sizeof(uint32_t));
 
-        // Restart with a new buffer and copy the the few bytes at the 
beginning
+        // Restart with a new buffer and copy the few bytes at the beginning
         incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, 
DefaultBufferSize);
 
         // At least we need to read 4 bytes to have the complete frame size
@@ -782,6 +804,7 @@ void ClientConnection::handleActiveConsumerChange(const 
proto::CommandActiveCons
 }
 
 void ClientConnection::handleIncomingMessage(const proto::CommandMessage& msg, 
bool isChecksumValid,
+                                             proto::BrokerEntryMetadata& 
brokerEntryMetadata,
                                              proto::MessageMetadata& 
msgMetadata, SharedBuffer& payload) {
     LOG_DEBUG(cnxString_ << "Received a message from the server for consumer: 
" << msg.consumer_id());
 
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 24a43d5..7de9089 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -74,6 +74,7 @@ struct OpSendMsg;
 
 namespace proto {
 class BaseCommand;
+class BrokerEntryMetadata;
 class CommandActiveConsumerChange;
 class CommandAckResponse;
 class CommandMessage;
@@ -225,6 +226,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
     void handleActiveConsumerChange(const proto::CommandActiveConsumerChange& 
change);
     void handleIncomingCommand(proto::BaseCommand& incomingCmd);
     void handleIncomingMessage(const proto::CommandMessage& msg, bool 
isChecksumValid,
+                               proto::BrokerEntryMetadata& brokerEntryMetadata,
                                proto::MessageMetadata& msgMetadata, 
SharedBuffer& payload);
 
     void handlePulsarConnected(const proto::CommandConnected& cmdConnected);
diff --git a/lib/Commands.h b/lib/Commands.h
index 65a6406..c21adb4 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -79,6 +79,9 @@ class Commands {
     };
 
     const static uint16_t magicCrc32c = 0x0e01;
+
+    const static uint16_t magicBrokerEntryMetadata = 0x0e02;
+
     const static int checksumSize = 4;
 
     static SharedBuffer newConnect(const AuthenticationPtr& authentication, 
const std::string& logicalAddress,
diff --git a/run-unit-tests.sh b/run-unit-tests.sh
index 8be29f0..8ea834c 100755
--- a/run-unit-tests.sh
+++ b/run-unit-tests.sh
@@ -38,6 +38,12 @@ sleep 15
 $CMAKE_BUILD_DIRECTORY/tests/Oauth2Test
 docker compose -f tests/oauth2/docker-compose.yml down
 
+# Run BrokerMetadata tests
+docker compose -f tests/brokermetadata/docker-compose.yml up -d
+sleep 15
+$CMAKE_BUILD_DIRECTORY/tests/BrokerMetadataTest
+docker compose -f tests/brokermetadata/docker-compose.yml down
+
 ./pulsar-test-service-start.sh
 
 pushd $CMAKE_BUILD_DIRECTORY/tests
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index cfc9e27..f5c8662 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -65,6 +65,9 @@ if (UNIX)
     target_link_libraries(ConnectionFailTest ${CLIENT_LIBS} pulsarStatic 
${GTEST_LIBRARY_PATH})
 endif ()
 
+add_executable(BrokerMetadataTest brokermetadata/BrokerMetadataTest.cc)
+target_link_libraries(BrokerMetadataTest ${CLIENT_LIBS} pulsarStatic 
${GTEST_LIBRARY_PATH})
+
 add_executable(Oauth2Test oauth2/Oauth2Test.cc)
 target_compile_options(Oauth2Test PRIVATE 
"-DTEST_ROOT_PATH=\"${CMAKE_CURRENT_SOURCE_DIR}\"")
 target_link_libraries(Oauth2Test ${CLIENT_LIBS} pulsarStatic 
${GTEST_LIBRARY_PATH})
diff --git a/tests/brokermetadata/BrokerMetadataTest.cc 
b/tests/brokermetadata/BrokerMetadataTest.cc
new file mode 100644
index 0000000..f012734
--- /dev/null
+++ b/tests/brokermetadata/BrokerMetadataTest.cc
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+// Run `docker-compose up -d` to set up the test environment for this test.
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+using namespace pulsar;
+
+TEST(BrokerMetadataTest, testConsumeSuccess) {
+    Client client{"pulsar://localhost:6650"};
+    Producer producer;
+    Result producerResult = 
client.createProducer("persistent://public/default/testConsumeSuccess", 
producer);
+    ASSERT_EQ(producerResult, ResultOk);
+    Consumer consumer;
+    Result consumerResult =
+        client.subscribe("persistent://public/default/testConsumeSuccess", 
"testConsumeSuccess", consumer);
+    ASSERT_EQ(consumerResult, ResultOk);
+    const auto msg = MessageBuilder().setContent("testConsumeSuccess").build();
+    Result sendResult = producer.send(msg);
+    ASSERT_EQ(sendResult, ResultOk);
+    Message receivedMsg;
+    Result receiveResult = consumer.receive(receivedMsg);
+    ASSERT_EQ(receiveResult, ResultOk);
+    ASSERT_EQ(receivedMsg.getDataAsString(), "testConsumeSuccess");
+    client.close();
+}
+
+int main(int argc, char* argv[]) {
+    ::testing::InitGoogleTest(&argc, argv);
+    return RUN_ALL_TESTS();
+}
diff --git a/tests/brokermetadata/docker-compose.yml 
b/tests/brokermetadata/docker-compose.yml
new file mode 100644
index 0000000..bb719e0
--- /dev/null
+++ b/tests/brokermetadata/docker-compose.yml
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+version: '3'
+networks:
+  pulsar:
+    driver: bridge
+services:
+  standalone:
+    image: apachepulsar/pulsar:latest
+    container_name: standalone
+    hostname: local
+    restart: "no"
+    networks:
+      - pulsar
+    environment:
+      - metadataStoreUrl=zk:localhost:2181
+      - clusterName=standalone-broker-metadata
+      - advertisedAddress=localhost
+      - advertisedListeners=external:pulsar://localhost:6650
+      - PULSAR_MEM=-Xms512m -Xmx512m -XX:MaxDirectMemorySize=256m
+      - 
PULSAR_PREFIX_BROKER_ENTRY_METADATA_INTERCEPTORS=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
+      - PULSAR_PREFIX_EXPOSING_BROKER_ENTRY_METADATA_TO_CLIENT_ENABLED=true
+    ports:
+      - "6650:6650"
+      - "8080:8080"
+    command: bash -c "bin/apply-config-from-env.py conf/standalone.conf && 
exec bin/pulsar standalone -nss -nfw"
diff --git a/tests/oauth2/Oauth2Test.cc b/tests/oauth2/Oauth2Test.cc
index 6264620..5f273d7 100644
--- a/tests/oauth2/Oauth2Test.cc
+++ b/tests/oauth2/Oauth2Test.cc
@@ -75,7 +75,6 @@ int main(int argc, char* argv[]) {
 
     ::testing::InitGoogleTest(&argc, argv);
     return RUN_ALL_TESTS();
-    return 0;
 }
 
 static Result testCreateProducer(const std::string& privateKey) {

Reply via email to