[ 
https://issues.apache.org/jira/browse/FLINK-39407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18080856#comment-18080856
 ] 

Melih Aycicek commented on FLINK-39407:
---------------------------------------

Hi, I would be happy to take this. Feel free to assign this ticket to me.

> CreateTableEvent failed in MySQL to Paimon pipeline
> ---------------------------------------------------
>
>                 Key: FLINK-39407
>                 URL: https://issues.apache.org/jira/browse/FLINK-39407
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 1.19.3, 1.20.3
>            Reporter: Edward Zhang
>            Priority: Blocker
>
> h2. In quickstart
> I used the cdc-up quickstart script `cdcup.sh` to test a MySQL -> Paimon 
> pipeline. After setting the Flink version to 1.19.3/1.20.3 with CDC 3.2.1, I 
> ran cdc up and the pipeline task (a test_table with a primary key had been 
> created in advance).
> The pipeline job failed. Here is the log from the jobmanager:
> {code:java}
> 2026-04-07 16:48:26,894 INFO  
> org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler
>  [] - SchemaChangeStatus switched from APPLYING to FINISHED for request 
> [CreateTableEvent{tableId=cdcup.test_table, schema=columns={`id` INT NOT 
> NULL,`name` VARCHAR(255)}, primaryKeys=id, options=()}].
> 2026-04-07 16:48:27,755 INFO  
> org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler
>  [] - SchemaChangeStatus switched from FINISHED to IDLE for request 
> [CreateTableEvent{tableId=cdcup.test_table, schema=columns={`id` INT NOT 
> NULL,`name` VARCHAR(255)}, primaryKeys=id, options=()}]
> 2026-04-07 16:48:27,764 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Trying to recover from a global failure.
> org.apache.flink.util.FlinkException: Global failure triggered by 
> OperatorCoordinator for 'Source: Flink CDC Event Source: mysql -> 
> SchemaOperator -> PrePartition' (operator 9899a42c64d67ef3172b7e3be3c1bbb9).
>       at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:651)
>  ~[flink-dist-1.19.3.jar:1.19.3]
>       at 
> org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.clearCurrentSchemaChangeRequest(SchemaRegistryRequestHandler.java:460)
>  ~[?:?]
>       at 
> org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.getSchemaChangeResult(SchemaRegistryRequestHandler.java:321)
>  ~[?:?]
>       at 
> org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry.lambda$handleCoordinationRequest$3(SchemaRegistry.java:276)
>  ~[?:?]
>       at 
> org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry.lambda$runInEventLoop$2(SchemaRegistry.java:241)
>  ~[?:?]
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
> ~[?:?]
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
> ~[?:?]
>       at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.lang.RuntimeException: Failed to apply schema change.
>       ... 7 more
> Caused by: org.apache.flink.cdc.common.exceptions.SchemaEvolveException: 
> org.apache.paimon.catalog.Catalog$DatabaseNotExistException: Database cdcup 
> does not exist.
>       at 
> org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applySchemaChange(PaimonMetadataApplier.java:135)
>  ~[?:?]
>       at 
> org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.applySchemaChange(SchemaRegistryRequestHandler.java:238)
>  ~[?:?]
>       at 
> org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.lambda$flushSuccess$0(SchemaRegistryRequestHandler.java:306)
>  ~[?:?]
>       at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
> ~[?:?]
>       at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>       ... 3 more
> Caused by: org.apache.paimon.catalog.Catalog$DatabaseNotExistException: 
> Database cdcup does not exist.
>       at 
> org.apache.paimon.catalog.AbstractCatalog.createTable(AbstractCatalog.java:240)
>  ~[?:?]
>       at 
> org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applyCreateTable(PaimonMetadataApplier.java:172)
>  ~[?:?]
>       at 
> org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applySchemaChange(PaimonMetadataApplier.java:122)
>  ~[?:?]
>       at 
> org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.applySchemaChange(SchemaRegistryRequestHandler.java:238)
>  ~[?:?]
>       at 
> org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.lambda$flushSuccess$0(SchemaRegistryRequestHandler.java:306)
>  ~[?:?]
>       at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
> ~[?:?]
>       at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>       ... 3 more
> {code}
> The log shows `Catalog$DatabaseNotExistException: Database cdcup does not exist.` 
> The warehouse directory configured in `pipeline-definition.yaml` is also 
> empty:
> {code:bash}
> # ls
> paimon-warehouse
> # cd paimon-warehouse
> # ls
> # 
> {code}
> h2. In manual testing
> I also tested manually with Flink CDC 3.5.0 (Flink version 1.20.3), using 
> this pipeline YAML file:
> {code:yaml}
> pipeline:
>   parallelism: 1
> source:
>   type: mysql
>   hostname: mysql
>   port: 3306
>   username: root
>   password: ''
>   tables: cdcup.\.*
>   server-id: 5400-6400
>   server-time-zone: UTC
> sink:
>   type: paimon
>   name: Paimon Sink
>   catalog.properties.metastore: filesystem
>   catalog.properties.warehouse: /data
> {code}
> A similar error appears:
> {code:java}
> 2026-04-07 16:30:28,078 ERROR 
> org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry [] - An 
> exception was triggered from Schema change applying task. Job will fail now.
> org.apache.flink.util.FlinkRuntimeException: Failed to apply schema change 
> event.
>       at 
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.lambda$startSchemaChangesEvolve$1(SchemaCoordinator.java:249)
>  
> ~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
>       at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
> [?:?]
>       at java.base/java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
>       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source) [?:?]
>       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source) [?:?]
>       at java.base/java.lang.Thread.run(Unknown Source) [?:?]
> Caused by: java.lang.RuntimeException: Create database location failed, 
> database: cdcup, location: /data/cdcup.db
>       at 
> org.apache.paimon.catalog.FileSystemCatalog.createDatabaseImpl(FileSystemCatalog.java:77)
>  
> ~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
>       at 
> org.apache.paimon.catalog.AbstractCatalog.createDatabase(AbstractCatalog.java:165)
>  
> ~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
>       at org.apache.paimon.catalog.Catalog.createDatabase(Catalog.java:90) 
> ~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
>       at 
> org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applyCreateTable(PaimonMetadataApplier.java:166)
>  
> ~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
>       at 
> org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.lambda$applySchemaChange$2(PaimonMetadataApplier.java:135)
>  
> ~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
>       at 
> org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor.visit(SchemaChangeEventVisitor.java:57)
>  
> ~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
>       at 
> org.apache.flink.cdc.connectors.paimon.sink.PaimonMetadataApplier.applySchemaChange(PaimonMetadataApplier.java:124)
>  
> ~[blob_p-aaa60797436b72aa192ff21020f80668311462b7-e41db7bd8a3e52848072c239b554bb95:3.5.0]
>       at 
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.applyAndUpdateEvolvedSchemaChange(SchemaCoordinator.java:437)
>  
> ~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
>       at 
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.applySchemaChange(SchemaCoordinator.java:406)
>  
> ~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
>       at 
> org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator.lambda$startSchemaChangesEvolve$1(SchemaCoordinator.java:247)
>  
> ~[blob_p-8653b0f314a29add02bbe48f41a95c6f1bf2e1f7-9ca021d7d429015bf0771ebd9b16e9af:3.5.0]
>       ... 5 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
