[ 
https://issues.apache.org/jira/browse/FLINK-38281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TemiChan updated FLINK-38281:
-----------------------------
    Fix Version/s:     (was: cdc-3.4.0)

> Schema changes still cause "column size does not match the data size" after
> enabling scan.parse.online.schema.changes.enabled (pt-osc, gh-ost)
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38281
>                 URL: https://issues.apache.org/jira/browse/FLINK-38281
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.3.0, cdc-3.4.0
>            Reporter: TemiChan
>            Priority: Major
>
> In Flink CDC 3.3, if rows are inserted into the source table while a column is
> being added to it with pt-osc or gh-ost, Flink CDC reports the error
> "Column size does not match the data size". If the source table receives no
> data changes during the online column addition, and changes are made only
> after the tool has finished, no error is reported.
> *1. Insert data in a loop*
> {code:bash}
> while true; do mysql --login-path=dba doris_sync -Be "insert into test(ptest21,tcc10) select ptest21,tcc10 from test limit 10"; done
> {code}
> *2. Add a column with pt-online-schema-change*
> {code:bash}
> pt-online-schema-change --charset=utf8mb4 --recursion-method=none \
>   --no-version-check --skip-check-slave-lag \
>   --user=root --password="xx" --host=127.0.0.1 \
>   u=chenqimi,P=3006,D=doris_sync,t=test,A=utf8mb4 \
>   --alter "add column tax_code VARCHAR(30) NULL" --execute
> {code}
> *3. Flink CDC error*
> {noformat}
> java.lang.IllegalStateException: Column size does not match the data size
>     at org.apache.flink.cdc.common.utils.Preconditions.checkState(Preconditions.java:161)
>     at org.apache.flink.cdc.connectors.doris.sink.DorisEventSerializer.serializerRecord(DorisEventSerializer.java:121)
>     at org.apache.flink.cdc.connectors.doris.sink.DorisEventSerializer.applyDataChangeEvent(DorisEventSerializer.java:100)
>     at org.apache.flink.cdc.connectors.doris.sink.DorisEventSerializer.serialize(DorisEventSerializer.java:71)
>     at org.apache.flink.cdc.connectors.doris.sink.DorisEventSerializer.serialize(DorisEventSerializer.java:49)
>     at org.apache.doris.flink.sink.batch.DorisBatchWriter.write(DorisBatchWriter.java:120)
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
>     at org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator.processElement(DataSinkWriterOperator.java:178)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
>     at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
>  
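The trace shows the job failing in a Preconditions.checkState call inside DorisEventSerializer.serializerRecord, which suggests the Doris sink received a row whose number of fields differed from the number of columns in the schema it held for the table. The following is a minimal, hypothetical Java sketch of that kind of guard, not the actual Flink CDC code: the class ColumnSizeCheckSketch and its methods are invented for illustration. It assumes, purely as an example, that the sink still knows test as a two-column table (ptest21, tcc10) while a row captured after tax_code was added carries three values; the report does not show the full table definition.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical illustration only (not Flink CDC code): serializes a row against
 * the schema the sink currently knows about, guarded by a checkState-style check
 * with the same message as the one in the stack trace.
 */
class ColumnSizeCheckSketch {

    // Throws IllegalStateException when the condition does not hold.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Pairs each known column name with the value at the same position.
    static String serialize(List<String> columns, Object[] fields) {
        checkState(columns.size() == fields.length,
                "Column size does not match the data size");
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < columns.size(); i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(columns.get(i)).append('=').append(fields[i]);
        }
        return sb.append('}').toString();
    }

    public static void main(String[] args) {
        // Assumed schema held by the sink before "add column tax_code" is applied.
        List<String> staleSchema = Arrays.asList("ptest21", "tcc10");
        // Assumed row captured after the column was added: one extra value.
        Object[] newRow = {"a", "b", null};
        try {
            serialize(staleSchema, newRow);
        } catch (IllegalStateException e) {
            // prints: Column size does not match the data size
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions, the same row would serialize normally if the sink's schema already included tax_code; the failure only arises while the schema and the incoming data are out of step.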



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
