merlimat closed pull request #2829: Fix Websocket Consume Messages in 
Partitioned Topics
URL: https://github.com/apache/pulsar/pull/2829
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index e084b04622..7f1b5aa45c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -358,7 +358,7 @@ public void testProxyStats() throws Exception {
                 + "/my-sub?subscriptionType=Failover";
         final String producerUri = "ws://localhost:" + port + 
"/ws/v2/producer/persistent/" + topic + "/";
         final String readerUri = "ws://localhost:" + port + 
"/ws/v2/reader/persistent/" + topic;
-        System.out.println(consumerUri+", "+producerUri);
+        System.out.println(consumerUri + ", " + producerUri);
         URI consumeUri = URI.create(consumerUri);
         URI produceUri = URI.create(producerUri);
         URI readUri = URI.create(readerUri);
@@ -424,6 +424,47 @@ public void testProxyStats() throws Exception {
         }
     }
 
+    @Test(timeOut = 10000)
+    public void consumeMessagesInPartitionedTopicTest() throws Exception {
+        final String namespace = "my-property/my-ns";
+        final String topic = namespace + "/" + "my-topic7";
+        admin.topics().createPartitionedTopic("persistent://" + topic, 3);
+
+        final String subscription = "my-sub";
+        final String consumerUri = "ws://localhost:" + port + 
"/ws/v2/consumer/persistent/" + topic + "/" + subscription;
+        final String producerUri = "ws://localhost:" + port + 
"/ws/v2/producer/persistent/" + topic;
+
+        URI consumeUri = URI.create(consumerUri);
+        URI produceUri = URI.create(producerUri);
+
+        WebSocketClient consumeClient = new WebSocketClient();
+        WebSocketClient produceClient = new WebSocketClient();
+
+        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+        SimpleProducerSocket produceSocket = new SimpleProducerSocket();
+
+        try {
+            produceClient.start();
+            ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
+            Future<Session> producerFuture = 
produceClient.connect(produceSocket, produceUri, produceRequest);
+            producerFuture.get();
+            produceSocket.sendMessage(100);
+        } finally {
+            stopWebSocketClient(produceClient);
+        }
+
+        Thread.sleep(500);
+
+        try {
+            consumeClient.start();
+            ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest();
+            Future<Session> consumerFuture = 
consumeClient.connect(consumeSocket, consumeUri, consumeRequest);
+            consumerFuture.get();
+        } finally {
+            stopWebSocketClient(consumeClient);
+        }
+    }
+
     private void verifyTopicStat(Client client, String baseUrl, String topic) {
         String statUrl = baseUrl + topic + "/stats";
         WebTarget webTarget = client.target(statUrl);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
index 1bb3e086ed..4d38aa949d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
@@ -22,6 +22,7 @@
 
 import java.io.Serializable;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
 
 /**
  * Opaque unique identifier of a single message
@@ -49,6 +50,10 @@ public static MessageId fromByteArray(byte[] data) throws 
IOException {
         return MessageIdImpl.fromByteArray(data);
     }
 
+    public static MessageId fromByteArrayWithTopic(byte[] data, TopicName 
topicName) throws IOException {
+        return MessageIdImpl.fromByteArrayWithTopic(data, topicName);
+    }
+
     public static final MessageId earliest = new MessageIdImpl(-1, -1, -1);
 
     public static final MessageId latest = new MessageIdImpl(Long.MAX_VALUE, 
Long.MAX_VALUE, -1);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
index 3686b124a8..5a53436e7b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java
@@ -30,6 +30,7 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 import 
org.apache.pulsar.shaded.com.google.protobuf.v241.UninitializedMessageException;
@@ -113,6 +114,36 @@ public static MessageId fromByteArray(byte[] data) throws 
IOException {
         return messageId;
     }
 
+    public static MessageId fromByteArrayWithTopic(byte[] data, TopicName 
topicName) throws IOException {
+        checkNotNull(data);
+        ByteBufCodedInputStream inputStream = 
ByteBufCodedInputStream.get(Unpooled.wrappedBuffer(data, 0, data.length));
+        PulsarApi.MessageIdData.Builder builder = 
PulsarApi.MessageIdData.newBuilder();
+
+        PulsarApi.MessageIdData idData;
+        try {
+            idData = builder.mergeFrom(inputStream, null).build();
+        } catch (UninitializedMessageException e) {
+            throw new IOException(e);
+        }
+
+        MessageId messageId;
+        if (idData.hasBatchIndex()) {
+            messageId = new BatchMessageIdImpl(idData.getLedgerId(), 
idData.getEntryId(), idData.getPartition(),
+                    idData.getBatchIndex());
+        } else {
+            messageId = new MessageIdImpl(idData.getLedgerId(), 
idData.getEntryId(), idData.getPartition());
+        }
+        if (idData.getPartition() > -1 && topicName != null) {
+            messageId = new TopicMessageIdImpl(
+                    topicName.getPartition(idData.getPartition()).toString(), 
topicName.toString(), messageId);
+        }
+
+        inputStream.recycle();
+        builder.recycle();
+        idData.recycle();
+        return messageId;
+    }
+
     // batchIndex is -1 if message is non-batched message and has the 
batchIndex for a batch message
     protected byte[] toByteArray(int batchIndex) {
         MessageIdData.Builder builder = MessageIdData.newBuilder();
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index cb21f6f8cc..740a4e11c9 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -44,6 +44,7 @@
 import 
org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ConsumerAck;
@@ -220,7 +221,7 @@ public void onWebSocketText(String message) {
         MessageId msgId;
         try {
             ConsumerAck ack = 
ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerAck.class);
-            msgId = 
MessageId.fromByteArray(Base64.getDecoder().decode(ack.messageId));
+            msgId = 
MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(ack.messageId), 
topic);
         } catch (IOException e) {
             log.warn("Failed to deserialize message id: {}", message, e);
             close(WebSocketError.FailedToDeserializeFromJSON);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to