This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-node.git
The following commit(s) were added to refs/heads/master by this push: new d3174fa Support Key-based batcher for producer (#418) d3174fa is described below commit d3174fa829bce76ff436996a152b4b21f51b82d1 Author: Baodi Shi <ba...@apache.org> AuthorDate: Mon May 26 17:52:34 2025 +0800 Support Key-based batcher for producer (#418) --- index.d.ts | 6 +++ src/Message.cc | 11 ++++- src/Message.h | 1 + src/ProducerConfig.cc | 16 +++++++ tests/end_to_end.test.js | 114 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 147 insertions(+), 1 deletion(-) diff --git a/index.d.ts b/index.d.ts index 69837f9..5097cbc 100644 --- a/index.d.ts +++ b/index.d.ts @@ -67,6 +67,7 @@ export interface ProducerConfig { chunkingEnabled?: boolean; schema?: SchemaInfo; accessMode?: ProducerAccessMode; + batchingType?: ProducerBatchType; } export class Producer { @@ -163,6 +164,7 @@ export class Message { getEventTimestamp(): number; getRedeliveryCount(): number; getPartitionKey(): string; + getOrderingKey(): string; } export class MessageId { @@ -271,6 +273,10 @@ export type CompressionType = 'ZSTD' | 'SNAPPY'; +export type ProducerBatchType = + 'DefaultBatching' | + 'KeyBasedBatching'; + export type ProducerCryptoFailureAction = 'FAIL' | 'SEND'; diff --git a/src/Message.cc b/src/Message.cc index 06be544..184b8e7 100644 --- a/src/Message.cc +++ b/src/Message.cc @@ -45,7 +45,8 @@ Napi::Object Message::Init(Napi::Env env, Napi::Object exports) { InstanceMethod("getPublishTimestamp", &Message::GetPublishTimestamp), InstanceMethod("getEventTimestamp", &Message::GetEventTimestamp), InstanceMethod("getRedeliveryCount", &Message::GetRedeliveryCount), - InstanceMethod("getPartitionKey", &Message::GetPartitionKey)}); + InstanceMethod("getPartitionKey", &Message::GetPartitionKey), + InstanceMethod("getOrderingKey", &Message::GetOrderingKey)}); constructor = Napi::Persistent(func); constructor.SuppressDestruct(); @@ -138,6 +139,14 @@ Napi::Value Message::GetPartitionKey(const Napi::CallbackInfo &info) { return Napi::String::New(env, pulsar_message_get_partitionKey(this->cMessage.get())); } +Napi::Value Message::GetOrderingKey(const Napi::CallbackInfo &info) { + Napi::Env env = info.Env(); + if (!ValidateCMessage(env)) { + return env.Null(); + } + return Napi::String::New(env, pulsar_message_get_orderingKey(this->cMessage.get())); +} + bool Message::ValidateCMessage(Napi::Env env) { if (this->cMessage.get()) { return true; diff --git a/src/Message.h b/src/Message.h index 42d02c1..6097f1c 100644 --- a/src/Message.h +++ b/src/Message.h @@ -44,6 +44,7 @@ class Message : public Napi::ObjectWrap<Message> { Napi::Value GetPublishTimestamp(const Napi::CallbackInfo &info); Napi::Value GetEventTimestamp(const Napi::CallbackInfo &info); Napi::Value GetPartitionKey(const Napi::CallbackInfo &info); + Napi::Value GetOrderingKey(const Napi::CallbackInfo &info); Napi::Value GetRedeliveryCount(const Napi::CallbackInfo &info); bool ValidateCMessage(Napi::Env env); diff --git a/src/ProducerConfig.cc b/src/ProducerConfig.cc index 120eebf..2c704bf 100644 --- a/src/ProducerConfig.cc +++ b/src/ProducerConfig.cc @@ -19,6 +19,7 @@ #include "SchemaInfo.h" #include "ProducerConfig.h" #include <map> +#include "pulsar/ProducerConfiguration.h" static const std::string CFG_TOPIC = "topic"; static const std::string CFG_PRODUCER_NAME = "producerName"; @@ -40,6 +41,11 @@ static const std::string CFG_ENCRYPTION_KEY = "encryptionKey"; static const std::string CFG_CRYPTO_FAILURE_ACTION = "cryptoFailureAction"; static const std::string CFG_CHUNK_ENABLED = "chunkingEnabled"; static const std::string CFG_ACCESS_MODE = "accessMode"; +static const std::string CFG_BATCHING_TYPE = "batchingType"; + +struct _pulsar_producer_configuration { + pulsar::ProducerConfiguration conf; +}; static const std::map<std::string, pulsar_partitions_routing_mode> MESSAGE_ROUTING_MODE = { {"UseSinglePartition", pulsar_UseSinglePartition}, @@ -71,6 +77,11 @@ static std::map<std::string, pulsar_producer_access_mode> PRODUCER_ACCESS_MODE = {"ExclusiveWithFencing", pulsar_ProducerAccessModeExclusiveWithFencing}, }; +static std::map<std::string, pulsar::ProducerConfiguration::BatchingType> PRODUCER_BATCHING_TYPE = { + {"DefaultBatching", pulsar::ProducerConfiguration::DefaultBatching}, + {"KeyBasedBatching", pulsar::ProducerConfiguration::KeyBasedBatching}, +}; + ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { this->cProducerConfig = std::shared_ptr<pulsar_producer_configuration_t>( pulsar_producer_configuration_create(), pulsar_producer_configuration_free); @@ -208,6 +219,11 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") { pulsar_producer_configuration_set_access_mode(this->cProducerConfig.get(), PRODUCER_ACCESS_MODE.at(accessMode)); } + + std::string batchingType = producerConfig.Get(CFG_BATCHING_TYPE).ToString().Utf8Value(); + if (PRODUCER_BATCHING_TYPE.count(batchingType)) { + this->cProducerConfig.get()->conf.setBatchingType(PRODUCER_BATCHING_TYPE.at(batchingType)); + } } ProducerConfig::~ProducerConfig() {} diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js index 304683d..ce3693f 100644 --- a/tests/end_to_end.test.js +++ b/tests/end_to_end.test.js @@ -1330,4 +1330,118 @@ const Pulsar = require('../index'); await client.close(); }); }); + describe('KeyBasedBatchingTest', () => { + let client; + let producer; + let consumer; + let topicName; + + beforeAll(async () => { + client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + }); + }); + + afterAll(async () => { + await client.close(); + }); + + beforeEach(async () => { + topicName = `KeyBasedBatchingTest-${Date.now()}`; + }); + + afterEach(async () => { + if (producer) await producer.close(); + if (consumer) await consumer.close(); + }); + + const initProducer = async (maxMessages) => { + producer = await client.createProducer({ + topic: topicName, + batchingEnabled: true, + batchingMaxMessages: maxMessages, + batchingType: 'KeyBasedBatching', + batchingMaxPublishDelayMs: 3600 * 1000, + }); + }; + + const initConsumer = async () => { + consumer = await client.subscribe({ + topic: topicName, + subscription: 'SubscriptionName', + subscriptionType: 'Exclusive', + }); + }; + + const receiveAndAck = async () => { + const msg = await consumer.receive(); + await consumer.acknowledge(msg); + return msg; + }; + + test('testSequenceId', async () => { + await initProducer(6); + await initConsumer(); + + // 0. Send 6 messages, use different keys and order + producer.send({ data: Buffer.from('0'), partitionKey: 'A' }); + producer.send({ data: Buffer.from('1'), partitionKey: 'B' }); + producer.send({ data: Buffer.from('2'), partitionKey: 'C' }); + producer.send({ data: Buffer.from('3'), partitionKey: 'B' }); + producer.send({ data: Buffer.from('4'), partitionKey: 'C' }); + producer.send({ data: Buffer.from('5'), partitionKey: 'A' }); + await producer.flush(); + + // 1. Receive all messages + const received = []; + for (let i = 0; i < 6; i += 1) { + const msg = await receiveAndAck(); + received.push({ + key: msg.getPartitionKey().toString(), + value: msg.getData().toString(), + }); + } + + // 2. Verify message order (based on key dictionary order) + const expected = [ + { key: 'B', value: '1' }, + { key: 'B', value: '3' }, + { key: 'C', value: '2' }, + { key: 'C', value: '4' }, + { key: 'A', value: '0' }, + { key: 'A', value: '5' }, + ]; + + expect(received).toEqual(expected); + }); + + test('testOrderingKeyPriority', async () => { + await initProducer(3); + await initConsumer(); + + // 1. Send 3 messages to verify orderingKey takes precedence over partitionKey + producer.send({ + data: Buffer.from('0'), + orderingKey: 'A', + partitionKey: 'B', + }); + producer.send({ data: Buffer.from('2'), orderingKey: 'B' }); + producer.send({ data: Buffer.from('1'), orderingKey: 'A' }); + await producer.flush(); + + // 2. Receive messages and verify their order and keys + const msg1 = await receiveAndAck(); + expect(msg1.getData().toString()).toBe('2'); + expect(msg1.getOrderingKey().toString()).toBe('B'); + + const msg2 = await receiveAndAck(); + expect(msg2.getData().toString()).toBe('0'); + expect(msg2.getOrderingKey()).toBe('A'); + expect(msg2.getPartitionKey()).toBe('B'); + + const msg3 = await receiveAndAck(); + expect(msg3.getData().toString()).toBe('1'); + expect(msg3.getOrderingKey().toString()).toBe('A'); + }); + }); })();