This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new ec244f195 [FLINK-39196][pipeline-connector][oracle] Support change column nullable without data type (#4295)
ec244f195 is described below
commit ec244f195b2f7f7f7dc63da4514780b015df0424
Author: Mingliang Zhu <[email protected]>
AuthorDate: Mon Mar 30 20:02:42 2026 +0800
[FLINK-39196][pipeline-connector][oracle] Support change column nullable without data type (#4295)
---
.../oracle/source/OracleEventDeserializer.java | 24 +++++++++++
.../parser/OracleAlterTableParserListener.java | 47 ++++++++++------------
.../oracle/source/OraclePipelineITCase.java | 11 ++++-
3 files changed, 54 insertions(+), 28 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleEventDeserializer.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleEventDeserializer.java
index f23a13137..a179df594 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleEventDeserializer.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleEventDeserializer.java
@@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.event.TableId;
import
org.apache.flink.cdc.connectors.oracle.source.parser.OracleAntlrDdlParser;
import org.apache.flink.cdc.connectors.oracle.table.OracleReadableMetaData;
import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
+import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
import org.apache.flink.table.data.TimestampData;
@@ -31,8 +32,10 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMap
import io.debezium.data.Envelope;
import io.debezium.data.geometry.Geometry;
+import io.debezium.document.Array;
import io.debezium.relational.Tables;
import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
@@ -58,6 +61,8 @@ public class OracleEventDeserializer extends
DebeziumEventDeserializationSchema
"io.debezium.connector.oracle.SchemaChangeKey";
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final FlinkJsonTableChangeSerializer
TABLE_CHANGE_SERIALIZER =
+ new FlinkJsonTableChangeSerializer();
private final boolean includeSchemaChanges;
@@ -90,6 +95,25 @@ public class OracleEventDeserializer extends
DebeziumEventDeserializationSchema
customParser = new OracleAntlrDdlParser(databaseName,
schemaName);
tables = new Tables();
}
+ Array tableChanges =
+
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
+ if (tableChanges != null) {
+ TableChanges changes =
TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
+ for (TableChanges.TableChange tableChange : changes) {
+ switch (tableChange.getType()) {
+ case CREATE:
+ case ALTER:
+ tables.overwriteTable(tableChange.getTable());
+ break;
+ case DROP:
+ // Keep current cache behavior for DROP: parser state relies on
+ // overwrite and downstream events handle table drop separately.
+ break;
+ default:
+ break;
+ }
+ }
+ }
String ddl =
historyRecord.document().getString(HistoryRecord.Fields.DDL_STATEMENTS);
customParser.setCurrentDatabase(databaseName);
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/OracleAlterTableParserListener.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/OracleAlterTableParserListener.java
index 4b0f87eb1..4a849c492 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/OracleAlterTableParserListener.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/OracleAlterTableParserListener.java
@@ -28,7 +28,6 @@ import io.debezium.connector.oracle.antlr.OracleDdlParser;
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
-import io.debezium.relational.Table;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.text.ParsingException;
@@ -88,15 +87,7 @@ public class OracleAlterTableParserListener extends
BaseParserListener {
@Override
public void enterAlter_table(PlSqlParser.Alter_tableContext ctx) {
TableId tableId = new TableId(catalogName, schemaName,
getTableName(ctx.tableview_name()));
- tableEditor = Table.editor().tableId(tableId);
- if (tableEditor == null) {
- throw new ParsingException(
- null,
- "Trying to alter table "
- + tableId
- + ", which does not exist. Query: "
- + getText(ctx));
- }
+ tableEditor = parser.databaseTables().editOrCreateTable(tableId);
super.enterAlter_table(ctx);
}
@@ -174,24 +165,28 @@ public class OracleAlterTableParserListener extends
BaseParserListener {
columnEditors = new ArrayList<>(columns.size());
for (PlSqlParser.Modify_col_propertiesContext column :
columns) {
String columnName =
getColumnName(column.column_name());
- Column existingColumn =
Column.editor().name(columnName).create();
- if (existingColumn != null) {
- ColumnEditor columnEditor = existingColumn.edit();
- columnDefinitionParserListener =
- new
ColumnDefinitionParserListener(tableEditor, columnEditor);
- listeners.add(columnDefinitionParserListener);
-
- columnEditors.add(columnEditor);
+ Column existingColumn =
tableEditor.create().columnWithName(columnName);
+ ColumnEditor columnEditor;
+ if (existingColumn == null) {
+ if (column.datatype() == null) {
+ throw new ParsingException(
+ null,
+ "Trying to change column "
+ + columnName
+ + " in "
+ + tableEditor.tableId()
+ + " table, but column schema
is missing and "
+ + "MODIFY statement doesn't
provide datatype. Query: "
+ + getText(ctx));
+ }
+ columnEditor = Column.editor().name(columnName);
} else {
- throw new ParsingException(
- null,
- "trying to change column "
- + columnName
- + " in "
- + tableEditor.tableId().toString()
- + " table, which does not exist.
Query: "
- + getText(ctx));
+ columnEditor = existingColumn.edit();
}
+ columnDefinitionParserListener =
+ new
ColumnDefinitionParserListener(tableEditor, columnEditor);
+ listeners.add(columnDefinitionParserListener);
+ columnEditors.add(columnEditor);
}
},
tableEditor);
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
index e5ed8ec6e..497fa54ea 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java
@@ -1737,13 +1737,20 @@ public class OraclePipelineITCase extends
OracleSourceTestBase {
statement.execute(
String.format(
- "ALTER TABLE %s.products ADD DESC1 VARCHAR(45) DEFAULT
NULL", "debezium"));
+ "ALTER TABLE %s.products ADD DESC1 VARCHAR(45) DEFAULT
'N/A' NOT NULL",
+ "debezium"));
expected.add(
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
- Column.physicalColumn("DESC1",
DataTypes.VARCHAR(45))))));
+ Column.physicalColumn(
+ "DESC1",
DataTypes.VARCHAR(45).notNull())))));
+
+ statement.execute(String.format("ALTER TABLE %s.products MODIFY DESC1
NULL", "debezium"));
+ expected.add(
+ new AlterColumnTypeEvent(
+ tableId, Collections.singletonMap("DESC1",
DataTypes.VARCHAR(45))));
statement.execute(
String.format(