This is an automated email from the ASF dual-hosted git repository. kunni pushed a commit to branch release-3.6 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 6882d7abb0049db04cc8737bed68cf09ec4c3979 Author: Jia Fan <[email protected]> AuthorDate: Wed Mar 25 09:41:14 2026 +0800 [FLINK-39171][oracle-cdc] Fix Oracle pipeline connector wrong field case (#4292) Co-authored-by: lvyanquan <[email protected]> Co-authored-by: Claude Opus 4.6 <[email protected]> (cherry picked from commit 1f64e5711549a27649f5b98b51f0ac31b255d6e8) --- .../oracle/source/parser/BaseParserListener.java | 19 ++---------- .../connectors/oracle/utils/OracleSchemaUtils.java | 34 +++++++++++++++++++--- .../oracle/source/OraclePipelineITCase.java | 18 ++++++------ .../flink/cdc/pipeline/tests/OracleE2eITCase.java | 6 ++-- 4 files changed, 45 insertions(+), 32 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java index efd4d3043..f56fd3a2b 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java @@ -17,6 +17,8 @@ package org.apache.flink.cdc.connectors.oracle.source.parser; +import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils; + import io.debezium.ddl.parser.oracle.generated.PlSqlParser; import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener; @@ -76,21 +78,6 @@ public class BaseParserListener extends PlSqlParserBaseListener { * @return parsed table or column name from the supplied name argument */ private static String getTableOrColumnName(String name) { - return removeQuotes(name, true); - } - - /** - * Removes leading and trailing double quote characters from the provided string. - * - * @param text value to have double quotes removed - * @param upperCaseIfNotQuoted control if returned string is upper-cased if not quoted - * @return string that has had quotes removed - */ - @SuppressWarnings("SameParameterValue") - private static String removeQuotes(String text, boolean upperCaseIfNotQuoted) { - if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) { - return text.substring(1, text.length() - 1); - } - return upperCaseIfNotQuoted ? text.toUpperCase() : text; + return OracleSchemaUtils.removeQuotes(name); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java index e81c4110c..083442b51 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java @@ -46,6 +46,25 @@ public class OracleSchemaUtils { private static final Logger LOG = LoggerFactory.getLogger(OracleSchemaUtils.class); + /** + * Removes leading and trailing double quote characters from the provided string. + * + * <p>Oracle table and column names are inherently stored in upper-case; however, if the objects + * are created using double-quotes, the case of the object name is retained. This method will + * adhere to those rules and will always return the name in upper-case unless the provided name + * is double-quoted in which the returned value will have the double-quotes removed and case + * retained. + * + * @param text value to have double quotes removed + * @return string that has had quotes removed + */ + public static String removeQuotes(String text) { + if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) { + return text.substring(1, text.length() - 1); + } + return text.toUpperCase(); + } + public static List<TableId> listTables( OracleSourceConfig sourceConfig, @Nullable String dbName) { try (JdbcConnection jdbc = createOracleConnection(sourceConfig)) { @@ -127,16 +146,23 @@ public class OracleSchemaUtils { .map(OracleSchemaUtils::toColumn) .collect(Collectors.toList()); + List<String> primaryKeys = + table.primaryKeyColumnNames().stream() + .map(OracleSchemaUtils::removeQuotes) + .collect(Collectors.toList()); + return Schema.newBuilder() .setColumns(columns) - .primaryKey(table.primaryKeyColumnNames()) + .primaryKey(primaryKeys) .comment(table.comment()) .build(); } public static Column toColumn(io.debezium.relational.Column column) { return Column.physicalColumn( - column.name(), OracleTypeUtils.fromDbzColumn(column), column.comment()); + removeQuotes(column.name()), + OracleTypeUtils.fromDbzColumn(column), + column.comment()); } public static io.debezium.relational.TableId toDbzTableId(TableId tableId) { @@ -158,7 +184,7 @@ public class OracleSchemaUtils { DataType dataType = OracleTypeUtils.fromDbzColumn(column); org.apache.flink.cdc.common.schema.Column cdcColumn = org.apache.flink.cdc.common.schema.Column.physicalColumn( - column.name().toLowerCase(Locale.ROOT), dataType); + removeQuotes(column.name()), dataType); list.add(cdcColumn); } return Schema.newBuilder().setColumns(list).primaryKey(pks).build(); @@ -181,7 +207,7 @@ public class OracleSchemaUtils { while (rs.next()) { String columnName; columnName = rs.getString(1); - list.add(columnName.toLowerCase(Locale.ROOT)); + list.add(removeQuotes(columnName)); } return list; }); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java index 78883662d..e5ed8ec6e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java @@ -983,10 +983,10 @@ public class OraclePipelineITCase extends OracleSourceTestBase { new CreateTableEvent( TableId.tableId("DEBEZIUM", "MYLAKE"), Schema.newBuilder() - .physicalColumn("feature_id", DataTypes.BIGINT().notNull()) - .physicalColumn("name", DataTypes.VARCHAR(32)) - .physicalColumn("shape", DataTypes.STRING()) - .primaryKey(Arrays.asList("feature_id")) + .physicalColumn("FEATURE_ID", DataTypes.BIGINT().notNull()) + .physicalColumn("NAME", DataTypes.VARCHAR(32)) + .physicalColumn("SHAPE", DataTypes.STRING()) + .primaryKey(Arrays.asList("FEATURE_ID")) .build()); RowType rowType = @@ -1578,11 +1578,11 @@ public class OraclePipelineITCase extends OracleSourceTestBase { return new CreateTableEvent( tableId, Schema.newBuilder() - .physicalColumn("id", DataTypes.BIGINT().notNull()) - .physicalColumn("name", DataTypes.VARCHAR(255).notNull()) - .physicalColumn("description", DataTypes.VARCHAR(512)) - .physicalColumn("weight", DataTypes.FLOAT()) - .primaryKey(Collections.singletonList("id")) + .physicalColumn("ID", DataTypes.BIGINT().notNull()) + .physicalColumn("NAME", DataTypes.VARCHAR(255).notNull()) + .physicalColumn("DESCRIPTION", DataTypes.VARCHAR(512)) + .physicalColumn("WEIGHT", DataTypes.FLOAT()) + .primaryKey(Collections.singletonList("ID")) .build()); } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java index 0be4734e7..e6b5387a2 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java @@ -150,7 +150,7 @@ public class OracleE2eITCase extends PipelineTestEnvironment { Statement stat = conn.createStatement()) { waitUntilSpecificEvent( - "CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}"); + "CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`DESCRIPTION` VARCHAR(512),`WEIGHT` FLOAT}, primaryKeys=ID, options=()}"); waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[109, spare tire, 24 inch spare tire, 22.2], op=INSERT, meta=()}"); waitUntilSpecificEvent( @@ -171,7 +171,7 @@ public class OracleE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[105, hammer, 14oz carpenters hammer, 0.875], op=INSERT, meta=()}"); waitUntilSpecificEvent( - "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}"); + "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}"); waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_1, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent( @@ -225,7 +225,7 @@ public class OracleE2eITCase extends PipelineTestEnvironment { waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[107, rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks, 5.1], op=UPDATE, meta=()}"); waitUntilSpecificEvent( - "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}"); + "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}"); waitUntilSpecificEvent( "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_10, Shanghai, 123567891234], op=INSERT, meta=()}"); waitUntilSpecificEvent(
