[ 
https://issues.apache.org/jira/browse/FLINK-36414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17886557#comment-17886557
 ] 

Yang Hua Jie commented on FLINK-36414:
--------------------------------------

I found that this issue is caused by an incompatibility in flink-json. 
Downgrading Flink to 1.18 resolved the error. 

However, CDC does not seem to work well with Kafka as the sink; it works 
fine when sinking to Doris.
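
One way to confirm the flink-json mismatch is to check by reflection whether 
the flink-json jar on the job's classpath still has the constructor named in 
the NoSuchMethodError. This is only a minimal sketch: the class name 
ConstructorProbe and the method probe are hypothetical helpers, not part of 
Flink or Flink CDC, and the parameter list is copied from the error message 
in the issue description.

```java
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not part of Flink): reports whether a class on the
// current classpath declares a constructor with the given parameter types.
public class ConstructorProbe {

    // Returns "found", "missing", or "class-not-loadable".
    public static String probe(String className, List<String> paramTypeNames) {
        try {
            // Load without running static initializers; we only inspect signatures.
            Class<?> cls = Class.forName(className, false,
                    ConstructorProbe.class.getClassLoader());
            for (Constructor<?> c : cls.getDeclaredConstructors()) {
                List<String> names = Arrays.stream(c.getParameterTypes())
                        .map(Class::getName)
                        .collect(Collectors.toList());
                if (names.equals(paramTypeNames)) {
                    return "found";
                }
            }
            return "missing";
        } catch (ClassNotFoundException | LinkageError e) {
            // The class itself, or a type in one of its signatures, is absent.
            return "class-not-loadable";
        }
    }

    public static void main(String[] args) {
        // Signature taken verbatim from the NoSuchMethodError in the report.
        List<String> expected = Arrays.asList(
                "org.apache.flink.table.types.logical.RowType",
                "org.apache.flink.formats.common.TimestampFormat",
                "org.apache.flink.formats.json.JsonFormatOptions$MapNullKeyMode",
                "java.lang.String",
                "boolean");
        System.out.println(probe(
                "org.apache.flink.formats.json.JsonRowDataSerializationSchema",
                expected));
    }
}
```

Run it with the same jars the job uses on the classpath (for example the 
contents of Flink's lib directory). A result of "missing" would be consistent 
with the error above; "class-not-loadable" means flink-json or one of its 
dependencies is not on the classpath at all.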

> NoSuchMethodError while trying flink cdc 
> -----------------------------------------
>
>                 Key: FLINK-36414
>                 URL: https://issues.apache.org/jira/browse/FLINK-36414
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.2.0
>            Reporter: Yang Hua Jie
>            Priority: Blocker
>
> flink: 1.20.0
> flink-cdc: 3.2.0
> mysql version: 8.0
> kafka version: 2.2.1
>  
> cdc job
> ```
> source:
>   type: mysql
>   hostname: localhost
>   port: 3306
>   username: root
>   password: 123456
>   tables: app_db.\.*
>   server-time-zone: 'UTC'
> sink:
>   type: kafka
>   name: Kafka Sink
>   properties.bootstrap.servers: PLAINTEXT://localhost:9092
> pipeline:
>   name: Sync MySQL Database to Kafka
>   parallelism: 2
> ```
>  
>  
> Here is the error
>  
> ```
> 2024-10-01 10:05:14
> java.lang.NoSuchMethodError: 'void org.apache.flink.formats.json.JsonRowDataSerializationSchema.<init>(org.apache.flink.table.types.logical.RowType, org.apache.flink.formats.common.TimestampFormat, org.apache.flink.formats.json.JsonFormatOptions$MapNullKeyMode, java.lang.String, boolean)'
> at org.apache.flink.cdc.connectors.kafka.serialization.JsonSerializationSchema.buildSerializationForPrimaryKey(JsonSerializationSchema.java:134)
> at org.apache.flink.cdc.connectors.kafka.serialization.JsonSerializationSchema.serialize(JsonSerializationSchema.java:101)
> at org.apache.flink.cdc.connectors.kafka.serialization.JsonSerializationSchema.serialize(JsonSerializationSchema.java:47)
> at org.apache.flink.cdc.connectors.kafka.sink.PipelineKafkaRecordSerializationSchema.serialize(PipelineKafkaRecordSerializationSchema.java:99)
> at org.apache.flink.cdc.connectors.kafka.sink.PipelineKafkaRecordSerializationSchema.serialize(PipelineKafkaRecordSerializationSchema.java:44)
> at org.apache.flink.cdc.connectors.kafka.shaded.org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:196)
> at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
> at org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator.processElement(DataSinkWriterOperator.java:163)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
> at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
> at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.base/java.lang.Thread.run(Thread.java:829)
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
