TemiChan created FLINK-38281:
--------------------------------
Summary: Schema changes still caused column size does not match
the data size after enabling scan.parse.online.schema.changes.enabled
Key: FLINK-38281
URL: https://issues.apache.org/jira/browse/FLINK-38281
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.4.0, cdc-3.3.0
Reporter: TemiChan
When a column is added to the source table with pt-osc or gh-ost under Flink
CDC 3.3, and rows are inserted into the source table while the online schema
change is still running, Flink CDC fails with the error "Column size does not
match the data size". This happens even with
scan.parse.online.schema.changes.enabled turned on. If no data changes occur
during the online column addition, and rows are written to the source table
only after the tool has finished, no error is reported.
*1. Insert data in a loop*
while true; do mysql --login-path=dba doris_sync -Be "insert into test(ptest21,tcc10) select ptest21,tcc10 from test limit 10"; done;
*2. Add column*
pt-online-schema-change --charset=utf8mb4 --recursion-method=none \
  --no-version-check --skip-check-slave-lag \
  --user=root --password="xx" --host=127.0.0.1 \
  u=chenqimi,P=3006,D=doris_sync,t=test,A=utf8mb4 \
  --alter "add column tax_code VARCHAR(30) NULL" --execute
*3. Flink CDC error*
java.lang.IllegalStateException: Column size does not match the data size
    at org.apache.flink.cdc.common.utils.Preconditions.checkState(Preconditions.java:161)
    at org.apache.flink.cdc.connectors.doris.sink.DorisEventSerializer.serializerRecord(DorisEventSerializer.java:121)
    at org.apache.flink.cdc.connectors.doris.sink.DorisEventSerializer.applyDataChangeEvent(DorisEventSerializer.java:100)
    at org.apache.flink.cdc.connectors.doris.sink.DorisEventSerializer.serialize(DorisEventSerializer.java:71)
    at org.apache.flink.cdc.connectors.doris.sink.DorisEventSerializer.serialize(DorisEventSerializer.java:49)
    at org.apache.doris.flink.sink.batch.DorisBatchWriter.write(DorisBatchWriter.java:120)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
    at org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator.processElement(DataSinkWriterOperator.java:178)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
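
For readers unfamiliar with this failure mode, the following is a minimal sketch of the kind of check that appears to fail in the stack trace. It is not the actual Flink CDC source: the class ColumnSizeCheckSketch and the methods checkState and serialize below are hypothetical stand-ins, and only the error message is taken from the report. The assumption is that the serializer compares the number of columns in the schema it currently holds with the number of fields in the incoming row, so a row read during the online schema change can disagree with a schema that has not yet been updated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not the real DorisEventSerializer.
class ColumnSizeCheckSketch {

    // Stand-in for a precondition helper: throws when the condition is false.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Pairs each schema column with the value at the same position.
    // Fails if the schema and the row disagree on the number of fields.
    static List<String> serialize(List<String> schemaColumns, List<Object> rowValues) {
        checkState(
                schemaColumns.size() == rowValues.size(),
                "Column size does not match the data size");
        List<String> out = new ArrayList<>();
        for (int i = 0; i < schemaColumns.size(); i++) {
            out.add(schemaColumns.get(i) + "=" + rowValues.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        // Schema as assumed to be known to the sink before the column was added.
        List<String> staleSchema = Arrays.asList("ptest21", "tcc10");
        // A row that already carries the extra tax_code field (assumed scenario).
        List<Object> newRow = Arrays.<Object>asList(1, "a", null);
        try {
            serialize(staleSchema, newRow);
        } catch (IllegalStateException e) {
            System.out.println("Failed: " + e.getMessage());
        }
    }
}
```

Under this assumption, the check passes whenever the schema change event reaches the sink before any row with the new shape, which would match the observation that no error occurs when data is written only after the tool has finished.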
--
This message was sent by Atlassian Jira
(v8.20.10#820010)