GitHub user pointearth created a discussion: Pulsar function Should support
Generic output Message
**Is your feature request related to a problem? Please describe.**
In a Pulsar function, I don't want to declare one particular class as the output type O in
`public interface Function<I, O> {
O process(I var1, Context var2) throws Exception;
}`
I have a POJO class, but I always want to emit a wrapper class around it as the
output. The only difference between the wrapper and the POJO is that the wrapper
adds one JSON field to whatever POJO class it wraps.
With the client API (pulsar.client.api) we can produce a generic message:
`// Describe the record: the two POJO fields plus the extra JSON field.
RecordSchemaBuilder recordSchemaBuilder = SchemaBuilder.record(USER_CONST.SCHEMA_NAME);
recordSchemaBuilder.field(USER_CONST.AGE).type(SchemaType.INT32);
recordSchemaBuilder.field(USER_CONST.NAME).type(SchemaType.STRING);
recordSchemaBuilder.field(USER_CONST.JSON_FOR_EVENT).type(SchemaType.STRING);
SchemaInfo schemaInfo = recordSchemaBuilder.build(SchemaType.AVRO);
GenericSchema<GenericRecord> userSchema = Schema.generic(schemaInfo);

Producer<GenericRecord> producer = pulsarClient.newProducer(userSchema)
        .topic(TOPIC_CONST.TOPIC_FUN_GENERIC)
        .create();

User user = new User();
user.age = 27;
user.name = "Simon Generic";

// Serialize the POJO itself into the extra JSON field.
Gson gson = new Gson();
String jsonForEvent = gson.toJson(user);

GenericRecordBuilder genericRecordBuilder = userSchema.newRecordBuilder();
GenericRecord genericRecord = genericRecordBuilder
        .set(USER_CONST.AGE, user.age)
        .set(USER_CONST.NAME, user.name)
        .set(USER_CONST.JSON_FOR_EVENT, jsonForEvent)
        .build();

// Publish the record.
producer.send(genericRecord);`
However, I have tried many ways to produce such a dynamic object from a Pulsar function, for example:
`public class FunWrapGeneric implements Function<String, GenericRecord> {
    @Override
    public GenericRecord process(String input, Context context) throws Exception {`
but it always fails. Can this be supported?
**Describe the solution you'd like**
1. Add GenericRecord support for Pulsar functions:
`public class FunWrapGeneric implements Function<String, GenericRecord>`
2. Or add a generic class:
`public class FunWrapGeneric implements Function<String, RecordWrap<T>> {
    @Override
    public RecordWrap<T> process(String input, Context context) throws Exception {`
where the RecordWrap interface supports adding fields to, or removing fields
from, T.
3. Or add a variant of the Pulsar function interface that takes a generic class
as a type parameter:
`FunWrapGeneric implements Function<String,Schema>`
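
To make option 2 more concrete, here is a rough plain-Java sketch of what such a wrapper might look like. Everything in it is hypothetical and not part of the Pulsar API: the `RecordWrap` class, its `of`/`add`/`remove`/`asMap` methods, the `User` POJO, and the `jsonForEvent` field name are only illustrations. It copies the public fields of a POJO into a map by reflection and lets the caller add or remove fields afterwards.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical POJO, similar to the User class in the producer example.
class User {
    public int age;
    public String name;

    User(int age, String name) {
        this.age = age;
        this.name = name;
    }
}

// Hypothetical wrapper, NOT an existing Pulsar class.
// It holds the public instance fields of a POJO as name/value pairs and
// allows fields to be added or removed before the record is emitted.
final class RecordWrap<T> {
    private final Map<String, Object> fields = new LinkedHashMap<>();

    private RecordWrap() {}

    // Copy every public, non-static field of the POJO into the wrapper.
    static <T> RecordWrap<T> of(T pojo) {
        RecordWrap<T> wrap = new RecordWrap<>();
        for (Field f : pojo.getClass().getFields()) {
            if (Modifier.isStatic(f.getModifiers())) {
                continue;
            }
            try {
                wrap.fields.put(f.getName(), f.get(pojo));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + f.getName(), e);
            }
        }
        return wrap;
    }

    // Add (or overwrite) a field that the POJO itself does not have.
    RecordWrap<T> add(String name, Object value) {
        fields.put(name, value);
        return this;
    }

    // Drop a field that should not appear in the output.
    RecordWrap<T> remove(String name) {
        fields.remove(name);
        return this;
    }

    // Read-only view of the current fields.
    Map<String, Object> asMap() {
        return Collections.unmodifiableMap(fields);
    }
}

class RecordWrapDemo {
    public static void main(String[] args) {
        RecordWrap<User> wrap = RecordWrap.of(new User(27, "Simon Generic"))
                // The JSON string is hard-coded here; the original example builds it with Gson.
                .add("jsonForEvent", "{\"age\":27,\"name\":\"Simon Generic\"}")
                .remove("age");
        System.out.println(wrap.asMap());
    }
}
```

If something like this existed, the entries of `asMap()` could presumably be copied into a GenericRecordBuilder with `set(name, value)`, the same way the producer example sets each field by hand.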
cc @sijie @codelipenghui
GitHub link: https://github.com/apache/pulsar/discussions/18875