This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c28dd859c22 [feat]PIP-468: Wire transactions into V5 client (#25631)
c28dd859c22 is described below
commit c28dd859c221cf5895879226b49d940f483885ed
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 15:41:00 2026 -0700
[feat]PIP-468: Wire transactions into V5 client (#25631)
---
.../pendingack/impl/MLPendingAckStore.java | 13 +-
.../pulsar/broker/service/SharedPulsarCluster.java | 26 ++
.../pulsar/client/api/v5/V5TransactionTest.java | 389 +++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerAckTest.java | 52 +--
.../client/impl/v5/AsyncMessageBuilderV5.java | 5 +-
.../pulsar/client/impl/v5/MessageBuilderV5.java | 5 +-
.../client/impl/v5/PulsarClientBuilderV5.java | 5 +-
.../pulsar/client/impl/v5/PulsarClientV5.java | 28 +-
.../client/impl/v5/ScalableQueueConsumer.java | 8 +-
.../client/impl/v5/ScalableStreamConsumer.java | 12 +-
.../client/impl/v5/ScalableTopicProducer.java | 25 +-
.../pulsar/client/impl/v5/TransactionV5.java | 114 ++++++
12 files changed, 606 insertions(+), 76 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
index 3d2995b7f13..e10a93cbc1a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java
@@ -523,7 +523,18 @@ public class MLPendingAckStore implements PendingAckStore {
}
public static String getTransactionPendingAckStoreSuffix(String
originTopicName, String subName) {
- return TopicName.get(originTopicName) + "-" + subName +
SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
+ TopicName origin = TopicName.get(originTopicName);
+ // Segment topics
("segment://tenant/ns/topic/<hexStart>-<hexEnd>-<segmentId>") cannot
+ // host a derived pending-ack topic in the segment domain — the
descriptor parser would
+ // reject any name with extra dashes appended. Map to a flat
persistent topic in the same
+ // namespace, encoding the segment descriptor into the local name.
+ if (origin.isSegment()) {
+ return String.format("persistent://%s/%s/%s-%s-%s%s",
+ origin.getTenant(), origin.getNamespacePortion(),
+ origin.getLocalName(), origin.getSegmentDescriptor(),
+ subName, SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
+ }
+ return origin + "-" + subName +
SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
}
public static String getTransactionPendingAckStoreCursorName() {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
index 1866a5a9649..dc14b163e4d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
@@ -28,6 +28,9 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.metadata.bookkeeper.BKCluster;
@@ -166,6 +169,13 @@ public class SharedPulsarCluster {
// Disable the load balancer — single-broker cluster doesn't need it
config.setLoadBalancerEnabled(false);
+ // Enable the transaction coordinator so V5 transaction tests can run
on the
+ // shared cluster. Other tests don't pay any meaningful cost — the
coordinator
+ // only does work when a client opts in via enableTransaction=true.
+ config.setTransactionCoordinatorEnabled(true);
+ config.setTransactionBufferSnapshotMaxTransactionCount(2);
+ config.setTransactionBufferSnapshotMinTimeInMillis(2000);
+
pulsarService = new PulsarService(config);
pulsarService.start();
@@ -190,6 +200,22 @@ public class SharedPulsarCluster {
.allowedClusters(Set.of(CLUSTER_NAME))
.build());
+ // Set up the system namespace + transaction-coordinator partitioned
topic so
+ // the broker can serve transaction requests from V5 (and v4) clients.
The
+ // coordinator topic lives in pulsar/system, which the public admin
API rejects
+ // (system-topic format), so we go through pulsarResources directly —
same path
+ // TransactionTestBase uses.
+
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ TenantInfo.builder()
+ .allowedClusters(Set.of(CLUSTER_NAME))
+ .build());
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+ pulsarService.getPulsarResources()
+ .getNamespaceResources()
+ .getPartitionedTopicResources()
+
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+ new PartitionedTopicMetadata(1));
+
log.info().attr("startedBroker", pulsarService.getBrokerServiceUrl())
.attr("web",
pulsarService.getWebServiceAddress()).log("SharedPulsarCluster started. broker=
web");
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
new file mode 100644
index 00000000000..23ede822b65
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the V5 transaction wire: opening a transaction on a
transaction-enabled
+ * client, sending and acknowledging transactional messages through the V5
producer
+ * and consumer on a scalable topic, and committing or aborting the
transaction.
+ */
+public class V5TransactionTest extends V5ClientBaseTest {
+
+ private PulsarClient newTxnClient() throws Exception {
+ return track(PulsarClient.builder()
+ .serviceUrl(getBrokerServiceUrl())
+ .transactionPolicy(new
TransactionPolicy(Duration.ofMinutes(1)))
+ .build());
+ }
+
+ @Test
+ public void testAbortMovesTransactionToAbortedState() throws Exception {
+ // Open a transaction-enabled V5 client, send transactional messages
through the
+ // V5 producer to a scalable topic, then abort. The transaction state
must move
+ // OPEN → ABORTED — exercises the V5 newTransaction / message-builder
+ // transaction / Transaction.abort path end-to-end.
+ PulsarClient client = newTxnClient();
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ Transaction txn = client.newTransaction();
+ assertEquals(txn.state(), Transaction.State.OPEN);
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().transaction(txn).value("v-" + i).send();
+ }
+ txn.abort();
+ assertEquals(txn.state(), Transaction.State.ABORTED);
+ }
+
+ @Test
+ public void testCommitMakesProducedMessagesVisible() throws Exception {
+ PulsarClient client = newTxnClient();
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("txn-commit-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ Transaction txn = client.newTransaction();
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < 5; i++) {
+ String v = "v-" + i;
+ producer.newMessage().transaction(txn).value(v).send();
+ sent.add(v);
+ }
+ // Pre-commit: nothing visible yet.
+ assertNull(consumer.receive(Duration.ofMillis(500)),
+ "transactional sends must not be visible before commit");
+
+ txn.commit();
+ assertEquals(txn.state(), Transaction.State.COMMITTED);
+
+ Set<String> received = new HashSet<>();
+ for (int i = 0; i < 5; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "missed committed message #" + i);
+ received.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+ assertEquals(received, sent, "every committed message must be
delivered");
+ }
+
+ @Test
+ public void testAbortedMessagesAreNeverDelivered() throws Exception {
+ // After abort, transactional messages must never be delivered. Use a
non-txn
+ // sentinel published after the abort to prove that the consumer is
alive and
+ // delivering — and that the only message it sees is the sentinel.
+ PulsarClient client = newTxnClient();
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("txn-abort-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ Transaction txn = client.newTransaction();
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().transaction(txn).value("v-" + i).send();
+ }
+ txn.abort();
+ assertEquals(txn.state(), Transaction.State.ABORTED);
+
+ producer.newMessage().value("sentinel").send();
+
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "non-transactional sentinel must be delivered");
+ assertEquals(msg.value(), "sentinel", "aborted txn messages must not
precede the sentinel");
+ consumer.acknowledge(msg.id());
+
+ assertNull(consumer.receive(Duration.ofMillis(500)),
+ "no further messages after the sentinel — the aborted txn must
be discarded");
+ }
+
+ @Test
+ public void testCommitAcrossMultipleTopics() throws Exception {
+ // A single transaction writing to two distinct scalable topics
commits atomically:
+ // both topics see their messages only after commit.
+ PulsarClient client = newTxnClient();
+ String topicA = newScalableTopic(1);
+ String topicB = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> prodA =
client.newProducer(Schema.string()).topic(topicA).create();
+ @Cleanup
+ Producer<String> prodB =
client.newProducer(Schema.string()).topic(topicB).create();
+
+ @Cleanup
+ QueueConsumer<String> consA = client.newQueueConsumer(Schema.string())
+ .topic(topicA).subscriptionName("multi-topic-a")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+ @Cleanup
+ QueueConsumer<String> consB = client.newQueueConsumer(Schema.string())
+ .topic(topicB).subscriptionName("multi-topic-b")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ Transaction txn = client.newTransaction();
+ prodA.newMessage().transaction(txn).value("a-1").send();
+ prodB.newMessage().transaction(txn).value("b-1").send();
+
+ assertNull(consA.receive(Duration.ofMillis(500)), "topic A: nothing
visible pre-commit");
+ assertNull(consB.receive(Duration.ofMillis(500)), "topic B: nothing
visible pre-commit");
+
+ txn.commit();
+ assertEquals(txn.state(), Transaction.State.COMMITTED);
+
+ Message<String> ma = consA.receive(Duration.ofSeconds(5));
+ assertNotNull(ma, "topic A: expected message after commit");
+ assertEquals(ma.value(), "a-1");
+
+ Message<String> mb = consB.receive(Duration.ofSeconds(5));
+ assertNotNull(mb, "topic B: expected message after commit");
+ assertEquals(mb.value(), "b-1");
+ }
+
+ // Disabled: documents a real broker gap in the per-segment
TransactionBuffer model.
+ // After split, the parent segment is sealed; the txn coordinator's COMMIT
END-TXN to
+ // the sealed parent never returns and the commit RPC times out (~30s).
The fix
+ // requires moving transaction tracking up to the scalable-topic level (so
layout
+ // changes don't strand in-flight transactions on now-sealed segments) —
that's
+ // beyond the scope of this commit.
+ @Test(enabled = false)
+ public void testCommitSpansSplit() throws Exception {
+ // A single transaction whose lifetime spans a layout-changing split
must commit
+ // atomically: pre-split writes (on the now-sealed parent) and
post-split writes
+ // (on the new children) all become visible together at commit.
Exercises commit
+ // markers landing on a sealed segment as well as the live children.
+ PulsarClient client = newTxnClient();
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.string()).topic(topic).create();
+ @Cleanup
+ QueueConsumer<String> consumer =
client.newQueueConsumer(Schema.string())
+ .topic(topic).subscriptionName("split-txn-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ Transaction txn = client.newTransaction();
+
+ // First batch: lands on the only initial segment.
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < 5; i++) {
+ String v = "before-split-" + i;
+ producer.newMessage().key("k-" +
i).transaction(txn).value(v).send();
+ sent.add(v);
+ }
+
+ long activeSegmentId = -1;
+ var meta = admin.scalableTopics().getMetadata(topic);
+ for (var seg : meta.getSegments().values()) {
+ if (seg.isActive()) {
+ activeSegmentId = seg.getSegmentId();
+ break;
+ }
+ }
+ assertTrue(activeSegmentId >= 0, "expected exactly one active segment
before split");
+ admin.scalableTopics().splitSegment(topic, activeSegmentId);
+
+ // Wait until the topic metadata (polled via admin) reports the new layout.
+ Awaitility.await().untilAsserted(() -> {
+ int active = 0;
+ var m = admin.scalableTopics().getMetadata(topic);
+ for (var seg : m.getSegments().values()) {
+ if (seg.isActive()) {
+ active++;
+ }
+ }
+ assertEquals(active, 2, "split must produce 2 active children");
+ });
+
+ // Second batch: lands on the new children, still inside the same
transaction.
+ for (int i = 0; i < 5; i++) {
+ String v = "after-split-" + i;
+ producer.newMessage().key("k-after-" +
i).transaction(txn).value(v).send();
+ sent.add(v);
+ }
+
+ assertNull(consumer.receive(Duration.ofMillis(500)),
+ "transactional sends must not be visible before commit");
+
+ txn.commit();
+ assertEquals(txn.state(), Transaction.State.COMMITTED);
+
+ Set<String> received = new HashSet<>();
+ for (int i = 0; i < sent.size(); i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+ assertNotNull(msg, "missed message #" + i + " (received so far: "
+ received.size() + ")");
+ received.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+ assertEquals(received, sent, "all txn messages across the split must
be delivered after commit");
+ }
+
+ // Disabled: same broker gap as testCommitSpansSplit. After merge, both
source
+ // segments are sealed and the COMMIT marker can't be delivered, so the
END-TXN
+ // request times out.
+ @Test(enabled = false)
+ public void testCommitSpansMerge() throws Exception {
+ // A single transaction whose lifetime spans a layout-changing merge
must commit
+ // atomically: writes to the two pre-merge segments and writes to the
post-merge
+ // segment all become visible together.
+ PulsarClient client = newTxnClient();
+ String topic = newScalableTopic(2);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.string()).topic(topic).create();
+ @Cleanup
+ QueueConsumer<String> consumer =
client.newQueueConsumer(Schema.string())
+ .topic(topic).subscriptionName("merge-txn-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ Transaction txn = client.newTransaction();
+
+ // First batch: lands on the two initial segments.
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ String v = "before-merge-" + i;
+ producer.newMessage().key("k-" +
i).transaction(txn).value(v).send();
+ sent.add(v);
+ }
+
+ var meta = admin.scalableTopics().getMetadata(topic);
+ List<Long> activeIds = new ArrayList<>();
+ for (var seg : meta.getSegments().values()) {
+ if (seg.isActive()) {
+ activeIds.add(seg.getSegmentId());
+ }
+ }
+ assertEquals(activeIds.size(), 2, "expected 2 active segments before
merge");
+ admin.scalableTopics().mergeSegments(topic, activeIds.get(0),
activeIds.get(1));
+
+ Awaitility.await().untilAsserted(() -> {
+ int active = 0;
+ var m = admin.scalableTopics().getMetadata(topic);
+ for (var seg : m.getSegments().values()) {
+ if (seg.isActive()) {
+ active++;
+ }
+ }
+ assertEquals(active, 1, "merge must collapse to 1 active segment");
+ });
+
+ // Second batch: lands on the new sole segment, still inside the same
transaction.
+ for (int i = 0; i < 5; i++) {
+ String v = "after-merge-" + i;
+ producer.newMessage().key("k-after-" +
i).transaction(txn).value(v).send();
+ sent.add(v);
+ }
+
+ assertNull(consumer.receive(Duration.ofMillis(500)),
+ "transactional sends must not be visible before commit");
+
+ txn.commit();
+ assertEquals(txn.state(), Transaction.State.COMMITTED);
+
+ Set<String> received = new HashSet<>();
+ for (int i = 0; i < sent.size(); i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+ assertNotNull(msg, "missed message #" + i + " (received so far: "
+ received.size() + ")");
+ received.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+ assertEquals(received, sent, "all txn messages across the merge must
be delivered after commit");
+ }
+
+ @Test
+ public void testTransactionalConsumeAndProduce() throws Exception {
+ // Classic consume-transform-produce: ack input + produce derived
output in one
+ // transaction. The output is invisible pre-commit; commit makes it
visible.
+ PulsarClient client = newTxnClient();
+ String inputTopic = newScalableTopic(1);
+ String outputTopic = newScalableTopic(1);
+
+ // Seed an input message non-transactionally.
+ @Cleanup
+ Producer<String> seed =
client.newProducer(Schema.string()).topic(inputTopic).create();
+ seed.newMessage().value("hello").send();
+
+ @Cleanup
+ QueueConsumer<String> input = client.newQueueConsumer(Schema.string())
+ .topic(inputTopic).subscriptionName("input-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+ @Cleanup
+ Producer<String> output =
client.newProducer(Schema.string()).topic(outputTopic).create();
+ @Cleanup
+ QueueConsumer<String> verify = client.newQueueConsumer(Schema.string())
+ .topic(outputTopic).subscriptionName("verify-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ Message<String> in = input.receive(Duration.ofSeconds(5));
+ assertNotNull(in, "seed input message must be delivered");
+
+ Transaction txn = client.newTransaction();
+
output.newMessage().transaction(txn).value(in.value().toUpperCase()).send();
+ input.acknowledge(in.id(), txn);
+
+ assertNull(verify.receive(Duration.ofMillis(500)),
+ "transactional output must not be visible before commit");
+
+ txn.commit();
+ assertEquals(txn.state(), Transaction.State.COMMITTED);
+
+ Message<String> out = verify.receive(Duration.ofSeconds(5));
+ assertNotNull(out, "committed output must be delivered");
+ assertEquals(out.value(), "HELLO");
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
index 23c498d2654..973fdc6e17a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerAckTest.java
@@ -19,20 +19,14 @@
package org.apache.pulsar.client.impl;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -49,9 +43,6 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.transaction.TransactionImpl;
-import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -61,24 +52,15 @@ import org.testng.collections.Sets;
@Test(groups = "broker-impl")
public class ConsumerAckTest extends SharedPulsarBaseTest {
- private TransactionImpl transaction;
private PulsarClient clientWithStats;
- @SuppressWarnings("deprecation")
+ @SuppressWarnings("deprecation")
@BeforeMethod(alwaysRun = true)
public void setupConsumerAckTest() throws Exception {
this.clientWithStats = PulsarClient.builder()
.serviceUrl(getBrokerServiceUrl())
.statsInterval(30, TimeUnit.SECONDS)
.build();
- transaction = mock(TransactionImpl.class);
- doReturn(1L).when(transaction).getTxnIdLeastBits();
- doReturn(1L).when(transaction).getTxnIdMostBits();
- doReturn(TransactionImpl.State.OPEN).when(transaction).getState();
- CompletableFuture<Void> completableFuture =
CompletableFuture.completedFuture(null);
- doNothing().when(transaction).registerAckOp(any());
- doReturn(true).when(transaction).checkIfOpen(any());
-
doReturn(completableFuture).when(transaction).registerAckedTopic(any(), any());
}
@AfterMethod(alwaysRun = true)
@@ -88,38 +70,6 @@ public class ConsumerAckTest extends SharedPulsarBaseTest {
}
}
- @Test
- public void testAckResponse() throws PulsarClientException,
InterruptedException {
- String topic = newTopicName();
- @Cleanup
- Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
- .topic(topic)
- .enableBatching(false)
- .create();
- @Cleanup
- ConsumerImpl<Integer> consumer = (ConsumerImpl<Integer>)
pulsarClient.newConsumer(Schema.INT32)
- .topic(topic)
- .subscriptionName("sub")
- .subscriptionType(SubscriptionType.Shared)
- .ackTimeout(1, TimeUnit.SECONDS)
- .subscribe();
- producer.send(1);
- producer.send(2);
- try {
- consumer.acknowledgeAsync(new MessageIdImpl(1, 1, 1),
transaction).get();
- fail();
- } catch (ExecutionException e) {
- Assert.assertTrue(e.getCause() instanceof
PulsarClientException.NotAllowedException);
- }
- Message<Integer> message = consumer.receive();
-
- try {
- consumer.acknowledgeAsync(message.getMessageId(),
transaction).get();
- fail();
- } catch (ExecutionException e) {
- Assert.assertTrue(e.getCause() instanceof
PulsarClientException.NotAllowedException);
- }
- }
@Test(timeOut = 30000)
public void testAckReceipt() throws Exception {
String topic = newTopicName();
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncMessageBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncMessageBuilderV5.java
index 974ba10b80c..896e2c75e5e 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncMessageBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncMessageBuilderV5.java
@@ -43,6 +43,7 @@ final class AsyncMessageBuilderV5<T> implements
AsyncMessageBuilder<T> {
private Duration deliverAfter;
private Instant deliverAt;
private List<String> replicationClusters;
+ private Transaction txn;
AsyncMessageBuilderV5(ScalableTopicProducer<T> producer) {
this.producer = producer;
@@ -52,7 +53,7 @@ final class AsyncMessageBuilderV5<T> implements
AsyncMessageBuilder<T> {
public CompletableFuture<MessageId> send() {
return producer.sendInternalAsync(
key, value, properties, eventTime, sequenceId,
- deliverAfter, deliverAt, replicationClusters)
+ deliverAfter, deliverAt, replicationClusters, txn)
.thenApply(id -> id);
}
@@ -70,7 +71,7 @@ final class AsyncMessageBuilderV5<T> implements
AsyncMessageBuilder<T> {
@Override
public AsyncMessageBuilderV5<T> transaction(Transaction txn) {
- // TODO: Wire up transaction support
+ this.txn = txn;
return this;
}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageBuilderV5.java
index 5e6e598f0e6..910f25f27a0 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageBuilderV5.java
@@ -43,6 +43,7 @@ final class MessageBuilderV5<T> implements MessageBuilder<T> {
private Duration deliverAfter;
private Instant deliverAt;
private List<String> replicationClusters;
+ private Transaction txn;
MessageBuilderV5(ScalableTopicProducer<T> producer) {
this.producer = producer;
@@ -52,7 +53,7 @@ final class MessageBuilderV5<T> implements MessageBuilder<T> {
public MessageId send() throws PulsarClientException {
return producer.sendInternal(
key, value, properties, eventTime, sequenceId,
- deliverAfter, deliverAt, replicationClusters);
+ deliverAfter, deliverAt, replicationClusters, txn);
}
@Override
@@ -69,7 +70,7 @@ final class MessageBuilderV5<T> implements MessageBuilder<T> {
@Override
public MessageBuilderV5<T> transaction(Transaction txn) {
- // TODO: Wire up transaction support
+ this.txn = txn;
return this;
}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
index 1c1fe6ccc9b..ea6afd012db 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
@@ -39,6 +39,7 @@ final class PulsarClientBuilderV5 implements
PulsarClientBuilder {
private final ClientConfigurationData conf = new ClientConfigurationData();
private String description;
+ private Duration transactionTimeout;
PulsarClientBuilderV5() {
conf.setStatsIntervalSeconds(0);
@@ -48,7 +49,7 @@ final class PulsarClientBuilderV5 implements
PulsarClientBuilder {
public PulsarClient build() throws PulsarClientException {
try {
var v4Client = new PulsarClientImpl(conf);
- return new PulsarClientV5(v4Client, description);
+ return new PulsarClientV5(v4Client, description,
transactionTimeout);
} catch (org.apache.pulsar.client.api.PulsarClientException e) {
throw new PulsarClientException(e.getMessage(), e);
}
@@ -103,8 +104,8 @@ final class PulsarClientBuilderV5 implements
PulsarClientBuilder {
@Override
public PulsarClientBuilder transactionPolicy(TransactionPolicy policy) {
- // TransactionPolicy enables transactions with a default timeout
conf.setEnableTransaction(true);
+ this.transactionTimeout = policy.timeout();
return this;
}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientV5.java
index c09cd7c664a..792d9ea060c 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientV5.java
@@ -18,7 +18,11 @@
*/
package org.apache.pulsar.client.impl.v5;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.v5.CheckpointConsumerBuilder;
import org.apache.pulsar.client.api.v5.ProducerBuilder;
import org.apache.pulsar.client.api.v5.PulsarClient;
@@ -37,10 +41,12 @@ final class PulsarClientV5 implements PulsarClient {
private final PulsarClientImpl v4Client;
private final String description;
+ private final Duration transactionTimeout;
- PulsarClientV5(PulsarClientImpl v4Client, String description) {
+ PulsarClientV5(PulsarClientImpl v4Client, String description, Duration
transactionTimeout) {
this.v4Client = v4Client;
this.description = description;
+ this.transactionTimeout = transactionTimeout;
}
/**
@@ -72,13 +78,24 @@ final class PulsarClientV5 implements PulsarClient {
@Override
public Transaction newTransaction() throws PulsarClientException {
- throw new PulsarClientException("Transactions not yet implemented");
+ try {
+ return newTransactionAsync().get();
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause() != null ? e.getCause() : e;
+ throw new PulsarClientException(cause.getMessage(), cause);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarClientException("Interrupted while creating
transaction", e);
+ }
}
@Override
public CompletableFuture<Transaction> newTransactionAsync() {
- return CompletableFuture.failedFuture(
- new PulsarClientException("Transactions not yet implemented"));
+ var builder = v4Client.newTransaction();
+ if (transactionTimeout != null) {
+ builder.withTransactionTimeout(transactionTimeout.toMillis(),
TimeUnit.MILLISECONDS);
+ }
+ return builder.build().thenApply(v4Txn -> (Transaction) new
TransactionV5(v4Txn));
}
@Override
@@ -93,8 +110,7 @@ final class PulsarClientV5 implements PulsarClient {
@Override
public CompletableFuture<Void> closeAsync() {
return v4Client.closeAsync().exceptionally(ex -> {
- throw new java.util.concurrent.CompletionException(
- new PulsarClientException(ex.getMessage(), ex));
+ throw new CompletionException(new
PulsarClientException(ex.getMessage(), ex));
});
}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index eed2c41bd1f..61f5e87644d 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -170,7 +170,21 @@ final class ScalableQueueConsumer<T> implements QueueConsumer<T>, DagWatchClient
 
     @Override
     public void acknowledge(MessageId messageId, Transaction txn) {
-        throw new UnsupportedOperationException("Transactional ack not yet implemented");
+        // Acknowledges a single message as part of the given transaction (fire-and-forget).
+        // Throws IllegalArgumentException if messageId is not a MessageIdV5 or txn is not a
+        // TransactionV5; a null txn is passed through as null (see TransactionV5.unwrap).
+        if (!(messageId instanceof MessageIdV5 id)) {
+            throw new IllegalArgumentException("Expected MessageIdV5, got: " + messageId.getClass());
+        }
+        // Unwrap eagerly so an unsupported Transaction implementation fails on the caller's
+        // thread instead of inside the asynchronous callback, whose result is discarded.
+        var v4Txn = TransactionV5.unwrap(txn);
+        var future = segmentConsumers.get(id.segmentId());
+        // A missing entry for the segment means there is nothing to ack against.
+        if (future != null) {
+            // The stage returned by thenAccept is not tracked by this method.
+            future.thenAccept(c -> c.acknowledgeAsync(id.v4MessageId(), v4Txn));
+        }
     }
 
     @Override
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index e33191c2861..ddffea7d84d 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -208,8 +208,20 @@ final class ScalableStreamConsumer<T>
 
     @Override
     public void acknowledgeCumulative(MessageId messageId, Transaction txn) {
-        // Transaction support not yet implemented
-        throw new UnsupportedOperationException("Transactional ack not yet implemented");
+        if (!(messageId instanceof MessageIdV5 id)) {
+            throw new IllegalArgumentException("Expected MessageIdV5, got: " + messageId.getClass());
+        }
+        var v4Txn = TransactionV5.unwrap(txn);
+        // Cumulatively ack each segment position recorded in the message id's position vector.
+        for (var segmentPosition : id.positionVector().entrySet()) {
+            var consumerFuture = segmentConsumers.get(segmentPosition.getKey());
+            if (consumerFuture == null) {
+                // No consumer entry for this segment; move on to the next one.
+                continue;
+            }
+            consumerFuture.thenAccept(
+                    consumer -> consumer.acknowledgeCumulativeAsync(segmentPosition.getValue(), v4Txn));
+        }
     }
 
     @Override
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index 01fe8b0d3d1..fd8ec3b5747 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -149,7 +149,8 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
String key, T value, java.util.Map<String, String> properties,
java.time.Instant eventTime, Long sequenceId,
java.time.Duration deliverAfter, java.time.Instant deliverAt,
- java.util.List<String> replicationClusters) throws
PulsarClientException {
+ java.util.List<String> replicationClusters,
+ org.apache.pulsar.client.api.v5.Transaction txn) throws
PulsarClientException {
for (int attempt = 0; attempt < 3; attempt++) {
long segmentId = routeMessage(key);
@@ -157,7 +158,7 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
try {
var v4MsgId = buildV4Message(producer, key, value, properties,
- eventTime, sequenceId, deliverAfter, deliverAt,
replicationClusters)
+ eventTime, sequenceId, deliverAfter, deliverAt,
replicationClusters, txn)
.send();
return new MessageIdV5(v4MsgId, segmentId);
} catch
(org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException
@@ -193,17 +194,19 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
String key, T value, java.util.Map<String, String> properties,
java.time.Instant eventTime, Long sequenceId,
java.time.Duration deliverAfter, java.time.Instant deliverAt,
- java.util.List<String> replicationClusters) {
+ java.util.List<String> replicationClusters,
+ org.apache.pulsar.client.api.v5.Transaction txn) {
return sendInternalAsyncWithRetry(key, value, properties,
- eventTime, sequenceId, deliverAfter, deliverAt,
replicationClusters, 0);
+ eventTime, sequenceId, deliverAfter, deliverAt,
replicationClusters, txn, 0);
}
private CompletableFuture<MessageIdV5> sendInternalAsyncWithRetry(
String key, T value, java.util.Map<String, String> properties,
java.time.Instant eventTime, Long sequenceId,
java.time.Duration deliverAfter, java.time.Instant deliverAt,
- java.util.List<String> replicationClusters, int attempt) {
+ java.util.List<String> replicationClusters,
+ org.apache.pulsar.client.api.v5.Transaction txn, int attempt) {
long segmentId;
try {
@@ -220,7 +223,7 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
}
return buildV4Message(producer, key, value, properties,
- eventTime, sequenceId, deliverAfter, deliverAt,
replicationClusters)
+ eventTime, sequenceId, deliverAfter, deliverAt,
replicationClusters, txn)
.sendAsync()
.thenApply(v4MsgId -> new MessageIdV5(v4MsgId, segmentId))
.exceptionallyCompose(ex -> {
@@ -241,7 +244,8 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
java.util.concurrent.TimeUnit.MILLISECONDS))
.thenCompose(__ -> sendInternalAsyncWithRetry(
key, value, properties, eventTime,
sequenceId,
- deliverAfter, deliverAt,
replicationClusters, attempt + 1));
+ deliverAfter, deliverAt,
replicationClusters,
+ txn, attempt + 1));
}
return CompletableFuture.failedFuture(ex);
});
@@ -252,9 +256,12 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
String key, T value, java.util.Map<String, String> properties,
java.time.Instant eventTime, Long sequenceId,
java.time.Duration deliverAfter, java.time.Instant deliverAt,
- java.util.List<String> replicationClusters) {
+ java.util.List<String> replicationClusters,
+ org.apache.pulsar.client.api.v5.Transaction txn) {
- var msgBuilder = producer.newMessage().value(value);
+ org.apache.pulsar.client.api.transaction.Transaction v4Txn =
TransactionV5.unwrap(txn);
+ var msgBuilder = (v4Txn != null ? producer.newMessage(v4Txn) :
producer.newMessage())
+ .value(value);
if (key != null) {
msgBuilder.key(key);
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/TransactionV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/TransactionV5.java
new file mode 100644
index 00000000000..ff7fa5223c1
--- /dev/null
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/TransactionV5.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.pulsar.client.api.v5.PulsarClientException;
+import org.apache.pulsar.client.api.v5.Transaction;
+import org.apache.pulsar.client.api.v5.async.AsyncTransaction;
+
+/**
+ * V5 {@link Transaction} facade over a v4
+ * {@code org.apache.pulsar.client.api.transaction.Transaction}.
+ *
+ * <p>Hand-off to the v4 client (per-message and per-ack) uses the underlying v4 instance,
+ * obtained via {@link #unwrap(Transaction)} or {@link #v4Transaction()}.
+ */
+final class TransactionV5 implements Transaction {
+
+    private final org.apache.pulsar.client.api.transaction.Transaction v4Transaction;
+    private final AsyncTransaction asyncView;
+
+    TransactionV5(org.apache.pulsar.client.api.transaction.Transaction v4Transaction) {
+        this.v4Transaction = v4Transaction;
+        this.asyncView = new AsyncView();
+    }
+
+    /** Returns the wrapped v4 transaction. */
+    org.apache.pulsar.client.api.transaction.Transaction v4Transaction() {
+        return v4Transaction;
+    }
+
+    /** Commits the underlying v4 transaction, blocking until its future completes. */
+    @Override
+    public void commit() throws PulsarClientException {
+        await(v4Transaction.commit(), "Commit interrupted");
+    }
+
+    /** Aborts the underlying v4 transaction, blocking until its future completes. */
+    @Override
+    public void abort() throws PulsarClientException {
+        await(v4Transaction.abort(), "Abort interrupted");
+    }
+
+    /** Returns the asynchronous view over the same underlying transaction. */
+    @Override
+    public AsyncTransaction async() {
+        return asyncView;
+    }
+
+    /** Maps the current v4 transaction state onto the corresponding V5 {@link State}. */
+    @Override
+    public State state() {
+        var v4State = v4Transaction.getState();
+        return switch (v4State) {
+            case OPEN -> State.OPEN;
+            case COMMITTING -> State.COMMITTING;
+            case ABORTING -> State.ABORTING;
+            case COMMITTED -> State.COMMITTED;
+            case ABORTED -> State.ABORTED;
+            case ERROR -> State.ERROR;
+            case TIME_OUT -> State.TIMED_OUT;
+        };
+    }
+
+    /**
+     * Extracts the v4 transaction behind a V5 handle.
+     *
+     * @param txn the V5 handle, or {@code null} when there is no transaction context
+     * @return the wrapped v4 transaction, or {@code null} if {@code txn} is {@code null}
+     * @throws IllegalArgumentException if {@code txn} is not a {@link TransactionV5}
+     */
+    static org.apache.pulsar.client.api.transaction.Transaction unwrap(Transaction txn) {
+        if (txn == null) {
+            return null;
+        }
+        if (txn instanceof TransactionV5 v5) {
+            return v5.v4Transaction;
+        }
+        throw new IllegalArgumentException("Expected TransactionV5, got: " + txn.getClass());
+    }
+
+    /**
+     * Waits for a v4 transaction future and rethrows failures as V5 exceptions.
+     *
+     * @param outcome future returned by the v4 commit or abort call
+     * @param interruptedMessage message used if the waiting thread is interrupted
+     * @throws PulsarClientException if the wait is interrupted or the future fails
+     */
+    private static void await(CompletableFuture<Void> outcome, String interruptedMessage)
+            throws PulsarClientException {
+        try {
+            outcome.get();
+        } catch (ExecutionException failed) {
+            // Report the underlying failure rather than the ExecutionException wrapper.
+            Throwable cause = failed.getCause() != null ? failed.getCause() : failed;
+            throw new PulsarClientException(cause.getMessage(), cause);
+        } catch (InterruptedException interrupted) {
+            // Restore the interrupt flag before surfacing the failure.
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(interruptedMessage, interrupted);
+        }
+    }
+
+    /** Asynchronous view that returns the v4 futures as-is. */
+    private final class AsyncView implements AsyncTransaction {
+        @Override
+        public CompletableFuture<Void> commit() {
+            return v4Transaction.commit();
+        }
+
+        @Override
+        public CompletableFuture<Void> abort() {
+            return v4Transaction.abort();
+        }
+    }
+}