lvyanquan commented on code in PR #3129:
URL: https://github.com/apache/flink-cdc/pull/3129#discussion_r1522351212


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##########
@@ -78,29 +133,130 @@ public void setup(
     @Override
     public void processElement(StreamRecord<Event> streamRecord) {
         Event event = streamRecord.getValue();
+        // Schema changes
         if (event instanceof SchemaChangeEvent) {
             TableId tableId = ((SchemaChangeEvent) event).tableId();
             LOG.info(
                     "Table {} received SchemaChangeEvent and start to be 
blocked.",
                     tableId.toString());
             handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event);
+            // Update caches
+            cachedSchemas.put(tableId, getLatestSchema(tableId));
+            getRoutedTable(tableId)
+                    .ifPresent(routed -> cachedSchemas.put(routed, 
getLatestSchema(routed)));
             return;
         }
-        output.collect(streamRecord);
+
+        // Data changes
+        DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
+        TableId tableId = dataChangeEvent.tableId();
+        Optional<TableId> optionalRoutedTable = getRoutedTable(tableId);
+        if (optionalRoutedTable.isPresent()) {
+            output.collect(
+                    new StreamRecord<>(
+                            maybeFillInNullForEmptyColumns(
+                                    dataChangeEvent, 
optionalRoutedTable.get())));
+        } else {
+            output.collect(streamRecord);
+        }
     }
 
     // 
----------------------------------------------------------------------------------
 
+    private DataChangeEvent maybeFillInNullForEmptyColumns(
+            DataChangeEvent originalEvent, TableId routedTableId) {
+        try {
+            Schema originalSchema = cachedSchemas.get(originalEvent.tableId());
+            Schema routedTableSchema = cachedSchemas.get(routedTableId);
+            if (originalSchema.equals(routedTableSchema)) {
+                return ChangeEventUtils.recreateDataChangeEvent(originalEvent, 
routedTableId);
+            }
+            switch (originalEvent.op()) {
+                case INSERT:
+                    return DataChangeEvent.insertEvent(
+                            routedTableId,
+                            regenerateRecordData(
+                                    originalEvent.after(), originalSchema, 
routedTableSchema),
+                            originalEvent.meta());
+                case UPDATE:
+                    return DataChangeEvent.updateEvent(
+                            routedTableId,
+                            regenerateRecordData(
+                                    originalEvent.before(), originalSchema, 
routedTableSchema),
+                            regenerateRecordData(
+                                    originalEvent.after(), originalSchema, 
routedTableSchema),
+                            originalEvent.meta());
+                case DELETE:
+                    return DataChangeEvent.deleteEvent(
+                            routedTableId,
+                            regenerateRecordData(
+                                    originalEvent.before(), originalSchema, 
routedTableSchema),
+                            originalEvent.meta());
+                case REPLACE:
+                    return DataChangeEvent.replaceEvent(
+                            routedTableId,
+                            regenerateRecordData(
+                                    originalEvent.after(), originalSchema, 
routedTableSchema),
+                            originalEvent.meta());
+                default:
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Unrecognized operation type \"%s\"", 
originalEvent.op()));
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException("Unable to fill null for empty 
columns", e);
+        }
+    }
+
+    private RecordData regenerateRecordData(
+            RecordData recordData, Schema originalSchema, Schema 
routedTableSchema) {
+        // Regenerate record data
+        List<RecordData.FieldGetter> fieldGetters = new ArrayList<>();
+        for (Column column : routedTableSchema.getColumns()) {
+            String columnName = column.getName();
+            int columnIndex = 
originalSchema.getColumnNames().indexOf(columnName);
+            if (columnIndex == -1) {
+                fieldGetters.add(new NullFieldGetter());
+            } else {
+                RecordData.FieldGetter fieldGetter =
+                        RecordData.createFieldGetter(
+                                
originalSchema.getColumn(columnName).get().getType(), columnIndex);

Review Comment:
   Can we cache a tuple of originalSchema and its fieldGetterList? Otherwise
RecordData.createFieldGetter has to inspect the DataType and create a new object each time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to