Hi Robin,

Seems I didn't make it clear.
Actually we still use the JDBC sink connector, but we want to use the JDBC sink functionality inside our own distributed platform instead of running it as a Kafka connector. I want to reuse the code here: https://github.com/confluentinc/kafka-connect-jdbc/

The idea: receive a Kafka Avro record, pass it to a JdbcSinkTask, and the task will automatically generate and execute the SQL according to the schema from the Schema Registry.

It seems I can do it like this, but I am not able to transform a GenericRecord into a SinkRecord:

    JdbcSinkTask task = new JdbcSinkTask();
    task.start(props2);
    consumer.subscribe(Collections.singletonList("orders"));
    while (true) {
        final ConsumerRecords<String, GenericRecord> records =
            consumer.poll(Duration.ofMillis(100));
        for (final ConsumerRecord<String, GenericRecord> record : records) {
            final String key = record.key();
            final GenericRecord value = record.value();
            // TODO: convert the GenericRecord to a SinkRecord
            task.put(Arrays.asList(sinkRecord));
        }
    }

Thanks,
Lei

[email protected]


From: Robin Moffatt
Date: 2020-05-11 16:40
To: users
Subject: Re: Write to database directly by referencing schema registry, no jdbc sink connector

> write to target database. I want to use self-written java code instead of kafka jdbc sink connector.

Out of interest, why do you want to do this? Why not use the JDBC sink connector (or a fork of it if you need to amend its functionality)?

--

Robin Moffatt | Senior Developer Advocate | [email protected] | @rmoff


On Sat, 9 May 2020 at 03:38, [email protected] <[email protected]> wrote:

> Using Debezium to parse the binlog, serializing with Avro, and sending to Kafka.
>
> I need to consume the Avro-serialized binlog data and write it to the target database. I want to use self-written Java code instead of the Kafka JDBC sink connector.
>
> How can I reference the Schema Registry, convert a Kafka message to the corresponding table record, and write it to the corresponding table?
> Is there any example code to do this?
>
> Thanks,
> Lei
>
> [email protected]
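[Editor's note] For the missing conversion step (GenericRecord to SinkRecord), one possible approach is a sketch like the following. It assumes Confluent's AvroData helper class (from the io.confluent:kafka-connect-avro-converter artifact) is on the classpath; the topic name, partition, offset, and the assumption of a plain String key are illustrative, not taken from the thread:

```java
// Sketch only: relies on Confluent's AvroData to translate an Avro schema +
// GenericRecord into Kafka Connect's Schema/Struct representation, which is
// what JdbcSinkTask expects inside a SinkRecord.
import io.confluent.connect.avro.AvroData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.sink.SinkRecord;

public class GenericRecordToSinkRecord {

    // AvroData caches converted schemas; the constructor argument is the
    // cache size (100 here is an arbitrary illustrative choice).
    private final AvroData avroData = new AvroData(100);

    public SinkRecord toSinkRecord(String topic, int partition, long offset,
                                   String key, GenericRecord value) {
        // Convert the Avro schema and record into a Connect Schema + value pair.
        SchemaAndValue connectValue =
            avroData.toConnectData(value.getSchema(), value);

        // Assumes the key is a plain String; if your keys are also Avro,
        // run them through avroData.toConnectData(...) the same way.
        return new SinkRecord(topic, partition,
            Schema.STRING_SCHEMA, key,
            connectValue.schema(), connectValue.value(),
            offset);
    }
}
```

Inside the consumer loop above, this could then be used roughly as `task.put(Collections.singletonList(converter.toSinkRecord(record.topic(), record.partition(), record.offset(), key, value)))`, so the task sees the same Connect structures it would receive from the framework.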
