This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 1f64e5711 [FLINK-39171][oracle-cdc] Fix Oracle pipeline connector
wrong field case (#4292)
1f64e5711 is described below
commit 1f64e5711549a27649f5b98b51f0ac31b255d6e8
Author: Jia Fan <[email protected]>
AuthorDate: Wed Mar 25 09:41:14 2026 +0800
[FLINK-39171][oracle-cdc] Fix Oracle pipeline connector wrong field case
(#4292)
Co-authored-by: lvyanquan <[email protected]>
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../oracle/source/parser/BaseParserListener.java | 19 ++----------
.../connectors/oracle/utils/OracleSchemaUtils.java | 34 +++++++++++++++++++---
.../oracle/source/OraclePipelineITCase.java | 18 ++++++------
.../flink/cdc/pipeline/tests/OracleE2eITCase.java | 6 ++--
4 files changed, 45 insertions(+), 32 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java
index efd4d3043..f56fd3a2b 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java
@@ -17,6 +17,8 @@
package org.apache.flink.cdc.connectors.oracle.source.parser;
+import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils;
+
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener;
@@ -76,21 +78,6 @@ public class BaseParserListener extends
PlSqlParserBaseListener {
* @return parsed table or column name from the supplied name argument
*/
private static String getTableOrColumnName(String name) {
- return removeQuotes(name, true);
- }
-
- /**
- * Removes leading and trailing double quote characters from the provided
string.
- *
- * @param text value to have double quotes removed
- * @param upperCaseIfNotQuoted control if returned string is upper-cased
if not quoted
- * @return string that has had quotes removed
- */
- @SuppressWarnings("SameParameterValue")
- private static String removeQuotes(String text, boolean
upperCaseIfNotQuoted) {
- if (text != null && text.length() > 2 && text.startsWith("\"") &&
text.endsWith("\"")) {
- return text.substring(1, text.length() - 1);
- }
- return upperCaseIfNotQuoted ? text.toUpperCase() : text;
+ return OracleSchemaUtils.removeQuotes(name);
}
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java
index e81c4110c..083442b51 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java
@@ -46,6 +46,25 @@ public class OracleSchemaUtils {
private static final Logger LOG =
LoggerFactory.getLogger(OracleSchemaUtils.class);
+ /**
+ * Removes leading and trailing double quote characters from the provided
string.
+ *
+ * <p>Oracle table and column names are inherently stored in upper-case;
however, if the objects
+ * are created using double-quotes, the case of the object name is
retained. This method will
+ * adhere to those rules and will always return the name in upper-case
unless the provided name
+ * is double-quoted, in which case the returned value will have the
double-quotes removed and case
+ * retained.
+ *
+ * @param text value to have double quotes removed
+ * @return string that has had quotes removed
+ */
+ public static String removeQuotes(String text) {
+ if (text != null && text.length() > 2 && text.startsWith("\"") &&
text.endsWith("\"")) {
+ return text.substring(1, text.length() - 1);
+ }
+ return text.toUpperCase();
+ }
+
public static List<TableId> listTables(
OracleSourceConfig sourceConfig, @Nullable String dbName) {
try (JdbcConnection jdbc = createOracleConnection(sourceConfig)) {
@@ -127,16 +146,23 @@ public class OracleSchemaUtils {
.map(OracleSchemaUtils::toColumn)
.collect(Collectors.toList());
+ List<String> primaryKeys =
+ table.primaryKeyColumnNames().stream()
+ .map(OracleSchemaUtils::removeQuotes)
+ .collect(Collectors.toList());
+
return Schema.newBuilder()
.setColumns(columns)
- .primaryKey(table.primaryKeyColumnNames())
+ .primaryKey(primaryKeys)
.comment(table.comment())
.build();
}
public static Column toColumn(io.debezium.relational.Column column) {
return Column.physicalColumn(
- column.name(), OracleTypeUtils.fromDbzColumn(column),
column.comment());
+ removeQuotes(column.name()),
+ OracleTypeUtils.fromDbzColumn(column),
+ column.comment());
}
public static io.debezium.relational.TableId toDbzTableId(TableId tableId)
{
@@ -158,7 +184,7 @@ public class OracleSchemaUtils {
DataType dataType = OracleTypeUtils.fromDbzColumn(column);
org.apache.flink.cdc.common.schema.Column cdcColumn =
org.apache.flink.cdc.common.schema.Column.physicalColumn(
- column.name().toLowerCase(Locale.ROOT), dataType);
+ removeQuotes(column.name()), dataType);
list.add(cdcColumn);
}
return Schema.newBuilder().setColumns(list).primaryKey(pks).build();
@@ -181,7 +207,7 @@ public class OracleSchemaUtils {
while (rs.next()) {
String columnName;
columnName = rs.getString(1);
- list.add(columnName.toLowerCase(Locale.ROOT));
+ list.add(removeQuotes(columnName));
}
return list;
});
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
index 78883662d..e5ed8ec6e 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
@@ -983,10 +983,10 @@ public class OraclePipelineITCase extends
OracleSourceTestBase {
new CreateTableEvent(
TableId.tableId("DEBEZIUM", "MYLAKE"),
Schema.newBuilder()
- .physicalColumn("feature_id",
DataTypes.BIGINT().notNull())
- .physicalColumn("name", DataTypes.VARCHAR(32))
- .physicalColumn("shape", DataTypes.STRING())
- .primaryKey(Arrays.asList("feature_id"))
+ .physicalColumn("FEATURE_ID",
DataTypes.BIGINT().notNull())
+ .physicalColumn("NAME", DataTypes.VARCHAR(32))
+ .physicalColumn("SHAPE", DataTypes.STRING())
+ .primaryKey(Arrays.asList("FEATURE_ID"))
.build());
RowType rowType =
@@ -1578,11 +1578,11 @@ public class OraclePipelineITCase extends
OracleSourceTestBase {
return new CreateTableEvent(
tableId,
Schema.newBuilder()
- .physicalColumn("id", DataTypes.BIGINT().notNull())
- .physicalColumn("name",
DataTypes.VARCHAR(255).notNull())
- .physicalColumn("description", DataTypes.VARCHAR(512))
- .physicalColumn("weight", DataTypes.FLOAT())
- .primaryKey(Collections.singletonList("id"))
+ .physicalColumn("ID", DataTypes.BIGINT().notNull())
+ .physicalColumn("NAME",
DataTypes.VARCHAR(255).notNull())
+ .physicalColumn("DESCRIPTION", DataTypes.VARCHAR(512))
+ .physicalColumn("WEIGHT", DataTypes.FLOAT())
+ .primaryKey(Collections.singletonList("ID"))
.build());
}
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java
index 0be4734e7..e6b5387a2 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java
@@ -150,7 +150,7 @@ public class OracleE2eITCase extends
PipelineTestEnvironment {
Statement stat = conn.createStatement()) {
waitUntilSpecificEvent(
- "CreateTableEvent{tableId=DEBEZIUM.PRODUCTS,
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`description`
VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}");
+ "CreateTableEvent{tableId=DEBEZIUM.PRODUCTS,
schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`DESCRIPTION`
VARCHAR(512),`WEIGHT` FLOAT}, primaryKeys=ID, options=()}");
waitUntilSpecificEvent(
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[],
after=[109, spare tire, 24 inch spare tire, 22.2], op=INSERT, meta=()}");
waitUntilSpecificEvent(
@@ -171,7 +171,7 @@ public class OracleE2eITCase extends
PipelineTestEnvironment {
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[],
after=[105, hammer, 14oz carpenters hammer, 0.875], op=INSERT, meta=()}");
waitUntilSpecificEvent(
- "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS,
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address`
VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}");
+ "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS,
schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS`
VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
waitUntilSpecificEvent(
"DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[],
after=[171798691841, user_1, Shanghai, 123567891234], op=INSERT, meta=()}");
waitUntilSpecificEvent(
@@ -225,7 +225,7 @@ public class OracleE2eITCase extends
PipelineTestEnvironment {
waitUntilSpecificEvent(
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[107,
rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks,
5.1], op=UPDATE, meta=()}");
waitUntilSpecificEvent(
- "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1,
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address`
VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}");
+ "CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1,
schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS`
VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
waitUntilSpecificEvent(
"DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[],
after=[171798691842, user_10, Shanghai, 123567891234], op=INSERT, meta=()}");
waitUntilSpecificEvent(