This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c7abb70de42ce9739ef23facb105232522016603 Author: Oneby <[email protected]> AuthorDate: Wed Nov 26 13:24:04 2025 +0800 [fix][broker] Add schema version in rest produce api (#25004) (cherry picked from commit a44e60da13b17f9ec815b4ad413e1b7ff8010e6d) --- .../org/apache/pulsar/broker/rest/TopicsBase.java | 7 +-- .../org/apache/pulsar/broker/admin/TopicsTest.java | 63 ++++++++++++++++++++++ 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java index 47067c55005..425e715a1e5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java @@ -196,7 +196,7 @@ public class TopicsBase extends PersistentTopicsBase { try { String producerName = (null == request.getProducerName() || request.getProducerName().isEmpty()) ? defaultProducerName : request.getProducerName(); - List<Message> messages = buildMessage(request, schema, producerName, topicName); + List<Message> messages = buildMessage(request, schema, producerName, topicName, schemaVersion); List<CompletableFuture<Position>> publishResults = new ArrayList<>(); List<ProducerAck> produceMessageResults = new ArrayList<>(); for (int index = 0; index < messages.size(); index++) { @@ -237,7 +237,7 @@ public class TopicsBase extends PersistentTopicsBase { try { String producerName = (null == request.getProducerName() || request.getProducerName().isEmpty()) ? defaultProducerName : request.getProducerName(); - List<Message> messages = buildMessage(request, schema, producerName, topicName); + List<Message> messages = buildMessage(request, schema, producerName, topicName, schemaVersion); List<CompletableFuture<Position>> publishResults = new ArrayList<>(); List<ProducerAck> produceMessageResults = new ArrayList<>(); // Try to publish messages to all partitions this broker owns in round robin mode. @@ -627,7 +627,7 @@ public class TopicsBase extends PersistentTopicsBase { // Build pulsar message from REST request. private List<Message> buildMessage(ProducerMessages producerMessages, Schema schema, - String producerName, TopicName topicName) { + String producerName, TopicName topicName, SchemaVersion schemaVersion) { List<ProducerMessage> messages; List<Message> pulsarMessages = new ArrayList<>(); @@ -637,6 +637,7 @@ public class TopicsBase extends PersistentTopicsBase { messageMetadata.setProducerName(producerName); messageMetadata.setPublishTime(System.currentTimeMillis()); messageMetadata.setSequenceId(message.getSequenceId()); + messageMetadata.setSchemaVersion(schemaVersion.bytes()); if (null != message.getReplicationClusters()) { messageMetadata.addAllReplicateTos(message.getReplicationClusters()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java index 72426110224..bcbd133ce5c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java @@ -86,6 +86,7 @@ import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaInfoWithVersion; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.websocket.data.ProducerAcks; import org.apache.pulsar.websocket.data.ProducerMessage; @@ -868,4 +869,66 @@ public class TopicsTest extends MockedPulsarServiceBaseTest { + "kv.encoding.type=SEPARATED, key.schema.type=STRING}) to topic persistent:" + "//my-tenant/my-namespace/my-topic")); } + + @Test + public void testProduceWithAutoConsumeSchema() throws Exception { + String topicName = "persistent://" + testTenant + "/" + testNamespace + "/" + testTopicName; + admin.topics().createNonPartitionedTopic(topicName); + GenericSchema jsonSchema = GenericJsonSchema.of(JSONSchema.of(SchemaDefinition.builder() + .withPojo(PC.class).build()).getSchemaInfo()); + // use producer to create topic schema + Producer producer = pulsarClient.newProducer(jsonSchema).topic(topicName).create(); + producer.close(); + + PC pc = new PC("dell", "alienware", 2021, GPU.AMD, + new Seller("WA", "main street", 98004)); + PC anotherPc = new PC("asus", "rog", 2020, GPU.NVIDIA, + new Seller("CA", "back street", 90232)); + + @Cleanup + Consumer<?> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) + .topic(topicName) + .subscriptionName("auto-schema-sub") + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + SchemaInfoWithVersion schemaInfoWithVersion = admin.schemas().getSchemaInfoWithVersion(topicName); + AsyncResponse asyncResponse = mock(AsyncResponse.class); + ProducerMessages producerMessages = new ProducerMessages(); + producerMessages.setSchemaVersion(schemaInfoWithVersion.getVersion()); + String message = "[" + + "{\"key\":\"my-key\",\"payload\":\"" + + ObjectMapperFactory.getMapper().writer().writeValueAsString(pc).replace("\"", "\\\"") + + "\",\"eventTime\":1603045262772,\"sequenceId\":1}," + + "{\"key\":\"my-key\",\"payload\":\"" + + ObjectMapperFactory.getMapper().writer().writeValueAsString(anotherPc).replace("\"", "\\\"") + + "\",\"eventTime\":1603045262772,\"sequenceId\":2}]"; + producerMessages.setMessages(createMessages(message)); + topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, + producerMessages); + ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class); + verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode()); + + List<PC> expected = Arrays.asList(pc, anotherPc); + for (int i = 0; i < 2; i++) { + Message<?> msg = consumer.receive(2, TimeUnit.SECONDS); + Assert.assertTrue(msg.getValue() instanceof GenericJsonRecord); + GenericJsonRecord genericJsonRecord = (GenericJsonRecord) msg.getValue(); + Assert.assertEquals(genericJsonRecord.getField("brand"), expected.get(i).getBrand()); + Assert.assertEquals(genericJsonRecord.getField("model"), expected.get(i).getModel()); + Number year = (Number) genericJsonRecord.getField("year"); + Assert.assertEquals(year.intValue(), expected.get(i).getYear()); + Assert.assertEquals(genericJsonRecord.getField("gpu"), expected.get(i).getGpu().name()); + Object seller = genericJsonRecord.getField("seller"); + Assert.assertTrue(seller instanceof GenericJsonRecord); + GenericJsonRecord sellerGenericJsonRecord = (GenericJsonRecord) seller; + Assert.assertEquals(sellerGenericJsonRecord.getField("state"), expected.get(i).getSeller().getState()); + Assert.assertEquals(sellerGenericJsonRecord.getField("street"), expected.get(i).getSeller().getStreet()); + Number zipCode = (Number) sellerGenericJsonRecord.getField("zipCode"); + Assert.assertEquals(zipCode.longValue(), expected.get(i).getSeller().getZipCode()); + } + } + }
