This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c28dd859c22 [feat]PIP-468: Wire transactions into V5 client (#25631)
c28dd859c22 is described below

commit c28dd859c221cf5895879226b49d940f483885ed
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 15:41:00 2026 -0700

    [feat]PIP-468: Wire transactions into V5 client (#25631)
---
 .../pendingack/impl/MLPendingAckStore.java         |  13 +-
 .../pulsar/broker/service/SharedPulsarCluster.java |  26 ++
 .../pulsar/client/api/v5/V5TransactionTest.java    | 389 +++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerAckTest.java |  52 +--
 .../client/impl/v5/AsyncMessageBuilderV5.java      |   5 +-
 .../pulsar/client/impl/v5/MessageBuilderV5.java    |   5 +-
 .../client/impl/v5/PulsarClientBuilderV5.java      |   5 +-
 .../pulsar/client/impl/v5/PulsarClientV5.java      |  28 +-
 .../client/impl/v5/ScalableQueueConsumer.java      |   8 +-
 .../client/impl/v5/ScalableStreamConsumer.java     |  12 +-
 .../client/impl/v5/ScalableTopicProducer.java      |  25 +-
 .../pulsar/client/impl/v5/TransactionV5.java       | 114 ++++++
 12 files changed, 606 insertions(+), 76 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index 3d2995b7f13..e10a93cbc1a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -523,7 +523,18 @@ public class MLPendingAckStore implements PendingAckStore {
     }
 
     public static String getTransactionPendingAckStoreSuffix(String originTopicName, String subName) {
-        return TopicName.get(originTopicName) + "-" + subName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
+        TopicName origin = TopicName.get(originTopicName);
+        // Segment topics ("segment://tenant/ns/topic/<hexStart>-<hexEnd>-<segmentId>") cannot
+        // host a derived pending-ack topic in the segment domain — the descriptor parser would
+        // reject any name with extra dashes appended. Map to a flat persistent topic in the same
+        // namespace, encoding the segment descriptor into the local name.
+        if (origin.isSegment()) {
+            return String.format("persistent://%s/%s/%s-%s-%s%s",
+                    origin.getTenant(), origin.getNamespacePortion(),
+                    origin.getLocalName(), origin.getSegmentDescriptor(),
+                    subName, SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
+        }
+        return origin + "-" + subName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
     }
 
     public static String getTransactionPendingAckStoreCursorName() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
index 1866a5a9649..dc14b163e4d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
@@ -28,6 +28,9 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.metadata.bookkeeper.BKCluster;
@@ -166,6 +169,13 @@ public class SharedPulsarCluster {
         // Disable the load balancer — single-broker cluster doesn't need it
         config.setLoadBalancerEnabled(false);
 
+        // Enable the transaction coordinator so V5 transaction tests can run on the
+        // shared cluster. Other tests don't pay any meaningful cost — the coordinator
+        // only does work when a client opts in via enableTransaction=true.
+        config.setTransactionCoordinatorEnabled(true);
+        config.setTransactionBufferSnapshotMaxTransactionCount(2);
+        config.setTransactionBufferSnapshotMinTimeInMillis(2000);
+
         pulsarService = new PulsarService(config);
         pulsarService.start();
 
@@ -190,6 +200,22 @@ public class SharedPulsarCluster {
                         .allowedClusters(Set.of(CLUSTER_NAME))
                         .build());
 
+        // Set up the system namespace + transaction-coordinator partitioned topic so
+        // the broker can serve transaction requests from V5 (and v4) clients. The
+        // coordinator topic lives in pulsar/system, which the public admin API rejects
+        // (system-topic format), so we go through pulsarResources directly — same path
+        // TransactionTestBase uses.
+        admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+                TenantInfo.builder()
+                        .allowedClusters(Set.of(CLUSTER_NAME))
+                        .build());
+        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        pulsarService.getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources()
+                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(1));
+
         log.info().attr("startedBroker", pulsarService.getBrokerServiceUrl())
                 .attr("web", pulsarService.getWebServiceAddress()).log("SharedPulsarCluster started. broker= web");
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
new file mode 100644
index 00000000000..23ede822b65
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the V5 transaction wire: opening a transaction on a transaction-enabled
+ * client, sending and acknowledging transactional messages through the V5 producer
+ * and consumer on a scalable topic, and committing or aborting the transaction.
+ */
+public class V5TransactionTest extends V5ClientBaseTest {
+
+    private PulsarClient newTxnClient() throws Exception {
+        return track(PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .transactionPolicy(new TransactionPolicy(Duration.ofMinutes(1)))
+                .build());
+    }
+
+    @Test
+    public void testAbortMovesTransactionToAbortedState() throws Exception {
+        // Open a transaction-enabled V5 client, send transactional messages through the
+        // V5 producer to a scalable topic, then abort. The transaction state must move
+        // OPEN → ABORTED — exercises the V5 newTransaction / message-builder
+        // transaction / Transaction.abort path end-to-end.
+        PulsarClient client = newTxnClient();
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        Transaction txn = client.newTransaction();
+        assertEquals(txn.state(), Transaction.State.OPEN);
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().transaction(txn).value("v-" + i).send();
+        }
+        txn.abort();
+        assertEquals(txn.state(), Transaction.State.ABORTED);
+    }
+
+    @Test
+    public void testCommitMakesProducedMessagesVisible() throws Exception {
+        PulsarClient client = newTxnClient();
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("txn-commit-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Transaction txn = client.newTransaction();
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < 5; i++) {
+            String v = "v-" + i;
+            producer.newMessage().transaction(txn).value(v).send();
+            sent.add(v);
+        }
+        // Pre-commit: nothing visible yet.
+        assertNull(consumer.receive(Duration.ofMillis(500)),
+                "transactional sends must not be visible before commit");
+
+        txn.commit();
+        assertEquals(txn.state(), Transaction.State.COMMITTED);
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < 5; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed committed message #" + i);
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(received, sent, "every committed message must be delivered");
+    }
+
+    @Test
+    public void testAbortedMessagesAreNeverDelivered() throws Exception {
+        // After abort, transactional messages must never be delivered. Use a non-txn
+        // sentinel published after the abort to prove that the consumer is alive and
+        // delivering — and that the only message it sees is the sentinel.
+        PulsarClient client = newTxnClient();
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("txn-abort-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Transaction txn = client.newTransaction();
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().transaction(txn).value("v-" + i).send();
+        }
+        txn.abort();
+        assertEquals(txn.state(), Transaction.State.ABORTED);
+
+        producer.newMessage().value("sentinel").send();
+
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg, "non-transactional sentinel must be delivered");
+        assertEquals(msg.value(), "sentinel", "aborted txn messages must not precede the sentinel");
+        consumer.acknowledge(msg.id());
+
+        assertNull(consumer.receive(Duration.ofMillis(500)),
+                "no further messages after the sentinel — the aborted txn must be discarded");
+    }
+
+    @Test
+    public void testCommitAcrossMultipleTopics() throws Exception {
+        // A single transaction writing to two distinct scalable topics commits atomically:
+        // both topics see their messages only after commit.
+        PulsarClient client = newTxnClient();
+        String topicA = newScalableTopic(1);
+        String topicB = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> prodA = client.newProducer(Schema.string()).topic(topicA).create();
+        @Cleanup
+        Producer<String> prodB = client.newProducer(Schema.string()).topic(topicB).create();
+
+        @Cleanup
+        QueueConsumer<String> consA = client.newQueueConsumer(Schema.string())
+                .topic(topicA).subscriptionName("multi-topic-a")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+        @Cleanup
+        QueueConsumer<String> consB = client.newQueueConsumer(Schema.string())
+                .topic(topicB).subscriptionName("multi-topic-b")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Transaction txn = client.newTransaction();
+        prodA.newMessage().transaction(txn).value("a-1").send();
+        prodB.newMessage().transaction(txn).value("b-1").send();
+
+        assertNull(consA.receive(Duration.ofMillis(500)), "topic A: nothing visible pre-commit");
+        assertNull(consB.receive(Duration.ofMillis(500)), "topic B: nothing visible pre-commit");
+
+        txn.commit();
+        assertEquals(txn.state(), Transaction.State.COMMITTED);
+
+        Message<String> ma = consA.receive(Duration.ofSeconds(5));
+        assertNotNull(ma, "topic A: expected message after commit");
+        assertEquals(ma.value(), "a-1");
+
+        Message<String> mb = consB.receive(Duration.ofSeconds(5));
+        assertNotNull(mb, "topic B: expected message after commit");
+        assertEquals(mb.value(), "b-1");
+    }
+
+    // Disabled: documents a real broker gap in the per-segment TransactionBuffer model.
+    // After split, the parent segment is sealed; the txn coordinator's COMMIT END-TXN to
+    // the sealed parent never returns and the commit RPC times out (~30s). The fix
+    // requires moving transaction tracking up to the scalable-topic level (so layout
+    // changes don't strand in-flight transactions on now-sealed segments) — that's
+    // beyond the scope of this commit.
+    @Test(enabled = false)
+    public void testCommitSpansSplit() throws Exception {
+        // A single transaction whose lifetime spans a layout-changing split must commit
+        // atomically: pre-split writes (on the now-sealed parent) and post-split writes
+        // (on the new children) all become visible together at commit. Exercises commit
+        // markers landing on a sealed segment as well as the live children.
+        PulsarClient client = newTxnClient();
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.string()).topic(topic).create();
+        @Cleanup
+        QueueConsumer<String> consumer = client.newQueueConsumer(Schema.string())
+                .topic(topic).subscriptionName("split-txn-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Transaction txn = client.newTransaction();
+
+        // First batch: lands on the only initial segment.
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < 5; i++) {
+            String v = "before-split-" + i;
+            producer.newMessage().key("k-" + i).transaction(txn).value(v).send();
+            sent.add(v);
+        }
+
+        long activeSegmentId = -1;
+        var meta = admin.scalableTopics().getMetadata(topic);
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                activeSegmentId = seg.getSegmentId();
+                break;
+            }
+        }
+        assertTrue(activeSegmentId >= 0, "expected exactly one active segment before split");
+        admin.scalableTopics().splitSegment(topic, activeSegmentId);
+
+        // Wait for the producer's view to reflect the new layout.
+        Awaitility.await().untilAsserted(() -> {
+            int active = 0;
+            var m = admin.scalableTopics().getMetadata(topic);
+            for (var seg : m.getSegments().values()) {
+                if (seg.isActive()) {
+                    active++;
+                }
+            }
+            assertEquals(active, 2, "split must produce 2 active children");
+        });
+
+        // Second batch: lands on the new children, still inside the same transaction.
+        for (int i = 0; i < 5; i++) {
+            String v = "after-split-" + i;
+            producer.newMessage().key("k-after-" + i).transaction(txn).value(v).send();
+            sent.add(v);
+        }
+
+        assertNull(consumer.receive(Duration.ofMillis(500)),
+                "transactional sends must not be visible before commit");
+
+        txn.commit();
+        assertEquals(txn.state(), Transaction.State.COMMITTED);
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < sent.size(); i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(msg, "missed message #" + i + " (received so far: " + received.size() + ")");
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(received, sent, "all txn messages across the split must be delivered after commit");
+    }
+
+    // Disabled: same broker gap as testCommitSpansSplit. After merge, both source
+    // segments are sealed and the COMMIT marker can't be delivered, so the END-TXN
+    // request times out.
+    @Test(enabled = false)
+    public void testCommitSpansMerge() throws Exception {
+        // A single transaction whose lifetime spans a layout-changing merge must commit
+        // atomically: writes to the two pre-merge segments and writes to the post-merge
+        // segment all become visible together.
+        PulsarClient client = newTxnClient();
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.string()).topic(topic).create();
+        @Cleanup
+        QueueConsumer<String> consumer = client.newQueueConsumer(Schema.string())
+                .topic(topic).subscriptionName("merge-txn-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Transaction txn = client.newTransaction();
+
+        // First batch: lands on the two initial segments.
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < 10; i++) {
+            String v = "before-merge-" + i;
+            producer.newMessage().key("k-" + i).transaction(txn).value(v).send();
+            sent.add(v);
+        }
+
+        var meta = admin.scalableTopics().getMetadata(topic);
+        List<Long> activeIds = new ArrayList<>();
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                activeIds.add(seg.getSegmentId());
+            }
+        }
+        assertEquals(activeIds.size(), 2, "expected 2 active segments before merge");
+        admin.scalableTopics().mergeSegments(topic, activeIds.get(0), activeIds.get(1));
+
+        Awaitility.await().untilAsserted(() -> {
+            int active = 0;
+            var m = admin.scalableTopics().getMetadata(topic);
+            for (var seg : m.getSegments().values()) {
+                if (seg.isActive()) {
+                    active++;
+                }
+            }
+            assertEquals(active, 1, "merge must collapse to 1 active segment");
+        });
+
+        // Second batch: lands on the new sole segment, still inside the same transaction.
+        for (int i = 0; i < 5; i++) {
+            String v = "after-merge-" + i;
+            producer.newMessage().key("k-after-" + i).transaction(txn).value(v).send();
+            sent.add(v);
+        }
+
+        assertNull(consumer.receive(Duration.ofMillis(500)),
+                "transactional sends must not be visible before commit");
+
+        txn.commit();
+        assertEquals(txn.state(), Transaction.State.COMMITTED);
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < sent.size(); i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(msg, "missed message #" + i + " (received so far: " + received.size() + ")");
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(received, sent, "all txn messages across the merge must be delivered after commit");
+    }
+
+    @Test
+    public void testTransactionalConsumeAndProduce() throws Exception {
+        // Classic consume-transform-produce: ack input + produce derived output in one
+        // transaction. The output is invisible pre-commit; commit makes it visible.
+        PulsarClient client = newTxnClient();
+        String inputTopic = newScalableTopic(1);
+        String outputTopic = newScalableTopic(1);
+
+        // Seed an input message non-transactionally.
+        @Cleanup
+        Producer<String> seed = client.newProducer(Schema.string()).topic(inputTopic).create();
+        seed.newMessage().value("hello").send();
+
+        @Cleanup
+        QueueConsumer<String> input = client.newQueueConsumer(Schema.string())
+                .topic(inputTopic).subscriptionName("input-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+        @Cleanup
+        Producer<String> output = client.newProducer(Schema.string()).topic(outputTopic).create();
+        @Cleanup
+        QueueConsumer<String> verify = client.newQueueConsumer(Schema.string())
+                .topic(outputTopic).subscriptionName("verify-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Message<String> in = input.receive(Duration.ofSeconds(5));
+        assertNotNull(in, "seed input message must be delivered");
+
+        Transaction txn = client.newTransaction();
+        output.newMessage().transaction(txn).value(in.value().toUpperCase()).send();
+        input.acknowledge(in.id(), txn);
+
+        assertNull(verify.receive(Duration.ofMillis(500)),
+                "transactional output must not be visible before commit");
+
+        txn.commit();
+        assertEquals(txn.state(), Transaction.State.COMMITTED);
+
+        Message<String> out = verify.receive(Duration.ofSeconds(5));
+        assertNotNull(out, "committed output must be delivered");
+        assertEquals(out.value(), "HELLO");
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
index 23c498d2654..973fdc6e17a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
@@ -19,20 +19,14 @@
 package org.apache.pulsar.client.impl;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -49,9 +43,6 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -61,24 +52,15 @@ import org.testng.collections.Sets;
 @Test(groups = "broker-impl")
 public class ConsumerAckTest extends SharedPulsarBaseTest {
 
-    private TransactionImpl transaction;
     private PulsarClient clientWithStats;
-    @SuppressWarnings("deprecation")
 
+    @SuppressWarnings("deprecation")
     @BeforeMethod(alwaysRun = true)
     public void setupConsumerAckTest() throws Exception {
         this.clientWithStats = PulsarClient.builder()
                 .serviceUrl(getBrokerServiceUrl())
                 .statsInterval(30, TimeUnit.SECONDS)
                 .build();
-        transaction = mock(TransactionImpl.class);
-        doReturn(1L).when(transaction).getTxnIdLeastBits();
-        doReturn(1L).when(transaction).getTxnIdMostBits();
-        doReturn(TransactionImpl.State.OPEN).when(transaction).getState();
-        CompletableFuture<Void> completableFuture = CompletableFuture.completedFuture(null);
-        doNothing().when(transaction).registerAckOp(any());
-        doReturn(true).when(transaction).checkIfOpen(any());
-        doReturn(completableFuture).when(transaction).registerAckedTopic(any(), any());
     }
 
     @AfterMethod(alwaysRun = true)
@@ -88,38 +70,6 @@ public class ConsumerAckTest extends SharedPulsarBaseTest {
         }
     }
 
-    @Test
-    public void testAckResponse() throws PulsarClientException, InterruptedException {
-        String topic = newTopicName();
-        @Cleanup
-        Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
-                .topic(topic)
-                .enableBatching(false)
-                .create();
-        @Cleanup
-        ConsumerImpl<Integer> consumer = (ConsumerImpl<Integer>) pulsarClient.newConsumer(Schema.INT32)
-                .topic(topic)
-                .subscriptionName("sub")
-                .subscriptionType(SubscriptionType.Shared)
-                .ackTimeout(1, TimeUnit.SECONDS)
-                .subscribe();
-        producer.send(1);
-        producer.send(2);
-        try {
-            consumer.acknowledgeAsync(new MessageIdImpl(1, 1, 1), transaction).get();
-            fail();
-        } catch (ExecutionException e) {
-            Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException);
-        }
-        Message<Integer> message = consumer.receive();
-
-        try {
-            consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
-            fail();
-        } catch (ExecutionException e) {
-            Assert.assertTrue(e.getCause() instanceof PulsarClientException.NotAllowedException);
-        }
-    }
     @Test(timeOut = 30000)
     public void testAckReceipt() throws Exception {
         String topic = newTopicName();
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncMessageBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncMessageBuilderV5.java
index 974ba10b80c..896e2c75e5e 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncMessageBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncMessageBuilderV5.java
@@ -43,6 +43,7 @@ final class AsyncMessageBuilderV5<T> implements AsyncMessageBuilder<T> {
     private Duration deliverAfter;
     private Instant deliverAt;
     private List<String> replicationClusters;
+    private Transaction txn;
 
     AsyncMessageBuilderV5(ScalableTopicProducer<T> producer) {
         this.producer = producer;
@@ -52,7 +53,7 @@ final class AsyncMessageBuilderV5<T> implements AsyncMessageBuilder<T> {
     public CompletableFuture<MessageId> send() {
         return producer.sendInternalAsync(
                 key, value, properties, eventTime, sequenceId,
-                deliverAfter, deliverAt, replicationClusters)
+                deliverAfter, deliverAt, replicationClusters, txn)
                 .thenApply(id -> id);
     }
 
@@ -70,7 +71,7 @@ final class AsyncMessageBuilderV5<T> implements AsyncMessageBuilder<T> {
 
     @Override
     public AsyncMessageBuilderV5<T> transaction(Transaction txn) {
-        // TODO: Wire up transaction support
+        this.txn = txn;
         return this;
     }
 
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageBuilderV5.java
index 5e6e598f0e6..910f25f27a0 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageBuilderV5.java
@@ -43,6 +43,7 @@ final class MessageBuilderV5<T> implements MessageBuilder<T> {
     private Duration deliverAfter;
     private Instant deliverAt;
     private List<String> replicationClusters;
+    private Transaction txn;
 
     MessageBuilderV5(ScalableTopicProducer<T> producer) {
         this.producer = producer;
@@ -52,7 +53,7 @@ final class MessageBuilderV5<T> implements MessageBuilder<T> {
     public MessageId send() throws PulsarClientException {
         return producer.sendInternal(
                 key, value, properties, eventTime, sequenceId,
-                deliverAfter, deliverAt, replicationClusters);
+                deliverAfter, deliverAt, replicationClusters, txn);
     }
 
     @Override
@@ -69,7 +70,7 @@ final class MessageBuilderV5<T> implements MessageBuilder<T> {
 
     @Override
     public MessageBuilderV5<T> transaction(Transaction txn) {
-        // TODO: Wire up transaction support
+        this.txn = txn;
         return this;
     }
 
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
index 1c1fe6ccc9b..ea6afd012db 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
@@ -39,6 +39,7 @@ final class PulsarClientBuilderV5 implements PulsarClientBuilder {
 
     private final ClientConfigurationData conf = new ClientConfigurationData();
     private String description;
+    private Duration transactionTimeout;
 
     PulsarClientBuilderV5() {
         conf.setStatsIntervalSeconds(0);
@@ -48,7 +49,7 @@ final class PulsarClientBuilderV5 implements PulsarClientBuilder {
     public PulsarClient build() throws PulsarClientException {
         try {
             var v4Client = new PulsarClientImpl(conf);
-            return new PulsarClientV5(v4Client, description);
+            return new PulsarClientV5(v4Client, description, transactionTimeout);
         } catch (org.apache.pulsar.client.api.PulsarClientException e) {
             throw new PulsarClientException(e.getMessage(), e);
         }
@@ -103,8 +104,8 @@ final class PulsarClientBuilderV5 implements PulsarClientBuilder {
 
     @Override
     public PulsarClientBuilder transactionPolicy(TransactionPolicy policy) {
-        // TransactionPolicy enables transactions with a default timeout
         conf.setEnableTransaction(true);
+        this.transactionTimeout = policy.timeout();
         return this;
     }
 
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientV5.java
index c09cd7c664a..792d9ea060c 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientV5.java
@@ -18,7 +18,11 @@
  */
 package org.apache.pulsar.client.impl.v5;
 
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.client.api.v5.CheckpointConsumerBuilder;
 import org.apache.pulsar.client.api.v5.ProducerBuilder;
 import org.apache.pulsar.client.api.v5.PulsarClient;
@@ -37,10 +41,12 @@ final class PulsarClientV5 implements PulsarClient {
 
     private final PulsarClientImpl v4Client;
     private final String description;
+    private final Duration transactionTimeout;
 
-    PulsarClientV5(PulsarClientImpl v4Client, String description) {
+    PulsarClientV5(PulsarClientImpl v4Client, String description, Duration transactionTimeout) {
         this.v4Client = v4Client;
         this.description = description;
+        this.transactionTimeout = transactionTimeout;
     }
 
     /**
@@ -72,13 +78,24 @@ final class PulsarClientV5 implements PulsarClient {
 
     @Override
     public Transaction newTransaction() throws PulsarClientException {
-        throw new PulsarClientException("Transactions not yet implemented");
+        try {
+            return newTransactionAsync().get();
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause() != null ? e.getCause() : e;
+            throw new PulsarClientException(cause.getMessage(), cause);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Interrupted while creating transaction", e);
+        }
     }
 
     @Override
     public CompletableFuture<Transaction> newTransactionAsync() {
-        return CompletableFuture.failedFuture(
-                new PulsarClientException("Transactions not yet implemented"));
+        var builder = v4Client.newTransaction();
+        if (transactionTimeout != null) {
+            builder.withTransactionTimeout(transactionTimeout.toMillis(), TimeUnit.MILLISECONDS);
+        }
+        return builder.build().thenApply(v4Txn -> (Transaction) new TransactionV5(v4Txn));
     }
 
     @Override
@@ -93,8 +110,7 @@ final class PulsarClientV5 implements PulsarClient {
     @Override
     public CompletableFuture<Void> closeAsync() {
         return v4Client.closeAsync().exceptionally(ex -> {
-            throw new java.util.concurrent.CompletionException(
-                    new PulsarClientException(ex.getMessage(), ex));
+            throw new CompletionException(new PulsarClientException(ex.getMessage(), ex));
         });
     }
 
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index eed2c41bd1f..61f5e87644d 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -170,7 +170,13 @@ final class ScalableQueueConsumer<T> implements QueueConsumer<T>, DagWatchClient
 
     @Override
     public void acknowledge(MessageId messageId, Transaction txn) {
-        throw new UnsupportedOperationException("Transactional ack not yet implemented");
+        if (!(messageId instanceof MessageIdV5 id)) {
+            throw new IllegalArgumentException("Expected MessageIdV5, got: " + messageId.getClass());
+        }
+        var future = segmentConsumers.get(id.segmentId());
+        if (future != null) {
+            future.thenAccept(c -> c.acknowledgeAsync(id.v4MessageId(), TransactionV5.unwrap(txn)));
+        }
     }
 
     @Override
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index e33191c2861..ddffea7d84d 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -208,8 +208,16 @@ final class ScalableStreamConsumer<T>
 
     @Override
     public void acknowledgeCumulative(MessageId messageId, Transaction txn) {
-        // Transaction support not yet implemented
-        throw new UnsupportedOperationException("Transactional ack not yet implemented");
+        if (!(messageId instanceof MessageIdV5 id)) {
+            throw new IllegalArgumentException("Expected MessageIdV5, got: " + messageId.getClass());
+        }
+        var v4Txn = TransactionV5.unwrap(txn);
+        for (var entry : id.positionVector().entrySet()) {
+            var future = segmentConsumers.get(entry.getKey());
+            if (future != null) {
+                future.thenAccept(c -> c.acknowledgeCumulativeAsync(entry.getValue(), v4Txn));
+            }
+        }
     }
 
     @Override
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index 01fe8b0d3d1..fd8ec3b5747 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -149,7 +149,8 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
             String key, T value, java.util.Map<String, String> properties,
             java.time.Instant eventTime, Long sequenceId,
             java.time.Duration deliverAfter, java.time.Instant deliverAt,
-            java.util.List<String> replicationClusters) throws PulsarClientException {
+            java.util.List<String> replicationClusters,
+            org.apache.pulsar.client.api.v5.Transaction txn) throws PulsarClientException {
 
         for (int attempt = 0; attempt < 3; attempt++) {
             long segmentId = routeMessage(key);
@@ -157,7 +158,7 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
 
             try {
                 var v4MsgId = buildV4Message(producer, key, value, properties,
-                        eventTime, sequenceId, deliverAfter, deliverAt, replicationClusters)
+                        eventTime, sequenceId, deliverAfter, deliverAt, replicationClusters, txn)
                         .send();
                 return new MessageIdV5(v4MsgId, segmentId);
             } catch (org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException
@@ -193,17 +194,19 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
             String key, T value, java.util.Map<String, String> properties,
             java.time.Instant eventTime, Long sequenceId,
             java.time.Duration deliverAfter, java.time.Instant deliverAt,
-            java.util.List<String> replicationClusters) {
+            java.util.List<String> replicationClusters,
+            org.apache.pulsar.client.api.v5.Transaction txn) {
 
         return sendInternalAsyncWithRetry(key, value, properties,
-                eventTime, sequenceId, deliverAfter, deliverAt, replicationClusters, 0);
+                eventTime, sequenceId, deliverAfter, deliverAt, replicationClusters, txn, 0);
     }
 
     private CompletableFuture<MessageIdV5> sendInternalAsyncWithRetry(
             String key, T value, java.util.Map<String, String> properties,
             java.time.Instant eventTime, Long sequenceId,
             java.time.Duration deliverAfter, java.time.Instant deliverAt,
-            java.util.List<String> replicationClusters, int attempt) {
+            java.util.List<String> replicationClusters,
+            org.apache.pulsar.client.api.v5.Transaction txn, int attempt) {
 
         long segmentId;
         try {
@@ -220,7 +223,7 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
         }
 
         return buildV4Message(producer, key, value, properties,
-                eventTime, sequenceId, deliverAfter, deliverAt, replicationClusters)
+                eventTime, sequenceId, deliverAfter, deliverAt, replicationClusters, txn)
                 .sendAsync()
                 .thenApply(v4MsgId -> new MessageIdV5(v4MsgId, segmentId))
                 .exceptionallyCompose(ex -> {
@@ -241,7 +244,8 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
                                         
java.util.concurrent.TimeUnit.MILLISECONDS))
                                 .thenCompose(__ -> sendInternalAsyncWithRetry(
                                         key, value, properties, eventTime, sequenceId,
-                                        deliverAfter, deliverAt, replicationClusters, attempt + 1));
+                                        deliverAfter, deliverAt, replicationClusters,
+                                        txn, attempt + 1));
                     }
                     return CompletableFuture.failedFuture(ex);
                 });
@@ -252,9 +256,12 @@ final class ScalableTopicProducer<T> implements Producer<T>, DagWatchClient.Layo
             String key, T value, java.util.Map<String, String> properties,
             java.time.Instant eventTime, Long sequenceId,
             java.time.Duration deliverAfter, java.time.Instant deliverAt,
-            java.util.List<String> replicationClusters) {
+            java.util.List<String> replicationClusters,
+            org.apache.pulsar.client.api.v5.Transaction txn) {
 
-        var msgBuilder = producer.newMessage().value(value);
+        org.apache.pulsar.client.api.transaction.Transaction v4Txn = TransactionV5.unwrap(txn);
+        var msgBuilder = (v4Txn != null ? producer.newMessage(v4Txn) : producer.newMessage())
+                .value(value);
 
         if (key != null) {
             msgBuilder.key(key);
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/TransactionV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/TransactionV5.java
new file mode 100644
index 00000000000..ff7fa5223c1
--- /dev/null
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/TransactionV5.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.client.api.v5.PulsarClientException;
+import org.apache.pulsar.client.api.v5.Transaction;
+import org.apache.pulsar.client.api.v5.async.AsyncTransaction;
+
+/**
+ * V5 {@link Transaction} that wraps a v4 {@code org.apache.pulsar.client.api.transaction.Transaction}.
+ * Per-message and per-ack hand-off uses the underlying v4 instance via {@link #v4Transaction()}.
+ */
+final class TransactionV5 implements Transaction {
+
+    private final org.apache.pulsar.client.api.transaction.Transaction v4Transaction;
+    private final AsyncTransaction asyncView;
+
+    TransactionV5(org.apache.pulsar.client.api.transaction.Transaction v4Transaction) {
+        this.v4Transaction = v4Transaction;
+        this.asyncView = new AsyncView();
+    }
+
+    org.apache.pulsar.client.api.transaction.Transaction v4Transaction() {
+        return v4Transaction;
+    }
+
+    @Override
+    public void commit() throws PulsarClientException {
+        try {
+            v4Transaction.commit().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Commit interrupted", e);
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause() != null ? e.getCause() : e;
+            throw new PulsarClientException(cause.getMessage(), cause);
+        }
+    }
+
+    @Override
+    public void abort() throws PulsarClientException {
+        try {
+            v4Transaction.abort().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Abort interrupted", e);
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause() != null ? e.getCause() : e;
+            throw new PulsarClientException(cause.getMessage(), cause);
+        }
+    }
+
+    @Override
+    public AsyncTransaction async() {
+        return asyncView;
+    }
+
+    @Override
+    public State state() {
+        return switch (v4Transaction.getState()) {
+            case OPEN -> State.OPEN;
+            case COMMITTING -> State.COMMITTING;
+            case ABORTING -> State.ABORTING;
+            case COMMITTED -> State.COMMITTED;
+            case ABORTED -> State.ABORTED;
+            case ERROR -> State.ERROR;
+            case TIME_OUT -> State.TIMED_OUT;
+        };
+    }
+
+    /**
+     * Unwrap the v4 transaction from a V5 handle, validating type. Returns null when
+     * the handle is null (no transaction context).
+     */
+    static org.apache.pulsar.client.api.transaction.Transaction unwrap(Transaction txn) {
+        if (txn == null) {
+            return null;
+        }
+        if (!(txn instanceof TransactionV5 v5)) {
+            throw new IllegalArgumentException("Expected TransactionV5, got: " + txn.getClass());
+        }
+        return v5.v4Transaction;
+    }
+
+    private final class AsyncView implements AsyncTransaction {
+        @Override
+        public CompletableFuture<Void> commit() {
+            return v4Transaction.commit();
+        }
+
+        @Override
+        public CompletableFuture<Void> abort() {
+            return v4Transaction.abort();
+        }
+    }
+}


Reply via email to