This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f64e5711 [FLINK-39171][oracle-cdc] Fix Oracle pipeline connector wrong field case (#4292)
1f64e5711 is described below

commit 1f64e5711549a27649f5b98b51f0ac31b255d6e8
Author: Jia Fan <[email protected]>
AuthorDate: Wed Mar 25 09:41:14 2026 +0800

    [FLINK-39171][oracle-cdc] Fix Oracle pipeline connector wrong field case (#4292)
    
    Co-authored-by: lvyanquan <[email protected]>
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../oracle/source/parser/BaseParserListener.java   | 19 ++----------
 .../connectors/oracle/utils/OracleSchemaUtils.java | 34 +++++++++++++++++++---
 .../oracle/source/OraclePipelineITCase.java        | 18 ++++++------
 .../flink/cdc/pipeline/tests/OracleE2eITCase.java  |  6 ++--
 4 files changed, 45 insertions(+), 32 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java
index efd4d3043..f56fd3a2b 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.cdc.connectors.oracle.source.parser;
 
+import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils;
+
 import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
 import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener;
 
@@ -76,21 +78,6 @@ public class BaseParserListener extends 
PlSqlParserBaseListener {
      * @return parsed table or column name from the supplied name argument
      */
     private static String getTableOrColumnName(String name) {
-        return removeQuotes(name, true);
-    }
-
-    /**
-     * Removes leading and trailing double quote characters from the provided 
string.
-     *
-     * @param text value to have double quotes removed
-     * @param upperCaseIfNotQuoted control if returned string is upper-cased 
if not quoted
-     * @return string that has had quotes removed
-     */
-    @SuppressWarnings("SameParameterValue")
-    private static String removeQuotes(String text, boolean 
upperCaseIfNotQuoted) {
-        if (text != null && text.length() > 2 && text.startsWith("\"") && 
text.endsWith("\"")) {
-            return text.substring(1, text.length() - 1);
-        }
-        return upperCaseIfNotQuoted ? text.toUpperCase() : text;
+        return OracleSchemaUtils.removeQuotes(name);
     }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java
index e81c4110c..083442b51 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java
@@ -46,6 +46,25 @@ public class OracleSchemaUtils {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(OracleSchemaUtils.class);
 
+    /**
+     * Removes leading and trailing double quote characters from the provided 
string.
+     *
+     * <p>Oracle table and column names are inherently stored in upper-case; 
however, if the objects
+     * are created using double-quotes, the case of the object name is 
retained. This method will
+     * adhere to those rules and will always return the name in upper-case 
unless the provided name
+     * is double-quoted in which the returned value will have the 
double-quotes removed and case
+     * retained.
+     *
+     * @param text value to have double quotes removed
+     * @return string that has had quotes removed
+     */
+    public static String removeQuotes(String text) {
+        if (text != null && text.length() > 2 && text.startsWith("\"") && 
text.endsWith("\"")) {
+            return text.substring(1, text.length() - 1);
+        }
+        return text.toUpperCase();
+    }
+
     public static List<TableId> listTables(
             OracleSourceConfig sourceConfig, @Nullable String dbName) {
         try (JdbcConnection jdbc = createOracleConnection(sourceConfig)) {
@@ -127,16 +146,23 @@ public class OracleSchemaUtils {
                         .map(OracleSchemaUtils::toColumn)
                         .collect(Collectors.toList());
 
+        List<String> primaryKeys =
+                table.primaryKeyColumnNames().stream()
+                        .map(OracleSchemaUtils::removeQuotes)
+                        .collect(Collectors.toList());
+
         return Schema.newBuilder()
                 .setColumns(columns)
-                .primaryKey(table.primaryKeyColumnNames())
+                .primaryKey(primaryKeys)
                 .comment(table.comment())
                 .build();
     }
 
     public static Column toColumn(io.debezium.relational.Column column) {
         return Column.physicalColumn(
-                column.name(), OracleTypeUtils.fromDbzColumn(column), 
column.comment());
+                removeQuotes(column.name()),
+                OracleTypeUtils.fromDbzColumn(column),
+                column.comment());
     }
 
     public static io.debezium.relational.TableId toDbzTableId(TableId tableId) 
{
@@ -158,7 +184,7 @@ public class OracleSchemaUtils {
             DataType dataType = OracleTypeUtils.fromDbzColumn(column);
             org.apache.flink.cdc.common.schema.Column cdcColumn =
                     org.apache.flink.cdc.common.schema.Column.physicalColumn(
-                            column.name().toLowerCase(Locale.ROOT), dataType);
+                            removeQuotes(column.name()), dataType);
             list.add(cdcColumn);
         }
         return Schema.newBuilder().setColumns(list).primaryKey(pks).build();
@@ -181,7 +207,7 @@ public class OracleSchemaUtils {
                         while (rs.next()) {
                             String columnName;
                             columnName = rs.getString(1);
-                            list.add(columnName.toLowerCase(Locale.ROOT));
+                            list.add(removeQuotes(columnName));
                         }
                         return list;
                     });
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
index 78883662d..e5ed8ec6e 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
@@ -983,10 +983,10 @@ public class OraclePipelineITCase extends 
OracleSourceTestBase {
                 new CreateTableEvent(
                         TableId.tableId("DEBEZIUM", "MYLAKE"),
                         Schema.newBuilder()
-                                .physicalColumn("feature_id", 
DataTypes.BIGINT().notNull())
-                                .physicalColumn("name", DataTypes.VARCHAR(32))
-                                .physicalColumn("shape", DataTypes.STRING())
-                                .primaryKey(Arrays.asList("feature_id"))
+                                .physicalColumn("FEATURE_ID", 
DataTypes.BIGINT().notNull())
+                                .physicalColumn("NAME", DataTypes.VARCHAR(32))
+                                .physicalColumn("SHAPE", DataTypes.STRING())
+                                .primaryKey(Arrays.asList("FEATURE_ID"))
                                 .build());
 
         RowType rowType =
@@ -1578,11 +1578,11 @@ public class OraclePipelineITCase extends 
OracleSourceTestBase {
         return new CreateTableEvent(
                 tableId,
                 Schema.newBuilder()
-                        .physicalColumn("id", DataTypes.BIGINT().notNull())
-                        .physicalColumn("name", 
DataTypes.VARCHAR(255).notNull())
-                        .physicalColumn("description", DataTypes.VARCHAR(512))
-                        .physicalColumn("weight", DataTypes.FLOAT())
-                        .primaryKey(Collections.singletonList("id"))
+                        .physicalColumn("ID", DataTypes.BIGINT().notNull())
+                        .physicalColumn("NAME", 
DataTypes.VARCHAR(255).notNull())
+                        .physicalColumn("DESCRIPTION", DataTypes.VARCHAR(512))
+                        .physicalColumn("WEIGHT", DataTypes.FLOAT())
+                        .primaryKey(Collections.singletonList("ID"))
                         .build());
     }
 
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java
index 0be4734e7..e6b5387a2 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java
@@ -150,7 +150,7 @@ public class OracleE2eITCase extends 
PipelineTestEnvironment {
                 Statement stat = conn.createStatement()) {
 
             waitUntilSpecificEvent(
-                    "CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` 
VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}");
+                    "CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, 
schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`DESCRIPTION` 
VARCHAR(512),`WEIGHT` FLOAT}, primaryKeys=ID, options=()}");
             waitUntilSpecificEvent(
                     "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], 
after=[109, spare tire, 24 inch spare tire, 22.2], op=INSERT, meta=()}");
             waitUntilSpecificEvent(
@@ -171,7 +171,7 @@ public class OracleE2eITCase extends 
PipelineTestEnvironment {
                     "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], 
after=[105, hammer, 14oz carpenters hammer, 0.875], op=INSERT, meta=()}");
 
             waitUntilSpecificEvent(
-                    "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` 
VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}");
+                    "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, 
schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` 
VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
             waitUntilSpecificEvent(
                     "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], 
after=[171798691841, user_1, Shanghai, 123567891234], op=INSERT, meta=()}");
             waitUntilSpecificEvent(
@@ -225,7 +225,7 @@ public class OracleE2eITCase extends 
PipelineTestEnvironment {
             waitUntilSpecificEvent(
                     "DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[107, 
rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks, 
5.1], op=UPDATE, meta=()}");
             waitUntilSpecificEvent(
-                    "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` 
VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}");
+                    "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, 
schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` 
VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
             waitUntilSpecificEvent(
                     "DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], 
after=[171798691842, user_10, Shanghai, 123567891234], op=INSERT, meta=()}");
             waitUntilSpecificEvent(

Reply via email to