This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git


The following commit(s) were added to refs/heads/master by this push:
     new d3174fa  Support Key-based batcher for producer (#418)
d3174fa is described below

commit d3174fa829bce76ff436996a152b4b21f51b82d1
Author: Baodi Shi <ba...@apache.org>
AuthorDate: Mon May 26 17:52:34 2025 +0800

    Support Key-based batcher for producer (#418)
---
 index.d.ts               |   6 +++
 src/Message.cc           |  11 ++++-
 src/Message.h            |   1 +
 src/ProducerConfig.cc    |  16 +++++++
 tests/end_to_end.test.js | 114 +++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 147 insertions(+), 1 deletion(-)

diff --git a/index.d.ts b/index.d.ts
index 69837f9..5097cbc 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -67,6 +67,7 @@ export interface ProducerConfig {
   chunkingEnabled?: boolean;
   schema?: SchemaInfo;
   accessMode?: ProducerAccessMode;
+  batchingType?: ProducerBatchType;
 }
 
 export class Producer {
@@ -163,6 +164,7 @@ export class Message {
   getEventTimestamp(): number;
   getRedeliveryCount(): number;
   getPartitionKey(): string;
+  getOrderingKey(): string;
 }
 
 export class MessageId {
@@ -271,6 +273,10 @@ export type CompressionType =
   'ZSTD' |
   'SNAPPY';
 
+export type ProducerBatchType =
+    'DefaultBatching' |
+    'KeyBasedBatching';
+
 export type ProducerCryptoFailureAction =
   'FAIL' |
   'SEND';
diff --git a/src/Message.cc b/src/Message.cc
index 06be544..184b8e7 100644
--- a/src/Message.cc
+++ b/src/Message.cc
@@ -45,7 +45,8 @@ Napi::Object Message::Init(Napi::Env env, Napi::Object 
exports) {
        InstanceMethod("getPublishTimestamp", &Message::GetPublishTimestamp),
        InstanceMethod("getEventTimestamp", &Message::GetEventTimestamp),
        InstanceMethod("getRedeliveryCount", &Message::GetRedeliveryCount),
-       InstanceMethod("getPartitionKey", &Message::GetPartitionKey)});
+       InstanceMethod("getPartitionKey", &Message::GetPartitionKey),
+       InstanceMethod("getOrderingKey", &Message::GetOrderingKey)});
 
   constructor = Napi::Persistent(func);
   constructor.SuppressDestruct();
@@ -138,6 +139,14 @@ Napi::Value Message::GetPartitionKey(const 
Napi::CallbackInfo &info) {
   return Napi::String::New(env, 
pulsar_message_get_partitionKey(this->cMessage.get()));
 }
 
+Napi::Value Message::GetOrderingKey(const Napi::CallbackInfo &info) {
+  Napi::Env env = info.Env();
+  if (!ValidateCMessage(env)) {
+    return env.Null();
+  }
+  return Napi::String::New(env, 
pulsar_message_get_orderingKey(this->cMessage.get()));
+}
+
 bool Message::ValidateCMessage(Napi::Env env) {
   if (this->cMessage.get()) {
     return true;
diff --git a/src/Message.h b/src/Message.h
index 42d02c1..6097f1c 100644
--- a/src/Message.h
+++ b/src/Message.h
@@ -44,6 +44,7 @@ class Message : public Napi::ObjectWrap<Message> {
   Napi::Value GetPublishTimestamp(const Napi::CallbackInfo &info);
   Napi::Value GetEventTimestamp(const Napi::CallbackInfo &info);
   Napi::Value GetPartitionKey(const Napi::CallbackInfo &info);
+  Napi::Value GetOrderingKey(const Napi::CallbackInfo &info);
   Napi::Value GetRedeliveryCount(const Napi::CallbackInfo &info);
   bool ValidateCMessage(Napi::Env env);
 
diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc
index 120eebf..2c704bf 100644
--- a/src/ProducerConfig.cc
+++ b/src/ProducerConfig.cc
@@ -19,6 +19,7 @@
 #include "SchemaInfo.h"
 #include "ProducerConfig.h"
 #include <map>
+#include "pulsar/ProducerConfiguration.h"
 
 static const std::string CFG_TOPIC = "topic";
 static const std::string CFG_PRODUCER_NAME = "producerName";
@@ -40,6 +41,11 @@ static const std::string CFG_ENCRYPTION_KEY = 
"encryptionKey";
 static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction";
 static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled";
 static const std::string CFG_ACCESS_MODE = "accessMode";
+static const std::string CFG_BATCHING_TYPE = "batchingType";
+
+struct _pulsar_producer_configuration {
+  pulsar::ProducerConfiguration conf;
+};
 
 static const std::map<std::string, pulsar_partitions_routing_mode> 
MESSAGE_ROUTING_MODE = {
     {"UseSinglePartition", pulsar_UseSinglePartition},
@@ -71,6 +77,11 @@ static std::map<std::string, pulsar_producer_access_mode> 
PRODUCER_ACCESS_MODE =
     {"ExclusiveWithFencing", pulsar_ProducerAccessModeExclusiveWithFencing},
 };
 
+static std::map<std::string, pulsar::ProducerConfiguration::BatchingType> 
PRODUCER_BATCHING_TYPE = {
+    {"DefaultBatching", pulsar::ProducerConfiguration::DefaultBatching},
+    {"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching},
+};
+
 ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") 
{
   this->cProducerConfig = std::shared_ptr<pulsar_producer_configuration_t>(
       pulsar_producer_configuration_create(), 
pulsar_producer_configuration_free);
@@ -208,6 +219,11 @@ ProducerConfig::ProducerConfig(const Napi::Object& 
producerConfig) : topic("") {
     pulsar_producer_configuration_set_access_mode(this->cProducerConfig.get(),
                                                   
PRODUCER_ACCESS_MODE.at(accessMode));
   }
+
+  std::string batchingType = 
producerConfig.Get(CFG_BATCHING_TYPE).ToString().Utf8Value();
+  if (PRODUCER_BATCHING_TYPE.count(batchingType)) {
+    
this->cProducerConfig.get()->conf.setBatchingType(PRODUCER_BATCHING_TYPE.at(batchingType));
+  }
 }
 
 ProducerConfig::~ProducerConfig() {}
diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js
index 304683d..ce3693f 100644
--- a/tests/end_to_end.test.js
+++ b/tests/end_to_end.test.js
@@ -1330,4 +1330,118 @@ const Pulsar = require('../index');
       await client.close();
     });
   });
+  describe('KeyBasedBatchingTest', () => {
+    let client;
+    let producer;
+    let consumer;
+    let topicName;
+
+    beforeAll(async () => {
+      client = new Pulsar.Client({
+        serviceUrl: 'pulsar://localhost:6650',
+      });
+    });
+
+    afterAll(async () => {
+      await client.close();
+    });
+
+    beforeEach(async () => {
+      topicName = `KeyBasedBatchingTest-${Date.now()}`;
+    });
+
+    afterEach(async () => {
+      if (producer) await producer.close();
+      if (consumer) await consumer.close();
+    });
+
+    const initProducer = async (maxMessages) => {
+      producer = await client.createProducer({
+        topic: topicName,
+        batchingEnabled: true,
+        batchingMaxMessages: maxMessages,
+        batchingType: 'KeyBasedBatching',
+        batchingMaxPublishDelayMs: 3600 * 1000,
+      });
+    };
+
+    const initConsumer = async () => {
+      consumer = await client.subscribe({
+        topic: topicName,
+        subscription: 'SubscriptionName',
+        subscriptionType: 'Exclusive',
+      });
+    };
+
+    const receiveAndAck = async () => {
+      const msg = await consumer.receive();
+      await consumer.acknowledge(msg);
+      return msg;
+    };
+
+    test('testSequenceId', async () => {
+      await initProducer(6);
+      await initConsumer();
+
+      // 0. Send 6 messages, use different keys and order
+      producer.send({ data: Buffer.from('0'), partitionKey: 'A' });
+      producer.send({ data: Buffer.from('1'), partitionKey: 'B' });
+      producer.send({ data: Buffer.from('2'), partitionKey: 'C' });
+      producer.send({ data: Buffer.from('3'), partitionKey: 'B' });
+      producer.send({ data: Buffer.from('4'), partitionKey: 'C' });
+      producer.send({ data: Buffer.from('5'), partitionKey: 'A' });
+      await producer.flush();
+
+      // 1. Receive all messages
+      const received = [];
+      for (let i = 0; i < 6; i += 1) {
+        const msg = await receiveAndAck();
+        received.push({
+          key: msg.getPartitionKey().toString(),
+          value: msg.getData().toString(),
+        });
+      }
+
+      // 2. Verify message order (based on key dictionary order)
+      const expected = [
+        { key: 'B', value: '1' },
+        { key: 'B', value: '3' },
+        { key: 'C', value: '2' },
+        { key: 'C', value: '4' },
+        { key: 'A', value: '0' },
+        { key: 'A', value: '5' },
+      ];
+
+      expect(received).toEqual(expected);
+    });
+
+    test('testOrderingKeyPriority', async () => {
+      await initProducer(3);
+      await initConsumer();
+
+      // 1. Send 3 messages to verify orderingKey takes precedence over 
partitionKey
+      producer.send({
+        data: Buffer.from('0'),
+        orderingKey: 'A',
+        partitionKey: 'B',
+      });
+      producer.send({ data: Buffer.from('2'), orderingKey: 'B' });
+      producer.send({ data: Buffer.from('1'), orderingKey: 'A' });
+      await producer.flush();
+
+      // 2. Receive messages and verify their order and keys
+      const msg1 = await receiveAndAck();
+      expect(msg1.getData().toString()).toBe('2');
+      expect(msg1.getOrderingKey().toString()).toBe('B');
+
+      const msg2 = await receiveAndAck();
+      expect(msg2.getData().toString()).toBe('0');
+      expect(msg2.getOrderingKey()).toBe('A');
+      expect(msg2.getPartitionKey()).toBe('B');
+
+      const msg3 = await receiveAndAck();
+      expect(msg3.getData().toString()).toBe('1');
+      expect(msg3.getOrderingKey().toString()).toBe('A');
+    });
+  });
 })();

Reply via email to