gmiklos-ltg opened a new pull request, #20115:
URL: https://github.com/apache/pulsar/pull/20115
Fixes #5350
### Motivation
While evaluating Pulsar Functions for our data transformation use cases, we
ran into the following startup issue with a function that uses a custom
SerDe class:
`2023-04-12T11:30:35,368+0200 [public/default/user-transform-new-2-0] ERROR org.apache.pulsar.functions.instance.JavaInstanceRunnable - [public/default/user-transform-new-2:0] Uncaught exception in Java Instance
java.lang.RuntimeException: User class must be in class path
    at org.apache.pulsar.common.util.Reflections.createInstance(Reflections.java:72) ~[org.apache.pulsar-pulsar-common-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.instance.InstanceUtils.createInstance(InstanceUtils.java:92) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.instance.InstanceUtils.initializeSerDe(InstanceUtils.java:51) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:238) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.source.TopicSchema.newSchemaInstance(TopicSchema.java:246) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.source.TopicSchema.lambda$getSchema$0(TopicSchema.java:68) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at java.util.HashMap.computeIfAbsent(HashMap.java:1220) ~[?:?]
    at org.apache.pulsar.functions.source.TopicSchema.getSchema(TopicSchema.java:68) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.source.PulsarSource.buildPulsarSourceConsumerConfig(PulsarSource.java:176) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.source.SingleConsumerPulsarSource.open(SingleConsumerPulsarSource.java:69) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupInput(JavaInstanceRunnable.java:774) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setup(JavaInstanceRunnable.java:226) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:259) ~[org.apache.pulsar-pulsar-functions-instance-2.11.0.jar:2.11.0]
    at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: java.lang.ClassNotFoundException: com.bridge.data.UserMessageSerDe
    at java.net.URLClassLoader.findClass(URLClassLoader.java:445) ~[?:?]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:587) ~[?:?]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:520) ~[?:?]
    at java.lang.Class.forName0(Native Method) ~[?:?]
    at java.lang.Class.forName(Class.java:467) ~[?:?]
    at org.apache.pulsar.common.util.Reflections.createInstance(Reflections.java:70) ~[org.apache.pulsar-pulsar-common-2.11.0.jar:2.11.0]
    ... 13 more`
We verified that the class is included in the fat jar that we built.
Our `config.yaml` looks like this:
```
tenant: public
namespace: default
name: user-transform
className: com.bridge.data.TransformUserMessage
inputs:
  - "persistent://public/default/debezium.public.users"
customSerdeInputs:
  "persistent://public/default/debezium.public.users": com.bridge.data.UserMessageSerDe
output: "persistent://public/default/debezium.public.users-public"
outputSerdeClassName: com.bridge.data.PublicUserSerDe
jar: "transform-function-v1.jar"
parallelism: 1
retainOrdering: true
```
The `UserMessageSerDe` class is a simple wrapper around Jackson that
deserializes `UserMessage` data classes.
### Modifications
I have modified `TopicSchema` so that it accepts the function's class loader
as a parameter, which `newSchemaInstance()` then uses to resolve the custom
SerDe class. With this change our function starts up without any issues.
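To illustrate why passing the class loader matters, here is a minimal, self-contained sketch of the general technique, not the actual Pulsar code. All names in it (`ClassLoaderDemo`, `FunctionClassLoader`, `UserSerDe`, `createInstance`) are hypothetical stand-ins. It assumes a loader hierarchy in which the default loader cannot see the user class, mimicked here by the JDK platform class loader, while a dedicated loader can.

```java
/** Sketch: resolving a class by name through an explicitly supplied class loader. */
class ClassLoaderDemo {

    /** Hypothetical stand-in for a user SerDe class that ships inside the function jar. */
    public static class UserSerDe {
        public UserSerDe() { }
    }

    /**
     * Hypothetical stand-in for the function's class loader. Its parent is the
     * platform loader, which cannot see application classes, so a lookup only
     * succeeds once it falls through to findClass().
     */
    static class FunctionClassLoader extends ClassLoader {
        FunctionClassLoader() {
            super(ClassLoader.getPlatformClassLoader());
        }

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            // For the demo, delegate to the loader that defined this class;
            // a real function loader would read the class from the function jar.
            return ClassLoaderDemo.class.getClassLoader().loadClass(name);
        }
    }

    /** Instantiates className via its no-arg constructor, resolving it through the given loader. */
    static Object createInstance(String className, ClassLoader loader) {
        try {
            Class<?> clazz = Class.forName(className, true, loader);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("User class must be in class path", e);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        String name = UserSerDe.class.getName();
        try {
            // The platform loader does not know the user class, so this lookup fails.
            createInstance(name, ClassLoader.getPlatformClassLoader());
        } catch (RuntimeException e) {
            System.out.println("Lookup without the function loader failed: " + e.getCause());
        }
        // Supplying the loader that can see the class makes the same lookup succeed.
        Object serDe = createInstance(name, new FunctionClassLoader());
        System.out.println("Resolved " + serDe.getClass().getName());
    }
}
```

The point of the sketch is that `Class.forName(String, boolean, ClassLoader)` resolves the name against the loader it is given, so the caller has to hand over a loader that can actually see the user's jar.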
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change is already covered by existing tests, such as `TopicSchemaTest`.
### Does this pull request potentially affect one of the following parts:
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
*If the box was checked, please highlight the changes*
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->