[ 
https://issues.apache.org/jira/browse/FLINK-38641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JunboWang updated FLINK-38641:
------------------------------
    Summary: Unquote double quotes from default values on MySQL  (was: Trim 
surrounding double quotes from default values for BIGINT and SMALLINT types 
when parsing MySQL DDL)

> Unquote double quotes from default values on MySQL
> --------------------------------------------------
>
>                 Key: FLINK-38641
>                 URL: https://issues.apache.org/jira/browse/FLINK-38641
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>         Environment: Flink CDC 3.5.0
>            Reporter: JunboWang
>            Priority: Major
>              Labels: pull-request-available
>
> Debezium Issue: [https://issues.redhat.com/browse/DBZ-6824]
> [https://github.com/debezium/debezium/pull/4804]
> Users may use DDL statements like:
> {code:java}
> // code placeholder
> alter table test_tbl add column `col4` INT NOT NULL DEFAULT "0";
> alter table test_tbl add column `col4` INT NOT NULL DEFAULT " 0";
> alter table test_tbl add column `col5` DOUBLE NOT NULL DEFAULT " 0.0";
> alter table test_tbl add column `col6` BIGINT NOT NULL DEFAULT " 0"; {code}
> Such statements can lead to a NumberFormatException when the default value 
> is parsed, because the surrounding double quotes and leading/trailing spaces 
> are kept in the default value:
> {code:java}
> // code placeholder
> 18445 [Source Data Fetcher for Source: default_value_test[1] -> 
> ConstraintEnforcer[2] (1/4)#0] ERROR 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager - 
> Received uncaught exception.
> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected 
> exception while polling the records
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
>  ~[flink-connector-base-1.18.0-qiyi-1.jar:1.18.0-qiyi-1]
>     at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
>  [flink-connector-base-1.18.0-qiyi-1.jar:1.18.0-qiyi-1]
>     at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [?:1.8.0_401]
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) 
> [?:1.8.0_401]
>     at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_401]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_401]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_401]
>     at java.lang.Thread.run(Thread.java:750) [?:1.8.0_401]
> Caused by: org.apache.kafka.connect.errors.ConnectException: An exception 
> occurred in the change event producer. This connector will be stopped.
>     at 
> io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50) 
> ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
>     at 
> org.apache.flink.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:86)
>  ~[classes/:?]
>     at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:430)
>  ~[classes/:?]
>     at 
> org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:96)
>  ~[classes/:?]
>     at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1096)
>  ~[classes/:?]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1246)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1072)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     ... 1 more
> Caused by: io.debezium.DebeziumException: Error processing binlog event
>     at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:430)
>  ~[classes/:?]
>     at 
> org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:96)
>  ~[classes/:?]
>     at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1096)
>  ~[classes/:?]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1246)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1072)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     ... 1 more
> Caused by: io.debezium.DebeziumException: java.lang.NumberFormatException: 
> For input string: "" 0 ""
>     at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$2(MySqlStreamingChangeEventSource.java:675)
>  ~[classes/:?]
>     at 
> org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:145)
>  ~[classes/:?]
>     at 
> org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:64)
>  ~[classes/:?]
>     at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:668)
>  ~[classes/:?]
>     at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$14(MySqlStreamingChangeEventSource.java:1050)
>  ~[classes/:?]
>     at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:409)
>  ~[classes/:?]
>     at 
> org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:96)
>  ~[classes/:?]
>     at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1096)
>  ~[classes/:?]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1246)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1072)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     ... 1 more
> Caused by: java.lang.NumberFormatException: For input string: "" 0 ""
>     at 
> sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2043) 
> ~[?:1.8.0_401]
>     at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110) 
> ~[?:1.8.0_401]
>     at java.lang.Double.parseDouble(Double.java:538) ~[?:1.8.0_401]
>     at 
> io.debezium.connector.mysql.MySqlValueConverters.convertSmallInt(MySqlValueConverters.java:459)
>  ~[classes/:?]
>     at 
> io.debezium.jdbc.JdbcValueConverters.lambda$converter$4(JdbcValueConverters.java:293)
>  ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
>     at 
> io.debezium.connector.mysql.MySqlDefaultValueConverter.parseDefaultValue(MySqlDefaultValueConverter.java:140)
>  ~[classes/:?]
>     at 
> io.debezium.relational.TableSchemaBuilder.lambda$addField$9(TableSchemaBuilder.java:393)
>  ~[debezium-core-1.9.8.Final.jar:?]
>     at java.util.Optional.flatMap(Optional.java:241) ~[?:1.8.0_401]
>     at 
> io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:393)
>  ~[debezium-core-1.9.8.Final.jar:?]
>     at 
> io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149)
>  ~[debezium-core-1.9.8.Final.jar:?]
>     at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) 
> ~[?:1.8.0_401]
>     at 
> java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) 
> ~[?:1.8.0_401]
>     at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>  ~[?:1.8.0_401]
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) 
> ~[?:1.8.0_401]
>     at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) 
> ~[?:1.8.0_401]
>     at 
> java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) 
> ~[?:1.8.0_401]
>     at 
> java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
>  ~[?:1.8.0_401]
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) 
> ~[?:1.8.0_401]
>     at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) 
> ~[?:1.8.0_401]
>     at 
> io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:147) 
> ~[debezium-core-1.9.8.Final.jar:?]
>     at 
> io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:135)
>  ~[debezium-core-1.9.8.Final.jar:?]
>     at 
> io.debezium.connector.mysql.MySqlDatabaseSchema.lambda$applySchemaChange$2(MySqlDatabaseSchema.java:179)
>  ~[debezium-connector-mysql-1.9.8.Final.jar:?]
>     at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_401]
>     at 
> io.debezium.connector.mysql.MySqlDatabaseSchema.applySchemaChange(MySqlDatabaseSchema.java:179)
>  ~[debezium-connector-mysql-1.9.8.Final.jar:?]
>     at 
> org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl$SchemaChangeEventReceiver.schemaChangeEvent(EventDispatcherImpl.java:216)
>  ~[classes/:?]
>     at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleQueryEvent$2(MySqlStreamingChangeEventSource.java:673)
>  ~[classes/:?]
>     at 
> org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:145)
>  ~[classes/:?]
>     at 
> org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl.dispatchSchemaChangeEvent(EventDispatcherImpl.java:64)
>  ~[classes/:?]
>     at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleQueryEvent(MySqlStreamingChangeEventSource.java:668)
>  ~[classes/:?]
>     at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$14(MySqlStreamingChangeEventSource.java:1050)
>  ~[classes/:?]
>     at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:409)
>  ~[classes/:?]
>     at 
> org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:96)
>  ~[classes/:?]
>     at 
> io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1096)
>  ~[classes/:?]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1246)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1072)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     at 
> com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
>  ~[mysql-binlog-connector-java-0.27.2.jar:0.27.2]
>     ... 1 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to