This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f8aef2  [pulsar-client] Fix: set and return topic name on message api 
(#11743)
0f8aef2 is described below

commit 0f8aef2b9494cde8e8adc9d97d89a65d73ae8c35
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Sun Aug 22 13:34:34 2021 -0700

    [pulsar-client] Fix: set and return topic name on message api (#11743)
---
 .../api/PartitionedProducerConsumerTest.java       | 40 ++++++++++++++++++++++
 .../pulsar/client/impl/ProducerSemaphoreTest.java  |  6 ++--
 .../pulsar/client/impl/MessageBuilderImpl.java     |  2 +-
 .../org/apache/pulsar/client/impl/MessageImpl.java |  5 +--
 .../client/impl/TypedMessageBuilderImpl.java       |  2 +-
 .../apache/pulsar/client/impl/MessageImplTest.java | 30 ++++++++--------
 .../org/apache/pulsar/client/impl/MessageTest.java |  8 ++---
 .../client/impl/schema/AutoConsumeSchemaTest.java  |  2 +-
 8 files changed, 68 insertions(+), 27 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
index 3e9b194..543c172 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
@@ -903,6 +903,46 @@ public class PartitionedProducerConsumerTest extends 
ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testCustomPartitionedProducer() throws Exception {
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 
0);// Creates new client connection
+        TopicName topicName = null;
+        Producer<byte[]> producer = null;
+        try {
+            log.info("-- Starting {} test --", methodName);
+
+            int numPartitions = 4;
+            topicName = TopicName
+                    
.get("persistent://my-property/my-ns/my-partitionedtopic1-" + 
System.currentTimeMillis());
+
+            admin.topics().createPartitionedTopic(topicName.toString(), 
numPartitions);
+
+            RouterWithTopicName router = new RouterWithTopicName();
+            producer = pulsarClient.newProducer().topic(topicName.toString())
+                    .messageRouter(router)
+                    .create();
+            for (int i = 0; i < 1; i++) {
+                String message = "my-message-" + i;
+                
producer.newMessage().key(String.valueOf(i)).value(message.getBytes()).send();
+            }
+            assertEquals(router.topicName, topicName.toString());
+        } finally {
+            producer.close();
+            pulsarClient.close();
+            admin.topics().deletePartitionedTopic(topicName.toString());
+            log.info("-- Exiting {} test --", methodName);
+        }
+    }
+
+    private static class RouterWithTopicName implements MessageRouter {
+        static String topicName = null;
+
+        @Override
+        public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+            topicName = msg.getTopicName();
+            return 2;
+        }
+    }
 
     private static class AlwaysTwoMessageRouter implements MessageRouter {
         @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index f46509e..c719cbd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -88,7 +88,7 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
             for (int i = 0; i < messages / 2; i++) {
                 MessageMetadata metadata = new MessageMetadata()
                         .setNumMessagesInBatch(10);
-                MessageImpl<byte[]> msg = MessageImpl.create(metadata, 
ByteBuffer.wrap(new byte[0]), Schema.BYTES);
+                MessageImpl<byte[]> msg = MessageImpl.create(metadata, 
ByteBuffer.wrap(new byte[0]), Schema.BYTES, null);
                 futures.add(producer.sendAsync(msg));
             }
             
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 
pendingQueueSize - messages/2);
@@ -147,7 +147,7 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
                 MessageMetadata metadata = new MessageMetadata()
                         .setNumMessagesInBatch(10);
 
