merlimat closed pull request #2829: Fix Websocket Consume Messages in Partitioned Topics URL: https://github.com/apache/pulsar/pull/2829
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index e084b04622..7f1b5aa45c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -358,7 +358,7 @@ public void testProxyStats() throws Exception { + "/my-sub?subscriptionType=Failover"; final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic + "/"; final String readerUri = "ws://localhost:" + port + "/ws/v2/reader/persistent/" + topic; - System.out.println(consumerUri+", "+producerUri); + System.out.println(consumerUri + ", " + producerUri); URI consumeUri = URI.create(consumerUri); URI produceUri = URI.create(producerUri); URI readUri = URI.create(readerUri); @@ -424,6 +424,47 @@ public void testProxyStats() throws Exception { } } + @Test(timeOut = 10000) + public void consumeMessagesInPartitionedTopicTest() throws Exception { + final String namespace = "my-property/my-ns"; + final String topic = namespace + "/" + "my-topic7"; + admin.topics().createPartitionedTopic("persistent://" + topic, 3); + + final String subscription = "my-sub"; + final String consumerUri = "ws://localhost:" + port + "/ws/v2/consumer/persistent/" + topic + "/" + subscription; + final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic; + + URI consumeUri = URI.create(consumerUri); + URI produceUri = URI.create(producerUri); + + WebSocketClient consumeClient = new WebSocketClient(); + WebSocketClient produceClient = new WebSocketClient(); + + SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); + SimpleProducerSocket produceSocket = new SimpleProducerSocket(); + + try { + produceClient.start(); + ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); + Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); + producerFuture.get(); + produceSocket.sendMessage(100); + } finally { + stopWebSocketClient(produceClient); + } + + Thread.sleep(500); + + try { + consumeClient.start(); + ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); + Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest); + consumerFuture.get(); + } finally { + stopWebSocketClient(consumeClient); + } + } + private void verifyTopicStat(Client client, String baseUrl, String topic) { String statUrl = baseUrl + topic + "/stats"; WebTarget webTarget = client.target(statUrl); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java index 1bb3e086ed..4d38aa949d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java @@ -22,6 +22,7 @@ import java.io.Serializable; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.TopicName; /** * Opaque unique identifier of a single message @@ -49,6 +50,10 @@ public static MessageId fromByteArray(byte[] data) throws IOException { return MessageIdImpl.fromByteArray(data); } + public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException { + return MessageIdImpl.fromByteArrayWithTopic(data, topicName); + } + public static final MessageId earliest = new MessageIdImpl(-1, -1, -1); public static final MessageId latest = new MessageIdImpl(Long.MAX_VALUE, Long.MAX_VALUE, -1); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 3686b124a8..5a53436e7b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; import org.apache.pulsar.shaded.com.google.protobuf.v241.UninitializedMessageException; @@ -113,6 +114,36 @@ public static MessageId fromByteArray(byte[] data) throws IOException { return messageId; } + public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException { + checkNotNull(data); + ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(Unpooled.wrappedBuffer(data, 0, data.length)); + PulsarApi.MessageIdData.Builder builder = PulsarApi.MessageIdData.newBuilder(); + + PulsarApi.MessageIdData idData; + try { + idData = builder.mergeFrom(inputStream, null).build(); + } catch (UninitializedMessageException e) { + throw new IOException(e); + } + + MessageId messageId; + if (idData.hasBatchIndex()) { + messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), + idData.getBatchIndex()); + } else { + messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition()); + } + if (idData.getPartition() > -1 && topicName != null) { + messageId = new TopicMessageIdImpl( + topicName.getPartition(idData.getPartition()).toString(), topicName.toString(), messageId); + } + + inputStream.recycle(); + builder.recycle(); + idData.recycle(); + return messageId; + } + // batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message protected byte[] toByteArray(int batchIndex) { MessageIdData.Builder builder = MessageIdData.newBuilder(); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index cb21f6f8cc..740a4e11c9 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerBuilderImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.websocket.data.ConsumerAck; @@ -220,7 +221,7 @@ public void onWebSocketText(String message) { MessageId msgId; try { ConsumerAck ack = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerAck.class); - msgId = MessageId.fromByteArray(Base64.getDecoder().decode(ack.messageId)); + msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(ack.messageId), topic); } catch (IOException e) { log.warn("Failed to deserialize message id: {}", message, e); close(WebSocketError.FailedToDeserializeFromJSON); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services