This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d79499d Fix Websocket Consume Messages in Partitioned Topics (#2829) d79499d is described below commit d79499dbe24fbd8b065489875ba86a47761ec33f Author: Yuto Furuta <mzq6mft...@gmail.com> AuthorDate: Sat Oct 27 02:53:09 2018 +0900 Fix Websocket Consume Messages in Partitioned Topics (#2829) * fix consume messages in partitioned topics on websocket * add consumeMessagesInPartitionedTopicTest * add fromByteArrayWithTopic * remove public --- .../websocket/proxy/ProxyPublishConsumeTest.java | 43 +++++++++++++++++++++- .../org/apache/pulsar/client/api/MessageId.java | 5 +++ .../apache/pulsar/client/impl/MessageIdImpl.java | 31 ++++++++++++++++ .../apache/pulsar/websocket/ConsumerHandler.java | 3 +- 4 files changed, 80 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index e084b04..7f1b5aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -358,7 +358,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { + "/my-sub?subscriptionType=Failover"; final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic + "/"; final String readerUri = "ws://localhost:" + port + "/ws/v2/reader/persistent/" + topic; - System.out.println(consumerUri+", "+producerUri); + System.out.println(consumerUri + ", " + producerUri); URI consumeUri = URI.create(consumerUri); URI produceUri = URI.create(producerUri); URI readUri = URI.create(readerUri); @@ -424,6 +424,47 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase { } } + @Test(timeOut = 10000) + public void consumeMessagesInPartitionedTopicTest() throws Exception { + final String 
namespace = "my-property/my-ns"; + final String topic = namespace + "/" + "my-topic7"; + admin.topics().createPartitionedTopic("persistent://" + topic, 3); + + final String subscription = "my-sub"; + final String consumerUri = "ws://localhost:" + port + "/ws/v2/consumer/persistent/" + topic + "/" + subscription; + final String producerUri = "ws://localhost:" + port + "/ws/v2/producer/persistent/" + topic; + + URI consumeUri = URI.create(consumerUri); + URI produceUri = URI.create(producerUri); + + WebSocketClient consumeClient = new WebSocketClient(); + WebSocketClient produceClient = new WebSocketClient(); + + SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); + SimpleProducerSocket produceSocket = new SimpleProducerSocket(); + + try { + produceClient.start(); + ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); + Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); + producerFuture.get(); + produceSocket.sendMessage(100); + } finally { + stopWebSocketClient(produceClient); + } + + Thread.sleep(500); + + try { + consumeClient.start(); + ClientUpgradeRequest consumeRequest = new ClientUpgradeRequest(); + Future<Session> consumerFuture = consumeClient.connect(consumeSocket, consumeUri, consumeRequest); + consumerFuture.get(); + } finally { + stopWebSocketClient(consumeClient); + } + } + private void verifyTopicStat(Client client, String baseUrl, String topic) { String statUrl = baseUrl + topic + "/stats"; WebTarget webTarget = client.target(statUrl); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java index 1bb3e08..4d38aa9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.Serializable; import 
org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.TopicName; /** * Opaque unique identifier of a single message @@ -49,6 +50,10 @@ public interface MessageId extends Comparable<MessageId>, Serializable { return MessageIdImpl.fromByteArray(data); } + public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException { + return MessageIdImpl.fromByteArrayWithTopic(data, topicName); + } + public static final MessageId earliest = new MessageIdImpl(-1, -1, -1); public static final MessageId latest = new MessageIdImpl(Long.MAX_VALUE, Long.MAX_VALUE, -1); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java index 3686b12..5a53436 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageIdImpl.java @@ -30,6 +30,7 @@ import java.io.IOException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; import org.apache.pulsar.shaded.com.google.protobuf.v241.UninitializedMessageException; @@ -113,6 +114,36 @@ public class MessageIdImpl implements MessageId { return messageId; } + public static MessageId fromByteArrayWithTopic(byte[] data, TopicName topicName) throws IOException { + checkNotNull(data); + ByteBufCodedInputStream inputStream = ByteBufCodedInputStream.get(Unpooled.wrappedBuffer(data, 0, data.length)); + PulsarApi.MessageIdData.Builder builder = PulsarApi.MessageIdData.newBuilder(); + + PulsarApi.MessageIdData idData; + try { + idData = builder.mergeFrom(inputStream, null).build(); + 
} catch (UninitializedMessageException e) { + throw new IOException(e); + } + + MessageId messageId; + if (idData.hasBatchIndex()) { + messageId = new BatchMessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition(), + idData.getBatchIndex()); + } else { + messageId = new MessageIdImpl(idData.getLedgerId(), idData.getEntryId(), idData.getPartition()); + } + if (idData.getPartition() > -1 && topicName != null) { + messageId = new TopicMessageIdImpl( + topicName.getPartition(idData.getPartition()).toString(), topicName.toString(), messageId); + } + + inputStream.recycle(); + builder.recycle(); + idData.recycle(); + return messageId; + } + // batchIndex is -1 if message is non-batched message and has the batchIndex for a batch message protected byte[] toByteArray(int batchIndex) { MessageIdData.Builder builder = MessageIdData.newBuilder(); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index cb21f6f..740a4e1 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -44,6 +44,7 @@ import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerBuilderImpl; +import org.apache.pulsar.client.impl.TopicMessageIdImpl; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.websocket.data.ConsumerAck; @@ -220,7 +221,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler { MessageId msgId; try { ConsumerAck ack = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerAck.class); - msgId = 
MessageId.fromByteArray(Base64.getDecoder().decode(ack.messageId)); + msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(ack.messageId), topic); } catch (IOException e) { log.warn("Failed to deserialize message id: {}", message, e); close(WebSocketError.FailedToDeserializeFromJSON);