-                MessageImpl<byte[]> msg = MessageImpl.create(metadata, 
ByteBuffer.wrap(new byte[0]), Schema.BYTES);
+                MessageImpl<byte[]> msg = MessageImpl.create(metadata, 
ByteBuffer.wrap(new byte[0]), Schema.BYTES, null);
                 futures.add(producer.sendAsync(msg));
             }
             
Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 0);
@@ -155,7 +155,7 @@ public class ProducerSemaphoreTest extends 
ProducerConsumerBase {
                 MessageMetadata metadata = new MessageMetadata()
                         .setNumMessagesInBatch(10);
 
-                MessageImpl<byte[]> msg = MessageImpl.create(metadata, 
ByteBuffer.wrap(new byte[0]), Schema.BYTES);
+                MessageImpl<byte[]> msg = MessageImpl.create(metadata, 
ByteBuffer.wrap(new byte[0]), Schema.BYTES, null);
                 producer.sendAsync(msg).get();
                 Assert.fail("Shouldn't be able to send message");
             } catch (ExecutionException ee) {
diff --git 
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
 
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
index 103d322..11c7dfa 100644
--- 
a/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
+++ 
b/pulsar-client-1x-base/pulsar-client-1x/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java
@@ -40,7 +40,7 @@ public class MessageBuilderImpl implements MessageBuilder {
 
     @Override
     public Message<byte[]> build() {
-        return MessageImpl.create(msgMetadataBuilder, content, Schema.BYTES);
+        return MessageImpl.create(msgMetadataBuilder, content, Schema.BYTES, 
null);
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 0297b14..fef98e7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -78,13 +78,14 @@ public class MessageImpl<T> implements Message<T> {
     private boolean poolMessage;
     
     // Constructor for out-going message
-    public static <T> MessageImpl<T> create(MessageMetadata msgMetadata, 
ByteBuffer payload, Schema<T> schema) {
+    public static <T> MessageImpl<T> create(MessageMetadata msgMetadata, 
ByteBuffer payload, Schema<T> schema,
+            String topic) {
         @SuppressWarnings("unchecked")
         MessageImpl<T> msg = (MessageImpl<T>) RECYCLER.get();
         msg.msgMetadata.clear();
         msg.msgMetadata.copyFrom(msgMetadata);
         msg.messageId = null;
-        msg.topic = null;
+        msg.topic = topic;
         msg.cnx = null;
         msg.payload = Unpooled.wrappedBuffer(payload);
         msg.properties = null;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index 51dc992..f981bc5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -284,7 +284,7 @@ public class TypedMessageBuilderImpl<T> implements 
TypedMessageBuilder<T> {
 
     public Message<T> getMessage() {
         beforeSend();
-        return MessageImpl.create(msgMetadata, content, schema);
+        return MessageImpl.create(msgMetadata, content, schema, 
producer.topic);
     }
 
     public long getPublishTime() {
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
index 4083c34..1580570 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java
@@ -59,7 +59,7 @@ public class MessageImplTest {
     @Test
     public void testGetSequenceIdNotAssociated() {
         ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
-        MessageImpl<?> msg = MessageImpl.create(new MessageMetadata(), 
payload, Schema.BYTES);
+        MessageImpl<?> msg = MessageImpl.create(new MessageMetadata(), 
payload, Schema.BYTES, null);
 
         assertEquals(-1, msg.getSequenceId());
     }
@@ -71,7 +71,7 @@ public class MessageImplTest {
         builder.addProperty().setKey("key1").setValue("value2");
         builder.addProperty().setKey("key3").setValue("value3");
         ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
-        MessageImpl<?> msg = MessageImpl.create(builder, payload, 
Schema.BYTES);
+        MessageImpl<?> msg = MessageImpl.create(builder, payload, 
Schema.BYTES, null);
         assertEquals("value2", msg.getProperty("key1"));
         assertEquals("value3", msg.getProperty("key3"));
     }
@@ -82,7 +82,7 @@ public class MessageImplTest {
             .setSequenceId(1234);
 
         ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
-        MessageImpl<?> msg = MessageImpl.create(builder, payload, 
Schema.BYTES);
+        MessageImpl<?> msg = MessageImpl.create(builder, payload, 
Schema.BYTES, null);
 
         assertEquals(1234, msg.getSequenceId());
     }
@@ -91,7 +91,7 @@ public class MessageImplTest {
     public void testGetProducerNameNotAssigned() {
         MessageMetadata builder = new MessageMetadata();
         ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
-        MessageImpl<?> msg = MessageImpl.create(builder, payload, 
Schema.BYTES);
+        MessageImpl<?> msg = MessageImpl.create(builder, payload, 
Schema.BYTES, null);
 
         assertNull(msg.getProducerName());
     }
@@ -102,7 +102,7 @@ public class MessageImplTest {
             .setProducerName("test-producer");
 
         ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
-        MessageImpl<?> msg = MessageImpl.create(builder, payload, 
Schema.BYTES);
+        MessageImpl<?> msg = MessageImpl.create(builder, payload, 
Schema.BYTES, null);
 
         assertEquals("test-producer", msg.getProducerName());
     }
@@ -127,7 +127,7 @@ public class MessageImplTest {
         MessageMetadata builder = new MessageMetadata()
                 .setProducerName("default");
         MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = 
MessageImpl.create(
-                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
         KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = 
msg.getValue();
         assertEquals(keyValue.getKey(), foo);
         assertEquals(keyValue.getValue(), bar);
@@ -156,7 +156,7 @@ public class MessageImplTest {
         MessageMetadata builder = new MessageMetadata()
                 .setProducerName("inline");
         MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = 
MessageImpl.create(
-                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
         KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = 
msg.getValue();
         assertEquals(keyValue.getKey(), foo);
         assertEquals(keyValue.getValue(), bar);
@@ -186,7 +186,7 @@ public class MessageImplTest {
         
builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
         builder.setPartitionKeyB64Encoded(true);
         MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = 
MessageImpl.create(
-                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
         KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = 
msg.getValue();
         assertEquals(keyValue.getKey(), foo);
         assertEquals(keyValue.getValue(), bar);
@@ -218,7 +218,7 @@ public class MessageImplTest {
                 .setProducerName("default");
         builder.setSchemaVersion(new byte[10]);
         MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = 
MessageImpl.create(
-                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
         KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = 
msg.getValue();
         assertEquals(keyValue.getKey(), foo);
         assertEquals(keyValue.getValue(), bar);
@@ -256,7 +256,7 @@ public class MessageImplTest {
         
builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
         builder.setPartitionKeyB64Encoded(true);
         MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = 
MessageImpl.create(
-                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
         KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = 
msg.getValue();
         assertEquals(keyValue.getKey(), foo);
         assertEquals(keyValue.getValue(), bar);
@@ -291,7 +291,7 @@ public class MessageImplTest {
                 .setProducerName("default");
         builder.setSchemaVersion(new byte[10]);
         MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = 
MessageImpl.create(
-                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
         KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = 
msg.getValue();
         assertEquals(keyValue.getKey(), foo);
         assertEquals(keyValue.getValue(), bar);
@@ -329,7 +329,7 @@ public class MessageImplTest {
         
builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
         builder.setPartitionKeyB64Encoded(true);
         MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = 
MessageImpl.create(
-                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
         KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = 
msg.getValue();
         assertEquals(keyValue.getKey(), foo);
         assertEquals(keyValue.getValue(), bar);
@@ -364,7 +364,7 @@ public class MessageImplTest {
                 .setProducerName("default");
         builder.setSchemaVersion(new byte[10]);
         MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = 
MessageImpl.create(
-                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
         KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = 
msg.getValue();
         assertEquals(keyValue.getKey(), foo);
         assertEquals(keyValue.getValue(), bar);
@@ -402,7 +402,7 @@ public class MessageImplTest {
         
builder.setPartitionKey(Base64.getEncoder().encodeToString(fooSchema.encode(foo)));
         builder.setPartitionKeyB64Encoded(true);
         MessageImpl<KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar>> msg = 
MessageImpl.create(
-                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema);
+                builder, ByteBuffer.wrap(encodeBytes), keyValueSchema, null);
         KeyValue<SchemaTestUtils.Foo, SchemaTestUtils.Bar> keyValue = 
msg.getValue();
         assertEquals(keyValue.getKey(), foo);
         assertEquals(keyValue.getValue(), bar);
@@ -421,7 +421,7 @@ public class MessageImplTest {
         
builder.setPartitionKey(Base64.getEncoder().encodeToString(encodeBytes));
         builder.setPartitionKeyB64Encoded(true);
         builder.setNullValue(true);
-        MessageImpl<Boolean> msg = MessageImpl.create(builder, 
ByteBuffer.wrap(encodeBytes), BooleanSchema.of());
+        MessageImpl<Boolean> msg = MessageImpl.create(builder, 
ByteBuffer.wrap(encodeBytes), BooleanSchema.of(), null);
         assertNull(msg.getValue());
     }
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
index 15a4f99..6d633e7 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageTest.java
@@ -39,7 +39,7 @@ public class MessageTest {
         String from = "ClusterNameOfReplicatedFrom";
         MessageMetadata builder = new 
MessageMetadata().setReplicatedFrom(from);
         ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
-        Message<byte[]> msg = MessageImpl.create(builder, payload, 
Schema.BYTES);
+        Message<byte[]> msg = MessageImpl.create(builder, payload, 
Schema.BYTES, null);
 
         assertTrue(msg.isReplicated());
         assertEquals(msg.getReplicatedFrom(), from);
@@ -49,7 +49,7 @@ public class MessageTest {
     public void testMessageImplNoReplicatedInfo() {
         MessageMetadata builder = new MessageMetadata();
         ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
-        Message<byte[]> msg = MessageImpl.create(builder, payload, 
Schema.BYTES);
+        Message<byte[]> msg = MessageImpl.create(builder, payload, 
Schema.BYTES, null);
 
         assertFalse(msg.isReplicated());
         assertNull(msg.getReplicatedFrom());
@@ -61,7 +61,7 @@ public class MessageTest {
         String topicName = "myTopic";
         MessageMetadata builder = new 
MessageMetadata().setReplicatedFrom(from);
         ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
-        MessageImpl<byte[]> msg = MessageImpl.create(builder, payload, 
Schema.BYTES);
+        MessageImpl<byte[]> msg = MessageImpl.create(builder, payload, 
Schema.BYTES, null);
         msg.setMessageId(new MessageIdImpl(-1, -1, -1));
         TopicMessageImpl<byte[]> topicMessage = new 
TopicMessageImpl<>(topicName, topicName, msg, null);
 
@@ -74,7 +74,7 @@ public class MessageTest {
         String topicName = "myTopic";
         MessageMetadata builder = new MessageMetadata();
         ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
-        MessageImpl<byte[]> msg = MessageImpl.create(builder, payload, 
Schema.BYTES);
+        MessageImpl<byte[]> msg = MessageImpl.create(builder, payload, 
Schema.BYTES, null);
         msg.setMessageId(new MessageIdImpl(-1, -1, -1));
         TopicMessageImpl<byte[]> topicMessage = new 
TopicMessageImpl<>(topicName, topicName, msg, null);
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchemaTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchemaTest.java
index fce2ef1..646f657 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchemaTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchemaTest.java
@@ -38,7 +38,7 @@ public class AutoConsumeSchemaTest {
         Schema<GenericRecord> autoConsumeSchema = new AutoConsumeSchema();
         byte[] bytes = "bytes data".getBytes();
         MessageImpl<GenericRecord> message = MessageImpl.create(
-                new MessageMetadata(), ByteBuffer.wrap(bytes), 
autoConsumeSchema);
+                new MessageMetadata(), ByteBuffer.wrap(bytes), 
autoConsumeSchema, null);
         Assert.assertNull(message.getSchemaVersion());
         GenericRecord genericRecord = message.getValue();
         Assert.assertEquals(genericRecord.getNativeObject(), bytes);

Reply via email to