justintime4tea opened a new issue, #24966: URL: https://github.com/apache/pulsar/issues/24966
### Search before reporting

- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### User environment

- K8s via the apachepulsar/pulsar Helm chart
- Java, OS, etc. are all determined by the Apache Pulsar Helm chart / container specs

### Issue Description

Messages published via the Pulsar REST endpoint do not have the schema properly associated with the message payload. This is evident when consuming with the `bin/pulsar-client` CLI, but more importantly it affects sink connectors such as JDBC. The documentation states that you should be able to supply either a schema version _or_ an entire schema definition, but neither works as intended (see the reproduction below).

* [Pulsar REST Docs](https://pulsar.apache.org/docs/4.1.x/client-libraries-rest/)

## Use case

We want to publish messages to the Pulsar HTTP endpoint and have them consumed by a Pulsar JDBC sink, which finally writes them to a database. This doesn't work without proper schema support in the REST endpoint.

To be clear, this is an issue with the REST endpoint itself and is not exclusive to JDBC setups: compare the output of `bin/pulsar-client` when consuming the same message produced by the Node.js client versus the REST endpoint.

### Error messages

```text
```

### Reproducing the issue

## Reproduction

Reproduction has been done by exec-ing into the Pulsar broker pod like so:

```shell
kubectl exec -it pulsar-broker-0 -- bash
```

All commands below can be run from within the official Pulsar deployment (the official Apache Pulsar Helm distribution) using `kubectl exec`.
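The REST publish calls in the reproduction steps POST a JSON body to a URL derived from the topic name. The TypeScript sketch below, using hypothetical helper names `restTopicUrl` and `restPublishBody`, builds that URL and body for both variants the issue mentions: referencing a registered schema by `schemaVersion`, or supplying a schema inline via `valueSchema`. It assumes the `/topics/{persistent|non-persistent}/{tenant}/{namespace}/{topic}` path used by the curl commands; because the encoding the broker expects for `valueSchema` is not shown in this report, that value is passed through as an opaque string. Each message `payload` is itself a JSON-encoded string, which is why the curl bodies escape the inner quotes.

```typescript
// Hypothetical types mirroring the JSON bodies sent with curl in this report.
interface RestMessage {
  payload: string // the record, JSON-encoded into a string
}

interface RestPublishBody {
  schemaVersion?: number // reference an already-registered schema by version
  valueSchema?: string // inline schema; treated as an opaque string here (assumption)
  messages: RestMessage[]
}

// Map e.g. persistent://test-tenant/test-namespace/test-topic to
// <webServiceUrl>/topics/persistent/test-tenant/test-namespace/test-topic
function restTopicUrl(webServiceUrl: string, topic: string): string {
  const match = /^(persistent|non-persistent):\/\/([^/]+)\/([^/]+)\/([^/]+)$/.exec(topic)
  if (match === null) {
    throw new Error(`unexpected topic name: ${topic}`)
  }
  const [, domain, tenant, ns, name] = match
  // Strip trailing slashes so the path is not doubled
  return `${webServiceUrl.replace(/\/+$/, '')}/topics/${domain}/${tenant}/${ns}/${name}`
}

// Build the request body; exactly one schema selector is set by the caller.
function restPublishBody(
  record: object,
  schema: { schemaVersion: number } | { valueSchema: string }
): RestPublishBody {
  return { ...schema, messages: [{ payload: JSON.stringify(record) }] }
}

const exampleUrl = restTopicUrl('http://localhost:8080', 'persistent://test-tenant/test-namespace/test-topic')
const exampleBody = restPublishBody({ firstName: 'Justin' }, { schemaVersion: 0 })
// exampleUrl  -> http://localhost:8080/topics/persistent/test-tenant/test-namespace/test-topic
// exampleBody -> {"schemaVersion":0,"messages":[{"payload":"{\"firstName\":\"Justin\"}"}]}
console.log(exampleUrl)
console.log(JSON.stringify(exampleBody))
```

Serializing the record and then the envelope with two `JSON.stringify` calls produces the same body as the hand-escaped curl `-d` argument, without having to escape the nested quotes manually.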
### Setup tenant, namespace, topic, and schema

```shell
# Create test tenant
bin/pulsar-admin tenants create test-tenant

# Create test namespace
bin/pulsar-admin namespaces create test-tenant/test-namespace

# Create test topic
bin/pulsar-admin topics create persistent://test-tenant/test-namespace/test-topic

# Set schema for topic
curl -X POST \
  http://localhost:8080/admin/v2/schemas/test-tenant/test-namespace/test-topic/schema \
  -H "Content-Type: application/json" \
  -d '{"type":"JSON","schema":"{\"type\":\"record\",\"name\":\"test_schema\",\"fields\":[{\"name\":\"firstName\",\"type\":\"string\"}]}"}'

# Confirm schema is associated with topic
bin/pulsar-admin schemas get persistent://test-tenant/test-namespace/test-topic

# Sample output:
#
# {
#   "version": 1,
#   "schemaInfo": {
#     "name": "test-topic",
#     "schema": {
#       "type": "record",
#       "name": "test_schema",
#       "fields": [
#         {
#           "name": "firstName",
#           "type": "string"
#         }
#       ]
#     },
#     "type": "JSON",
#     "timestamp": 1762811737314,
#     "properties": {}
#   }
# }
```

### Testing with simple "bin/pulsar-client"

```shell
# In one terminal, run the consumer; notice the schema associated with the topic in the CLI output
bin/pulsar-client consume \
  persistent://test-tenant/test-namespace/test-topic \
  -s verify-sub -n 1 --schema-type auto_consume

# From a different terminal, post a message to the Pulsar REST API
curl -X POST \
  http://localhost:8080/topics/persistent/test-tenant/test-namespace/test-topic \
  -H "Content-Type: application/json" \
  -d '{
    "schemaVersion": 0,
    "messages": [
      {
        "payload": "{\"firstName\":\"Justin\"}"
      }
    ]
  }'

# Notice the output with type=class java.lang.String, even when providing the schema directly in the POST
#
# publishTime:[1762816147331], eventTime:[0], key:[null], properties:[], content:{type=class java.lang.String, value={"firstName":"Justin"}}
```

### Testing with JDBC sink

```shell
# Create JDBC sink (in the same namespace as the topic, matching the log path below)
./bin/pulsar-admin sinks create \
  --name test-schema \
  --tenant test-tenant \
  --namespace test-namespace \
  --inputs persistent://test-tenant/test-namespace/test-topic \
  --sink-type jdbc-postgres \
  --sink-config '{"userName": "USERNAME", "password": "PASSWORD", "jdbcUrl": "jdbc:postgresql://your-database-url:5432/db_name", "tableName": "test-schema"}'

# Inspect the logs and see that the schema is recognized
cat /pulsar/logs/functions/test-tenant/test-namespace/test-schema/test-schema-0.log

# Send a message via the REST API (neither schemaVersion nor valueSchema works)
curl -X POST \
  http://localhost:8080/topics/persistent/test-tenant/test-namespace/test-topic \
  -H "Content-Type: application/json" \
  -d '{
    "schemaVersion": 0,
    "messages": [
      {
        "payload": "{\"firstName\":\"Justin\"}"
      }
    ]
  }'

# See the failure messages in the JDBC sink logs
cat /pulsar/logs/functions/test-tenant/test-namespace/test-schema/test-schema-0.log

# java.lang.UnsupportedOperationException: Primitive schema is not supported: BYTES
#   at org.apache.pulsar.io.jdbc.BaseJdbcAutoSchemaSink.createMutation(BaseJdbcAutoSchemaSink.java:201) ~[pulsar-io-jdbc-core-4.1.1.jar:?]
#   at java.util.stream.ReferencePipeline$3$1.accept(Unknown Source) ~[?:?]
#   at java.util.LinkedList$LLSpliterator.forEachRemaining(Unknown Source) ~[?:?]
#   at java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
#   at java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
#   at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source) ~[?:?]
#   at java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
#   at java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
#   at org.apache.pulsar.io.jdbc.JdbcAbstractSink.flush(JdbcAbstractSink.java:256) ~[pulsar-io-jdbc-core-4.1.1.jar:?]
#   at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
#   at java.util.concurrent.FutureTask.runAndReset(Unknown Source) ~[?:?]
#   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
#   at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
#   at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
#   at java.lang.Thread.run(Unknown Source) [?:?]
```

### Test using node "pulsar-client"

When using the Node.js `pulsar-client`, the schema is properly associated with the message (though the Node client expects the type under `schemaType` instead of `type` in the schema definition, with a different casing: `Json` rather than `JSON`). This is properly recognized by Pulsar and the JDBC sink, and records are written to the database as expected.

```typescript
const { Client } = require('pulsar-client')
// Assumption: `config` is the node-config package, as implied by config.get() in the original snippet
const config = require('config')

async function main() {
  const client = new Client({
    serviceUrl: config.get('pulsar-client-service-url')
  })

  const producer = await client.createProducer({
    topic: 'persistent://test-tenant/test-namespace/test-topic',
    accessMode: 'Shared',
    schema: {
      // The Node client takes the type as `schemaType`, cased `Json`
      schemaType: 'Json',
      schema: JSON.stringify({
        type: 'record',
        name: 'test_schema',
        fields: [
          { name: 'firstName', type: 'string' }
        ]
      })
    }
  })

  await producer.send({
    data: Buffer.from(JSON.stringify({ firstName: 'Justin' }))
  })
  await producer.flush()

  // Release the producer and client so the process can exit
  await producer.close()
  await client.close()
}

main().catch((err) => {
  console.error(err)
  process.exit(1)
})
```

The `bin/pulsar-client` consumer output for the message sent from Node.js differs from the REST case: the consumer loads the topic's schema and decodes the record into its fields (`content:{firstName=Justin}`), showing that the schema has been associated with the message properly:

```shell
2025-11-10T22:52:44,654+0000 [main] INFO org.apache.pulsar.client.impl.schema.AutoConsumeSchema - Configure topic schema \x00\x00\x00\x00\x00\x00\x00\x00 for topic persistent://test-tenant/test-namespace/test-topic : {"type":"record","name":"IngressUsage","fields":[{"name":"firstName","type":"string"}]}
2025-11-10T22:52:44,655+0000 [main] INFO org.apache.pulsar.client.impl.schema.generic.MultiVersionGenericAvroReader - Load schema reader for version(0), schema is : {"type":"record","name":"IngressUsage","fields":[{"name":"firstName","type":"string"}]}
publishTime:[1762814949124], eventTime:[0], key:[null], properties:[], content:{firstName=Justin}
```

### Additional information

_No response_

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
