lvyanquan commented on code in PR #3129: URL: https://github.com/apache/flink-cdc/pull/3129#discussion_r1522351212
########## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java: ########## @@ -78,29 +133,130 @@ public void setup( @Override public void processElement(StreamRecord<Event> streamRecord) { Event event = streamRecord.getValue(); + // Schema changes if (event instanceof SchemaChangeEvent) { TableId tableId = ((SchemaChangeEvent) event).tableId(); LOG.info( "Table {} received SchemaChangeEvent and start to be blocked.", tableId.toString()); handleSchemaChangeEvent(tableId, (SchemaChangeEvent) event); + // Update caches + cachedSchemas.put(tableId, getLatestSchema(tableId)); + getRoutedTable(tableId) + .ifPresent(routed -> cachedSchemas.put(routed, getLatestSchema(routed))); return; } - output.collect(streamRecord); + + // Data changes + DataChangeEvent dataChangeEvent = (DataChangeEvent) event; + TableId tableId = dataChangeEvent.tableId(); + Optional<TableId> optionalRoutedTable = getRoutedTable(tableId); + if (optionalRoutedTable.isPresent()) { + output.collect( + new StreamRecord<>( + maybeFillInNullForEmptyColumns( + dataChangeEvent, optionalRoutedTable.get()))); + } else { + output.collect(streamRecord); + } } // ---------------------------------------------------------------------------------- + private DataChangeEvent maybeFillInNullForEmptyColumns( + DataChangeEvent originalEvent, TableId routedTableId) { + try { + Schema originalSchema = cachedSchemas.get(originalEvent.tableId()); + Schema routedTableSchema = cachedSchemas.get(routedTableId); + if (originalSchema.equals(routedTableSchema)) { + return ChangeEventUtils.recreateDataChangeEvent(originalEvent, routedTableId); + } + switch (originalEvent.op()) { + case INSERT: + return DataChangeEvent.insertEvent( + routedTableId, + regenerateRecordData( + originalEvent.after(), originalSchema, routedTableSchema), + originalEvent.meta()); + case UPDATE: + return DataChangeEvent.updateEvent( + routedTableId, + regenerateRecordData( + originalEvent.before(), 
originalSchema, routedTableSchema), + regenerateRecordData( + originalEvent.after(), originalSchema, routedTableSchema), + originalEvent.meta()); + case DELETE: + return DataChangeEvent.deleteEvent( + routedTableId, + regenerateRecordData( + originalEvent.before(), originalSchema, routedTableSchema), + originalEvent.meta()); + case REPLACE: + return DataChangeEvent.replaceEvent( + routedTableId, + regenerateRecordData( + originalEvent.after(), originalSchema, routedTableSchema), + originalEvent.meta()); + default: + throw new IllegalArgumentException( + String.format( + "Unrecognized operation type \"%s\"", originalEvent.op())); + } + } catch (Exception e) { + throw new IllegalStateException("Unable to fill null for empty columns", e); + } + } + + private RecordData regenerateRecordData( + RecordData recordData, Schema originalSchema, Schema routedTableSchema) { + // Regenerate record data + List<RecordData.FieldGetter> fieldGetters = new ArrayList<>(); + for (Column column : routedTableSchema.getColumns()) { + String columnName = column.getName(); + int columnIndex = originalSchema.getColumnNames().indexOf(columnName); + if (columnIndex == -1) { + fieldGetters.add(new NullFieldGetter()); + } else { + RecordData.FieldGetter fieldGetter = + RecordData.createFieldGetter( + originalSchema.getColumn(columnName).get().getType(), columnIndex); Review Comment: Can we cache a tuple of originalSchema and its fieldGetterList? RecordData.createFieldGetter has to inspect the DataType and allocate a new FieldGetter object, so rebuilding the list for every record repeats that work unnecessarily. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org