[ https://issues.apache.org/jira/browse/FLINK-35277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
wenli xiao updated FLINK-35277:
-------------------------------
Description:
1. Background
While using Flink CDC 3.1 to load data from DB2 into Apache Doris, I set up DB2 with the Docker image {{ruanhang/db2-cdc-demo:v1}}. After configuring DB2 asynchronous CDC (ASNCDC), I registered tables for capture with {{CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE')}}. The first ten tables were added successfully, but adding the eleventh table failed with:

[23505][-803] One or more values in the INSERT statement, UPDATE statement, or foreign key update caused by a DELETE statement are not valid because the primary key, unique constraint or unique index identified by "2" constrains table "ASNCDC.IBMSNAP_PRUNCNTL" from having duplicate values for the index key.. SQLCODE=-803, SQLSTATE=23505, DRIVER=4.26.14

!image-2024-04-30-22-19-17-350.png!

2. Analysis
The error indicates that the INSERT would create a duplicate key in a unique index of {{ASNCDC.IBMSNAP_PRUNCNTL}}. The table has no primary key, but it has two unique indexes. Here is its schema:

create table IBMSNAP_PRUNCNTL
(
    TARGET_SERVER CHARACTER(18) not null,
    TARGET_OWNER VARCHAR(128) not null,
    TARGET_TABLE VARCHAR(128) not null,
    SYNCHTIME TIMESTAMP(6),
    SYNCHPOINT VARCHAR(16) FOR BIT DATA,
    SOURCE_OWNER VARCHAR(128) not null,
    SOURCE_TABLE VARCHAR(128) not null,
    SOURCE_VIEW_QUAL SMALLINT not null,
    APPLY_QUAL CHARACTER(18) not null,
    SET_NAME CHARACTER(18) not null,
    CNTL_SERVER CHARACTER(18) not null,
    TARGET_STRUCTURE SMALLINT not null,
    CNTL_ALIAS CHARACTER(8),
    PHYS_CHANGE_OWNER VARCHAR(128),
    PHYS_CHANGE_TABLE VARCHAR(128),
    MAP_ID VARCHAR(10) not null
);

create unique index IBMSNAP_PRUNCNTLX
    on IBMSNAP_PRUNCNTL (SOURCE_OWNER, SOURCE_TABLE, SOURCE_VIEW_QUAL, APPLY_QUAL, SET_NAME, TARGET_SERVER, TARGET_TABLE, TARGET_OWNER);

create unique index IBMSNAP_PRUNCNTLX1
    on IBMSNAP_PRUNCNTL (MAP_ID);

create index IBMSNAP_PRUNCNTLX2
    on IBMSNAP_PRUNCNTL (PHYS_CHANGE_OWNER, PHYS_CHANGE_TABLE);

create index IBMSNAP_PRUNCNTLX3
    on IBMSNAP_PRUNCNTL (APPLY_QUAL, SET_NAME, TARGET_SERVER);

The issue is in the {{ASNCDC.ADDTABLE}} procedure defined in the {{asncdcaddremove.sql}} script. The procedure builds and runs the following insert statement:

-- Original insert statement
SET stmtSQL = 'INSERT INTO ASNCDC.IBMSNAP_PRUNCNTL ( ' ||
    'TARGET_SERVER, ' ||
    'TARGET_OWNER, ' ||
    'TARGET_TABLE, ' ||
    'SYNCHTIME, ' ||
    'SYNCHPOINT, ' ||
    'SOURCE_OWNER, ' ||
    'SOURCE_TABLE, ' ||
    'SOURCE_VIEW_QUAL, ' ||
    'APPLY_QUAL, ' ||
    'SET_NAME, ' ||
    'CNTL_SERVER , ' ||
    'TARGET_STRUCTURE , ' ||
    'CNTL_ALIAS , ' ||
    'PHYS_CHANGE_OWNER , ' ||
    'PHYS_CHANGE_TABLE , ' ||
    'MAP_ID ' ||
    ') VALUES ( ' ||
    '''KAFKA'', ' ||
    '''' || tableschema || ''', ' ||
    '''' || tablename || ''', ' ||
    'NULL, ' ||
    'NULL, ' ||
    '''' || tableschema || ''', ' ||
    '''' || tablename || ''', ' ||
    '0, ' ||
    '''KAFKAQUAL'', ' ||
    '''SET001'', ' ||
    ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' ||
    '8, ' ||
    ' (Select CURRENT_SERVER from sysibm.sysdummy1 ), ' ||
    '''ASNCDC'', ' ||
    '''CDC_' || tableschema || '_' || tablename || ''', ' ||
    ' ( SELECT CASE WHEN max(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10)) ELSE CAST(CAST(max(MAP_ID) AS INT) + 1 AS VARCHAR(10)) END AS MYINT from ASNCDC.IBMSNAP_PRUNCNTL ) ' ||
    ' )';
EXECUTE IMMEDIATE stmtSQL;

The {{max(MAP_ID)}} logic is incorrect. {{MAP_ID}} is a {{VARCHAR(10)}} column, so {{max(MAP_ID)}} returns the largest value in string order, not numeric order. Starting from an empty table, the first ten tables receive the MAP_IDs '1' through '10'. For the eleventh table, {{max(MAP_ID)}} returns '9', because '9' sorts after '10'. The procedure therefore computes '10' again, which violates the unique index {{IBMSNAP_PRUNCNTLX1}} on {{MAP_ID}}. The ELSE branch should take the numeric maximum, as the WHEN condition already does: {{CAST(max(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10))}}.

For more details about {{asncdcaddremove.sql}}, see [asncdcaddremove.sql|https://github.com/debezium/debezium-examples/blob/main/tutorial/debezium-db2-init/db2server/asncdcaddremove.sql#L189]
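The string-versus-number comparison can be reproduced without DB2. The sketch below is a minimal reproduction that uses SQLite (Python's built-in {{sqlite3}} module) as a stand-in for DB2. As in DB2, {{MAX}} over a VARCHAR column compares strings. Several parts are illustrative and do not come from {{asncdcaddremove.sql}}:
* The control table is cut down to a hypothetical two-column {{PRUNCNTL}} table that keeps only the unique index on {{MAP_ID}}.
* {{make_conn}}, {{add_table}} and {{add_tables}} are helpers written for this sketch.

The two queries copy the ELSE branch as written and the proposed fix.

```python
import sqlite3

# ELSE branch as written in ASNCDC.ADDTABLE: MAX() runs over the VARCHAR
# column itself, so it returns the largest value in string order ('9' > '10').
BUGGY_NEXT_ID = """
    SELECT CASE WHEN MAX(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10))
                ELSE CAST(CAST(MAX(MAP_ID) AS INT) + 1 AS VARCHAR(10)) END
    FROM PRUNCNTL
"""

# Proposed fix: cast each MAP_ID to an integer before taking MAX().
FIXED_NEXT_ID = """
    SELECT CASE WHEN MAX(CAST(MAP_ID AS INT)) IS NULL THEN CAST(1 AS VARCHAR(10))
                ELSE CAST(MAX(CAST(MAP_ID AS INT)) + 1 AS VARCHAR(10)) END
    FROM PRUNCNTL
"""


def make_conn():
    """In-memory stand-in for ASNCDC.IBMSNAP_PRUNCNTL (hypothetical, reduced schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE PRUNCNTL (SOURCE_TABLE VARCHAR(128) NOT NULL, MAP_ID VARCHAR(10) NOT NULL)"
    )
    # Mirrors IBMSNAP_PRUNCNTLX1: MAP_ID values must be unique.
    conn.execute("CREATE UNIQUE INDEX PRUNCNTLX1 ON PRUNCNTL (MAP_ID)")
    return conn


def add_table(conn, table_name, next_id_sql):
    """Compute the next MAP_ID with next_id_sql, then insert one row (hypothetical helper)."""
    next_id = conn.execute(next_id_sql).fetchone()[0]
    conn.execute(
        "INSERT INTO PRUNCNTL (SOURCE_TABLE, MAP_ID) VALUES (?, ?)", (table_name, next_id)
    )
    return next_id


def add_tables(next_id_sql, count):
    """Add `count` tables to an empty table; return the assigned MAP_IDs and any error."""
    conn = make_conn()
    ids = []
    try:
        for i in range(1, count + 1):
            ids.append(add_table(conn, f"MYTABLE{i}", next_id_sql))
    except sqlite3.IntegrityError as exc:
        # Duplicate MAP_ID rejected by the unique index.
        return ids, exc
    finally:
        conn.close()
    return ids, None


if __name__ == "__main__":
    for label, sql in (("buggy", BUGGY_NEXT_ID), ("fixed", FIXED_NEXT_ID)):
        ids, err = add_tables(sql, 11)
        print(label, ids, err)
```

With the buggy query, the first ten calls assign the MAP_IDs '1' to '10', and the eleventh raises {{sqlite3.IntegrityError}}. With the fixed query, all eleven calls succeed and the eleventh table gets '11'.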

> Fix the error in the `asncdcaddremove.sql` script for the DB2 test container.
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-35277
>                 URL: https://issues.apache.org/jira/browse/FLINK-35277
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: 3.1.0
>         Environment: Flink 1.18.0
>                      Flink CDC 3.1-pre
>                      DB2 11.5.x
>            Reporter: wenli xiao
>            Priority: Minor
>         Attachments: image-2024-04-30-22-19-17-350.png

--
This message was sent by Atlassian Jira
(v8.20.10#820010)