+1

Regards,
penghui

On Thu, Feb 26, 2026 at 6:28 PM WenZhi Feng wrote:

> Sounds good! Pulsar definitely needs such a simplification.
>
> On 2026/02/27 02:07:10 Matteo Merli wrote:
> > Hi everyone,
> >
> > I'd like to start a discussion around a potential redesign of the Java
> > client API, shipped as a new separate module (pulsar-client-api-v5),
> > starting from the next LTS version, 5.0. This is not a fully fledged
> > PIP yet: I want to gather feedback and gauge interest before
> > formalizing a proposal.
> >
> > The draft API (interfaces only, no implementation) is available here:
> >
> > API:
> > https://github.com/merlimat/pulsar/tree/v5-api/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5
> >
> > Usage examples:
> > https://github.com/merlimat/pulsar/blob/v5-api/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java
> >
> > Why a new API?
> >
> > Three main drivers:
> >
> > Remove partitioning from the client API
> >
> > The current API leaks partitioning everywhere: TopicMessageId,
> > getPartitionsForTopic(), MessageRouter, MessageRoutingMode, and
> > partition-specific consumers. This forces application code to deal
> > with what is fundamentally a server-side scalability concern.
> >
> > In the new API, topics are opaque. Scalability is handled internally
> > by hash-range segments that the client never sees. Per-key ordering
> > is still guaranteed when a key is specified, but the underlying
> > parallelism is entirely managed by the broker.
> >
> > Simplify an API that grew too big and inconsistent
> >
> > After years of organic growth, the current API surface has accumulated
> > a lot of baggage:
> >
> > - Consumer has 60+ methods mixing unrelated concerns (ack, nack, seek,
> >   pause, unsubscribe, stats...)
> > - ConsumerBuilder has 40+ configuration methods with overlapping
> >   semantics
> > - Timeouts use (long, TimeUnit) in some places and long millis in others
> > - Nullable returns vs. empty results: inconsistent across the API
> > - loadConf(Map), clone(), and Serializable on builders: rarely used,
> >   and they clutter the API
> > - SPI via a reflection hack (DefaultImplementation) instead of the
> >   standard ServiceLoader
> >
> > The v5 API targets modern Java, uses Duration/Instant/Optional/records,
> > and keeps a minimal surface.
> >
> > Separate streaming vs queuing consumption
> >
> > The current Consumer mixes all four subscription types (Exclusive,
> > Failover, Shared, Key_Shared) behind a single interface. The result:
> > acknowledgeCumulative() is available but throws at runtime for Shared
> > subscriptions, negativeAcknowledge() semantics differ between modes,
> > seek() behavior varies, and the dead-letter policy only applies to
> > some modes.
> >
> > The v5 API splits this into purpose-built types:
> >
> > - StreamConsumer: ordered consumption with cumulative ack (maps to
> >   Exclusive/Failover). For event sourcing, CDC, ordered pipelines.
> > - QueueConsumer: unordered parallel consumption with individual ack,
> >   nack, and dead-letter support (maps to Shared/Key_Shared). For work
> >   queues and task processing.
> > - CheckpointConsumer: unmanaged consumption for connector frameworks
> >   (Flink, Spark). No subscription, no ack: position is tracked
> >   externally via an opaque Checkpoint type that captures a consistent
> >   position across all internal hash-range segments.
> >
> > Each type only exposes the operations that make sense for its model.
> > No more runtime surprises.
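To make the "no visible partitions, but per-key ordering is kept" point concrete, here is a minimal sketch assuming a fixed 16-bit hash space split into contiguous ranges. A deterministic key-to-range mapping sends every message for one key to the same segment, which is what preserves that key's order. The hash function, the range layout, and the `HashRange`/`SegmentRouter` names are hypothetical and not taken from the draft API, which never exposes segments to application code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: the draft API hides segments completely, and the
// 16-bit hash space, hash function, and range layout here are assumptions.
record HashRange(int start, int endInclusive) {
    boolean contains(int hash) {
        return hash >= start && hash <= endInclusive;
    }
}

final class SegmentRouter {
    static final int HASH_SPACE = 1 << 16;

    private final List<HashRange> segments = new ArrayList<>();

    // Split [0, HASH_SPACE) into n contiguous ranges; the last one absorbs any remainder.
    SegmentRouter(int n) {
        if (n <= 0 || n > HASH_SPACE) {
            throw new IllegalArgumentException("n out of range: " + n);
        }
        int size = HASH_SPACE / n;
        for (int i = 0; i < n; i++) {
            int start = i * size;
            int end = (i == n - 1) ? HASH_SPACE - 1 : start + size - 1;
            segments.add(new HashRange(start, end));
        }
    }

    // Deterministic 31-based polynomial hash over the UTF-8 bytes, masked to 16 bits.
    static int hash(String key) {
        int h = 0;
        for (byte b : key.getBytes(StandardCharsets.UTF_8)) {
            h = 31 * h + (b & 0xFF);
        }
        return h & (HASH_SPACE - 1);
    }

    // Same key -> same hash -> same segment, which is what preserves per-key order.
    int segmentFor(String key) {
        int h = hash(key);
        for (int i = 0; i < segments.size(); i++) {
            if (segments.get(i).contains(h)) {
                return i;
            }
        }
        throw new IllegalStateException("hash not covered: " + h);
    }

    int segmentCount() {
        return segments.size();
    }
}
```

With four segments, `segmentFor("ab")` always returns 0: the sketch's hash of "ab" is 97 * 31 + 98 = 3105, which falls in the first range [0, 16383].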
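The consumer split can be illustrated with two small interfaces, assuming simplified hypothetical names (`SimpleStreamConsumer`, `SimpleQueueConsumer`, `Msg`) rather than the draft's actual signatures. Cumulative ack exists only on the stream side, and ack/nack exist only on the queue side, so misusing them becomes a compile error instead of a runtime exception. The in-memory queue consumer is a toy: it re-queues nacked messages at the tail and ignores redelivery delays and dead-letter handling.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical, simplified shapes inspired by the proposal; these names and
// behaviors are illustrative assumptions, not the draft v5 interfaces.
record Msg(long id, String value) {}

// Ordered consumption: acknowledging id N covers every message up to N.
interface SimpleStreamConsumer {
    Optional<Msg> receive();

    void acknowledgeCumulative(long id);
}

// Parallel consumption: per-message ack and nack, but no cumulative ack,
// so calling it on a queue consumer is a compile error, not a runtime one.
interface SimpleQueueConsumer {
    Optional<Msg> receive();

    void acknowledge(long id);

    void negativeAcknowledge(long id);
}

// Toy queue consumer: nacked messages are re-queued at the tail.
// No redelivery delay and no dead-letter handling.
final class InMemoryQueueConsumer implements SimpleQueueConsumer {
    private final Deque<Msg> pending = new ArrayDeque<>();
    private final Map<Long, Msg> inFlight = new HashMap<>();
    final Set<Long> acked = new HashSet<>();

    InMemoryQueueConsumer(List<Msg> msgs) {
        pending.addAll(msgs);
    }

    @Override
    public Optional<Msg> receive() {
        Msg m = pending.pollFirst();
        if (m != null) {
            inFlight.put(m.id(), m);
        }
        return Optional.ofNullable(m);
    }

    @Override
    public void acknowledge(long id) {
        if (inFlight.remove(id) != null) {
            acked.add(id);
        }
    }

    @Override
    public void negativeAcknowledge(long id) {
        Msg m = inFlight.remove(id);
        if (m != null) {
            pending.addLast(m); // schedule redelivery
        }
    }
}
```

Returning `Optional<Msg>` from `receive()` also reflects the stated preference for `Optional` over nullable returns.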
> >
> > API overview
> >
> > Entry point:
> >
> >   PulsarClient
> >     .newProducer(schema)           -> ProducerBuilder           -> Producer<T>
> >     .newStreamConsumer(schema)     -> StreamConsumerBuilder     -> StreamConsumer<T>
> >     .newQueueConsumer(schema)      -> QueueConsumerBuilder      -> QueueConsumer<T>
> >     .newCheckpointConsumer(schema) -> CheckpointConsumerBuilder -> CheckpointConsumer<T>
> >     .newTransaction()              -> Transaction
> >
> > Sync-first, with an .async() accessor: each type has an async
> > counterpart (e.g. producer.async() returns AsyncProducer) with
> > CompletableFuture-based operations.
> >
> > Configuration is grouped into policy records: BatchingPolicy,
> > CompressionPolicy, TlsPolicy, BackoffPolicy, DeadLetterPolicy,
> > EncryptionPolicy, etc.
> >
> > Checkpoint is an opaque, serializable position vector with factories:
> > Checkpoint.earliest(), Checkpoint.latest(),
> > Checkpoint.atTimestamp(instant), and Checkpoint.fromByteArray(bytes).
> > Connectors can snapshot and restore positions without knowing about
> > internal segments.
> >
> > What this does NOT change
> >
> > - Wire protocol: unchanged
> > - Broker: unchanged
> > - Existing pulsar-client-api: stays as-is, fully supported
> > - This is API-only (interfaces); the implementation module would come
> >   later
> >
> > Looking forward to your thoughts!
> >
> > --
> > Matteo Merli
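One way the "sync-first with an .async() accessor" design could be wired is sketched below, assuming hypothetical `SimpleProducer`/`SimpleAsyncProducer` interfaces rather than the draft's real `Producer`/`AsyncProducer`. The blocking `send` simply joins on the future-based path, so both views share one implementation. The in-memory producer completes futures immediately and uses the list index as a stand-in for a message ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical interfaces illustrating "sync-first with an .async() accessor";
// the names and signatures are assumptions, not the draft v5 API.
interface SimpleAsyncProducer<T> {
    // Completes with a stand-in message ID once the value is "published".
    CompletableFuture<Long> send(T value);
}

interface SimpleProducer<T> {
    long send(T value); // blocking variant

    SimpleAsyncProducer<T> async(); // future-based view of the same producer
}

final class InMemoryProducer<T> implements SimpleProducer<T> {
    final List<T> published = new ArrayList<>();

    // The async path does the real work; futures complete immediately here.
    private final SimpleAsyncProducer<T> asyncView = value -> {
        synchronized (published) {
            published.add(value);
            return CompletableFuture.completedFuture((long) (published.size() - 1));
        }
    };

    @Override
    public long send(T value) {
        // The blocking call simply waits on the async path.
        return asyncView.send(value).join();
    }

    @Override
    public SimpleAsyncProducer<T> async() {
        return asyncView;
    }
}
```

Callers that want composition use `producer.async().send(v).thenApply(...)`; everyone else gets a plain return value from `producer.send(v)`.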
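A sketch of what a policy record might look like, assuming a hypothetical `ToyBatchingPolicy` whose components are invented for illustration (the mail names `BatchingPolicy` but not its fields). Durations are typed as `java.time.Duration` instead of `(long, TimeUnit)` pairs, validation lives in the compact constructor, and "modifying" a policy returns a new immutable copy.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical policy record: the proposal names BatchingPolicy but not its
// fields, so these components are illustrative assumptions.
record ToyBatchingPolicy(boolean enabled, int maxMessages, Duration maxDelay) {
    // Compact constructor: validate once, so every instance is well-formed.
    ToyBatchingPolicy {
        Objects.requireNonNull(maxDelay, "maxDelay");
        if (maxMessages <= 0) {
            throw new IllegalArgumentException("maxMessages must be > 0");
        }
        if (maxDelay.isNegative()) {
            throw new IllegalArgumentException("maxDelay must not be negative");
        }
    }

    static ToyBatchingPolicy disabled() {
        return new ToyBatchingPolicy(false, 1, Duration.ZERO);
    }

    // Records are immutable, so a "with" method returns a modified copy.
    ToyBatchingPolicy withMaxDelay(Duration newDelay) {
        return new ToyBatchingPolicy(enabled, maxMessages, newDelay);
    }
}
```

Because records get `equals`/`hashCode` for free, two policies with the same settings compare equal, which keeps builder code and tests simple.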
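Below is a toy model of an "opaque, serializable position vector", assuming one `long` offset per internal segment plus two symbolic start positions. The byte layout and the `ToyCheckpoint` name are hypothetical. `atTimestamp(instant)` is left out because resolving a timestamp would need a broker lookup. A connector would only store the bytes from `toByteArray()` and hand them back to `fromByteArray()`, never inspecting the contents.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical stand-in for an opaque checkpoint: either a symbolic start
// position or one offset per internal hash-range segment. The byte layout
// below is an assumption made for illustration.
final class ToyCheckpoint {
    enum Kind { EARLIEST, LATEST, OFFSETS }

    private final Kind kind;
    private final long[] offsets; // empty unless kind == OFFSETS

    private ToyCheckpoint(Kind kind, long[] offsets) {
        this.kind = kind;
        this.offsets = offsets.clone();
    }

    static ToyCheckpoint earliest() {
        return new ToyCheckpoint(Kind.EARLIEST, new long[0]);
    }

    static ToyCheckpoint latest() {
        return new ToyCheckpoint(Kind.LATEST, new long[0]);
    }

    // Only the client library would build this; connectors just keep the bytes.
    static ToyCheckpoint ofOffsets(long... offsets) {
        return new ToyCheckpoint(Kind.OFFSETS, offsets);
    }

    // Layout: [kind ordinal: int][count: int][offset: long] * count
    byte[] toByteArray() {
        ByteBuffer buf = ByteBuffer.allocate(8 + 8 * offsets.length);
        buf.putInt(kind.ordinal()).putInt(offsets.length);
        for (long o : offsets) {
            buf.putLong(o);
        }
        return buf.array();
    }

    static ToyCheckpoint fromByteArray(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        Kind kind = Kind.values()[buf.getInt()];
        long[] offsets = new long[buf.getInt()];
        for (int i = 0; i < offsets.length; i++) {
            offsets[i] = buf.getLong();
        }
        return new ToyCheckpoint(kind, offsets);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ToyCheckpoint c && c.kind == kind && Arrays.equals(c.offsets, offsets);
    }

    @Override
    public int hashCode() {
        return 31 * kind.hashCode() + Arrays.hashCode(offsets);
    }
}
```

Since the segment count is encoded inside the bytes, a connector can snapshot and restore a checkpoint without knowing how many segments the topic currently has.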
