Hi,
Here is my use case:
Dynamically adding topics for various different types of consumer
capability is easy, but it requires creating a producer for each one.
(I can't pass the topic to the producer....
E.g. producer.send(message, "fruit:orange")
then
producer.send(message, "fruit:apple")
I don't want to have to re-code the application when "banana" comes along
to add another producer.
So, I'd like to use a pulsar function to do the routing from a single
producer to a dynamic second topic, so new consumers only need to process
their own type.
//
producer = pulsarClient.newProducer(AvroSchema.of(Fruit.class))
.producerName("producer:fruit-1")
.topic("fruit")
.create();
producer.newMessage().property("topic2", "fruit:" +
myFruit.type).value(myFruit).send();
Then in my function, I can take the message property "topic2" and re-send a
message on the new topic.
However, is there a way I can avoid having to get involved in SerDe or
Schemas in the function?
The message bytes won't change, and the consumers all have access to an
AvroSchema definition.
I notice in the java Function API I have an (Object input) which I suspect
is an org.Json structure. but the context.newOutputMessage() wants a schema.
I don't really want to have to re-construct a "Fruit" object from org.Json
types, just to then encode it back to the same bytes on the wire.
Is there a shortcut I can just publish a byte[] from the orignal message
input byte[]
Thanks
Rob
Rob Shepherd BEng PhD