This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c7abb70de42ce9739ef23facb105232522016603
Author: Oneby <[email protected]>
AuthorDate: Wed Nov 26 13:24:04 2025 +0800

    [fix][broker] Add schema version in rest produce api (#25004)
    
    (cherry picked from commit a44e60da13b17f9ec815b4ad413e1b7ff8010e6d)
---
 .../org/apache/pulsar/broker/rest/TopicsBase.java  |  7 +--
 .../org/apache/pulsar/broker/admin/TopicsTest.java | 63 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
index 47067c55005..425e715a1e5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java
@@ -196,7 +196,7 @@ public class TopicsBase extends PersistentTopicsBase {
         try {
             String producerName = (null == request.getProducerName() || 
request.getProducerName().isEmpty())
                     ? defaultProducerName : request.getProducerName();
-            List<Message> messages = buildMessage(request, schema, 
producerName, topicName);
+            List<Message> messages = buildMessage(request, schema, 
producerName, topicName, schemaVersion);
             List<CompletableFuture<Position>> publishResults = new 
ArrayList<>();
             List<ProducerAck> produceMessageResults = new ArrayList<>();
             for (int index = 0; index < messages.size(); index++) {
@@ -237,7 +237,7 @@ public class TopicsBase extends PersistentTopicsBase {
         try {
             String producerName = (null == request.getProducerName() || 
request.getProducerName().isEmpty())
                     ? defaultProducerName : request.getProducerName();
-            List<Message> messages = buildMessage(request, schema, 
producerName, topicName);
+            List<Message> messages = buildMessage(request, schema, 
producerName, topicName, schemaVersion);
             List<CompletableFuture<Position>> publishResults = new 
ArrayList<>();
             List<ProducerAck> produceMessageResults = new ArrayList<>();
             // Try to publish messages to all partitions this broker owns in 
round robin mode.
@@ -627,7 +627,7 @@ public class TopicsBase extends PersistentTopicsBase {
 
     // Build pulsar message from REST request.
     private List<Message> buildMessage(ProducerMessages producerMessages, 
Schema schema,
-                                       String producerName, TopicName 
topicName) {
+                                       String producerName, TopicName 
topicName, SchemaVersion schemaVersion) {
         List<ProducerMessage> messages;
         List<Message> pulsarMessages = new ArrayList<>();
 
@@ -637,6 +637,7 @@ public class TopicsBase extends PersistentTopicsBase {
             messageMetadata.setProducerName(producerName);
             messageMetadata.setPublishTime(System.currentTimeMillis());
             messageMetadata.setSequenceId(message.getSequenceId());
+            messageMetadata.setSchemaVersion(schemaVersion.bytes());
             if (null != message.getReplicationClusters()) {
                 
messageMetadata.addAllReplicateTos(message.getReplicationClusters());
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index 72426110224..bcbd133ce5c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -86,6 +86,7 @@ import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaInfoWithVersion;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.websocket.data.ProducerAcks;
 import org.apache.pulsar.websocket.data.ProducerMessage;
@@ -868,4 +869,66 @@ public class TopicsTest extends 
MockedPulsarServiceBaseTest {
                 + "kv.encoding.type=SEPARATED, key.schema.type=STRING}) to 
topic persistent:"
                 + "//my-tenant/my-namespace/my-topic"));
     }
+
+    @Test
+    public void testProduceWithAutoConsumeSchema() throws Exception {
+        String topicName = "persistent://" + testTenant + "/" + testNamespace 
+ "/" + testTopicName;
+        admin.topics().createNonPartitionedTopic(topicName);
+        GenericSchema jsonSchema = 
GenericJsonSchema.of(JSONSchema.of(SchemaDefinition.builder()
+                .withPojo(PC.class).build()).getSchemaInfo());
+        // use producer to create topic schema
+        Producer producer = 
pulsarClient.newProducer(jsonSchema).topic(topicName).create();
+        producer.close();
+
+        PC pc = new PC("dell", "alienware", 2021, GPU.AMD,
+                new Seller("WA", "main street", 98004));
+        PC anotherPc = new PC("asus", "rog", 2020, GPU.NVIDIA,
+                new Seller("CA", "back street", 90232));
+
+        @Cleanup
+        Consumer<?> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
+                .topic(topicName)
+                .subscriptionName("auto-schema-sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        SchemaInfoWithVersion schemaInfoWithVersion = 
admin.schemas().getSchemaInfoWithVersion(topicName);
+        AsyncResponse asyncResponse = mock(AsyncResponse.class);
+        ProducerMessages producerMessages = new ProducerMessages();
+        producerMessages.setSchemaVersion(schemaInfoWithVersion.getVersion());
+        String message = "["
+                + "{\"key\":\"my-key\",\"payload\":\""
+                + 
ObjectMapperFactory.getMapper().writer().writeValueAsString(pc).replace("\"", 
"\\\"")
+                + "\",\"eventTime\":1603045262772,\"sequenceId\":1},"
+                + "{\"key\":\"my-key\",\"payload\":\""
+                + 
ObjectMapperFactory.getMapper().writer().writeValueAsString(anotherPc).replace("\"",
 "\\\"")
+                + "\",\"eventTime\":1603045262772,\"sequenceId\":2}]";
+        producerMessages.setMessages(createMessages(message));
+        topics.produceOnPersistentTopic(asyncResponse, testTenant, 
testNamespace, testTopicName, false,
+                producerMessages);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        verify(asyncResponse, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.OK.getStatusCode());
+
+        List<PC> expected = Arrays.asList(pc, anotherPc);
+        for (int i = 0; i < 2; i++) {
+            Message<?> msg = consumer.receive(2, TimeUnit.SECONDS);
+            Assert.assertTrue(msg.getValue() instanceof GenericJsonRecord);
+            GenericJsonRecord genericJsonRecord = (GenericJsonRecord) 
msg.getValue();
+            Assert.assertEquals(genericJsonRecord.getField("brand"), 
expected.get(i).getBrand());
+            Assert.assertEquals(genericJsonRecord.getField("model"), 
expected.get(i).getModel());
+            Number year = (Number) genericJsonRecord.getField("year");
+            Assert.assertEquals(year.intValue(), expected.get(i).getYear());
+            Assert.assertEquals(genericJsonRecord.getField("gpu"), 
expected.get(i).getGpu().name());
+            Object seller = genericJsonRecord.getField("seller");
+            Assert.assertTrue(seller instanceof GenericJsonRecord);
+            GenericJsonRecord sellerGenericJsonRecord = (GenericJsonRecord) 
seller;
+            Assert.assertEquals(sellerGenericJsonRecord.getField("state"), 
expected.get(i).getSeller().getState());
+            Assert.assertEquals(sellerGenericJsonRecord.getField("street"), 
expected.get(i).getSeller().getStreet());
+            Number zipCode = (Number) 
sellerGenericJsonRecord.getField("zipCode");
+            Assert.assertEquals(zipCode.longValue(), 
expected.get(i).getSeller().getZipCode());
+        }
+    }
+
 }

Reply via email to