This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new ec244f195 [FLINK-39196][pipeline-connector][oracle] Support change column nullable without data type (#4295)
ec244f195 is described below

commit ec244f195b2f7f7f7dc63da4514780b015df0424
Author: Mingliang Zhu <[email protected]>
AuthorDate: Mon Mar 30 20:02:42 2026 +0800

    [FLINK-39196][pipeline-connector][oracle] Support change column nullable without data type (#4295)
---
 .../oracle/source/OracleEventDeserializer.java     | 24 +++++++++++
 .../parser/OracleAlterTableParserListener.java     | 47 ++++++++++------------
 .../oracle/source/OraclePipelineITCase.java        | 11 ++++-
 3 files changed, 54 insertions(+), 28 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleEventDeserializer.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleEventDeserializer.java
index f23a13137..a179df594 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleEventDeserializer.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleEventDeserializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.event.TableId;
 import 
org.apache.flink.cdc.connectors.oracle.source.parser.OracleAntlrDdlParser;
 import org.apache.flink.cdc.connectors.oracle.table.OracleReadableMetaData;
 import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
+import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
 import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
 import org.apache.flink.table.data.TimestampData;
 
@@ -31,8 +32,10 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
 
 import io.debezium.data.Envelope;
 import io.debezium.data.geometry.Geometry;
+import io.debezium.document.Array;
 import io.debezium.relational.Tables;
 import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.TableChanges;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -58,6 +61,8 @@ public class OracleEventDeserializer extends 
DebeziumEventDeserializationSchema
             "io.debezium.connector.oracle.SchemaChangeKey";
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final FlinkJsonTableChangeSerializer 
TABLE_CHANGE_SERIALIZER =
+            new FlinkJsonTableChangeSerializer();
 
     private final boolean includeSchemaChanges;
 
@@ -90,6 +95,25 @@ public class OracleEventDeserializer extends 
DebeziumEventDeserializationSchema
                     customParser = new OracleAntlrDdlParser(databaseName, 
schemaName);
                     tables = new Tables();
                 }
+                Array tableChanges =
+                        
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
+                if (tableChanges != null) {
+                    TableChanges changes = 
TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
+                    for (TableChanges.TableChange tableChange : changes) {
+                        switch (tableChange.getType()) {
+                            case CREATE:
+                            case ALTER:
+                                tables.overwriteTable(tableChange.getTable());
+                                break;
+                            case DROP:
+                                // Keep current cache behavior for DROP: parser state relies on
+                                // overwrite and downstream events handle table drop separately.
+                                break;
+                            default:
+                                break;
+                        }
+                    }
+                }
                 String ddl =
                         
historyRecord.document().getString(HistoryRecord.Fields.DDL_STATEMENTS);
                 customParser.setCurrentDatabase(databaseName);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/OracleAlterTableParserListener.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/OracleAlterTableParserListener.java
index 4b0f87eb1..4a849c492 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/OracleAlterTableParserListener.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/OracleAlterTableParserListener.java
@@ -28,7 +28,6 @@ import io.debezium.connector.oracle.antlr.OracleDdlParser;
 import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
 import io.debezium.relational.Column;
 import io.debezium.relational.ColumnEditor;
-import io.debezium.relational.Table;
 import io.debezium.relational.TableEditor;
 import io.debezium.relational.TableId;
 import io.debezium.text.ParsingException;
@@ -88,15 +87,7 @@ public class OracleAlterTableParserListener extends 
BaseParserListener {
     @Override
     public void enterAlter_table(PlSqlParser.Alter_tableContext ctx) {
         TableId tableId = new TableId(catalogName, schemaName, 
getTableName(ctx.tableview_name()));
-        tableEditor = Table.editor().tableId(tableId);
-        if (tableEditor == null) {
-            throw new ParsingException(
-                    null,
-                    "Trying to alter table "
-                            + tableId
-                            + ", which does not exist. Query: "
-                            + getText(ctx));
-        }
+        tableEditor = parser.databaseTables().editOrCreateTable(tableId);
         super.enterAlter_table(ctx);
     }
 
@@ -174,24 +165,28 @@ public class OracleAlterTableParserListener extends 
BaseParserListener {
                     columnEditors = new ArrayList<>(columns.size());
                     for (PlSqlParser.Modify_col_propertiesContext column : 
columns) {
                         String columnName = 
getColumnName(column.column_name());
-                        Column existingColumn = 
Column.editor().name(columnName).create();
-                        if (existingColumn != null) {
-                            ColumnEditor columnEditor = existingColumn.edit();
-                            columnDefinitionParserListener =
-                                    new 
ColumnDefinitionParserListener(tableEditor, columnEditor);
-                            listeners.add(columnDefinitionParserListener);
-
-                            columnEditors.add(columnEditor);
+                        Column existingColumn = 
tableEditor.create().columnWithName(columnName);
+                        ColumnEditor columnEditor;
+                        if (existingColumn == null) {
+                            if (column.datatype() == null) {
+                                throw new ParsingException(
+                                        null,
+                                        "Trying to change column "
+                                                + columnName
+                                                + " in "
+                                                + tableEditor.tableId()
+                                                + " table, but column schema 
is missing and "
+                                                + "MODIFY statement doesn't 
provide datatype. Query: "
+                                                + getText(ctx));
+                            }
+                            columnEditor = Column.editor().name(columnName);
                         } else {
-                            throw new ParsingException(
-                                    null,
-                                    "trying to change column "
-                                            + columnName
-                                            + " in "
-                                            + tableEditor.tableId().toString()
-                                            + " table, which does not exist.  
Query: "
-                                            + getText(ctx));
+                            columnEditor = existingColumn.edit();
                         }
+                        columnDefinitionParserListener =
+                                new 
ColumnDefinitionParserListener(tableEditor, columnEditor);
+                        listeners.add(columnDefinitionParserListener);
+                        columnEditors.add(columnEditor);
                     }
                 },
                 tableEditor);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
index e5ed8ec6e..497fa54ea 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
@@ -1737,13 +1737,20 @@ public class OraclePipelineITCase extends 
OracleSourceTestBase {
 
         statement.execute(
                 String.format(
-                        "ALTER TABLE %s.products ADD DESC1 VARCHAR(45) DEFAULT 
NULL", "debezium"));
+                        "ALTER TABLE %s.products ADD DESC1 VARCHAR(45) DEFAULT 
'N/A' NOT NULL",
+                        "debezium"));
         expected.add(
                 new AddColumnEvent(
                         tableId,
                         Collections.singletonList(
                                 new AddColumnEvent.ColumnWithPosition(
-                                        Column.physicalColumn("DESC1", 
DataTypes.VARCHAR(45))))));
+                                        Column.physicalColumn(
+                                                "DESC1", 
DataTypes.VARCHAR(45).notNull())))));
+
+        statement.execute(String.format("ALTER TABLE %s.products MODIFY DESC1 
NULL", "debezium"));
+        expected.add(
+                new AlterColumnTypeEvent(
+                        tableId, Collections.singletonMap("DESC1", 
DataTypes.VARCHAR(45))));
 
         statement.execute(
                 String.format(

Reply via email to