damienburke opened a new issue, #23953: URL: https://github.com/apache/pulsar/issues/23953
### Search before asking - [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar. ### Motivation When using the Pulsar IO JDBC Sink with PostgreSQL and Avro schemas - if the database has a column that does not have a corresponding field in the schema/message, the sink does not work. I noticed this issue when trying to integrate with a table that has a primary key populated by the db, i.e. using a regular db sequence. The seemingly only workaround is have my app call the db sequence and set the ID in the message before passing it to the sink. This seems a basic use case and would be very useful feature. ### Solution Improve the JDBC Sink to handle cases where messages lack db fields, by introducing additional configuration options: e.g. something like `missingKeyHandlingMode` `FAIL` (default): Keep existing behavior, failing when a db field is missing from message. `IGNORE`: Use db default value. ### Alternatives _No response_ ### Anything else? **Current errors** **Setup** * using Pulsar 4.0.2 * The db table has an id field defined as: `id integer` - with no constraints. * The message/schem does not have an id field. `org.apache.avro.AvroRuntimeException: Not a valid schema field: id at org.apache.avro.generic.GenericData$Record.get(GenericData.java:282) ~[java-instance.jar:?] at org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord.getField(GenericAvroRecord.java:48) ~[pulsar-client-original-4.0.2.jar:4.0.2] at org.apache.pulsar.io.jdbc.BaseJdbcAutoSchemaSink.lambda$createMutation$1(BaseJdbcAutoSchemaSink.java:146) ~[pulsar-io-jdbc-core-4.0.2.jar:?]` I did test `excludeNonDeclaredFields` set to true, which gives error also: `2025-02-08T10:02:47,417+0000 [pool-5-thread-1] ERROR org.apache.pulsar.io.jdbc.JdbcAbstractSink - Got exception No value specified for parameter 1. after 0 ms, failing 1 messages org.postgresql.util.PSQLException: No value specified for parameter 1. at org.postgresql.core.v3.SimpleParameterList.checkAllParametersSet(SimpleParameterList.java:339) ~[postgresql-42.5.5.jar:42.5.5] at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:340) ~[postgresql-42.5.5.jar:42.5.5] at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496) ~[postgresql-42.5.5.jar:42.5.5]` ** Solution** Improve the JDBC Sink to handle cases where messages lack required key fields, by introducing additional configuration options: e.g. something like `missingKeyHandlingMode` (New Config Option) `FAIL` (default): Keep existing behavior, failing when a db field is missing from message. `IGNORE`: Use db default value. ------------------------------------------------------------ **To Reproduce** Steps to reproduce the behavior: has context menu ### Are you willing to submit a PR? - [x] I'm willing to submit a PR! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
