weiqingy commented on code in PR #28498: URL: https://github.com/apache/flink/pull/28498#discussion_r3447627918
########## flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java: ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent.debezium; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.MetadataConverter; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.ProjectableDecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.types.RowKind; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** {@link DecodingFormat} for Debezium using Avro encoding. */ +public class DebeziumAvroDecodingFormat + implements ProjectableDecodingFormat<DeserializationSchema<RowData>> { + + // ---------------------------------------------------------------------------------------- + // Mutable attributes + // ---------------------------------------------------------------------------------------- + + private List<String> metadataKeys; + + // ---------------------------------------------------------------------------------------- + // Debezium-specific attributes + // ---------------------------------------------------------------------------------------- + + private final String schemaRegistryURL; + private final String schema; + private final Map<String, ?> optionalPropertiesMap; + + public DebeziumAvroDecodingFormat( + String schemaRegistryURL, String schema, Map<String, ?> optionalPropertiesMap) { + this.schemaRegistryURL = schemaRegistryURL; + this.schema = schema; + this.optionalPropertiesMap = optionalPropertiesMap; + this.metadataKeys = Collections.emptyList(); + } + + @Override + public DeserializationSchema<RowData> createRuntimeDecoder( + DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) { + physicalDataType = Projection.of(projections).project(physicalDataType); + + final List<ReadableMetadata> readableMetadata = + metadataKeys.stream() + .map( + k -> + Stream.of(ReadableMetadata.values()) + .filter(rm -> rm.key.equals(k)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .collect(Collectors.toList()); + final List<DataTypes.Field> metadataFields = + readableMetadata.stream() + .map(m -> DataTypes.FIELD(m.key, m.dataType)) + .collect(Collectors.toList()); + + final DataType producedDataType = + DataTypeUtils.appendRowFields(physicalDataType, metadataFields); + final TypeInformation<RowData> producedTypeInfo = + context.createTypeInformation(producedDataType); + + return new DebeziumAvroDeserializationSchema( + physicalDataType, + readableMetadata, + producedTypeInfo, + schemaRegistryURL, + schema, + optionalPropertiesMap); + } + + @Override + public Map<String, DataType> listReadableMetadata() { + final Map<String, DataType> metadataMap = new LinkedHashMap<>(); + Stream.of(ReadableMetadata.values()) + .forEachOrdered(m -> metadataMap.put(m.key, m.dataType)); + return metadataMap; + } + + @Override + public void applyReadableMetadata(List<String> metadataKeys) { + this.metadataKeys = metadataKeys; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_BEFORE) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + } + + // ---------------------------------------------------------------------------------------- + // Metadata handling + // ---------------------------------------------------------------------------------------- + + /** List of metadata that can be read with this format. */ + enum ReadableMetadata { + INGESTION_TIMESTAMP( Review Comment: Minor: this inner `convert` returning `row` looks unreachable — the wrapper in `DebeziumAvroDeserializationSchema` only calls `m.converter.convert(...)` when the resolved field is a `GenericRowData`, and the top-level `ts_ms` is a timestamp scalar, so this branch never fires. Could drop the body to a no-op/`null` to avoid implying it runs. ########## flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java: ########## @@ -146,10 +151,42 @@ void testDeleteDataDeserialization() throws Exception { assertThat(actual).isEqualTo(expected); } + @Test + void testDeserializationWithMetadata() throws Exception { + testDeserializationWithMetadata( + "debezium-avro-insert.avro", + row -> { + // Physical columns + assertThat(row.getLong(0)).isEqualTo(1L); + assertThat(row.getString(1).toString()).isEqualTo("lisi"); + assertThat(row.getString(2).toString()).isEqualTo("test debezium avro data"); + assertThat(row.getDouble(3)).isEqualTo(21.799999237060547); + + // Metadata: ingestion-timestamp (field index 4) + assertThat(row.getTimestamp(4, 3)).isNotNull(); Review Comment: The two timestamp metadata fields are asserted only with `isNotNull()`, so a converter that extracted the wrong field (or a constant) would still pass — could these assert the exact expected values from the fixture, the way the physical columns do? Relatedly, `source.schema` isn't asserted at all (the comment notes it may be null for MySQL), and the fixture is a single connector. Given the `source` shape is the riskiest part of this change, a second connector's fixture would buy a lot of confidence — though I'd understand deferring that if test data is hard to generate. ########## flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java: ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent.debezium; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.MetadataConverter; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.ProjectableDecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.types.RowKind; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** {@link DecodingFormat} for Debezium using Avro encoding. */ +public class DebeziumAvroDecodingFormat + implements ProjectableDecodingFormat<DeserializationSchema<RowData>> { + + // ---------------------------------------------------------------------------------------- + // Mutable attributes + // ---------------------------------------------------------------------------------------- + + private List<String> metadataKeys; + + // ---------------------------------------------------------------------------------------- + // Debezium-specific attributes + // ---------------------------------------------------------------------------------------- + + private final String schemaRegistryURL; + private final String schema; + private final Map<String, ?> optionalPropertiesMap; + + public DebeziumAvroDecodingFormat( + String schemaRegistryURL, String schema, Map<String, ?> optionalPropertiesMap) { + this.schemaRegistryURL = schemaRegistryURL; + this.schema = schema; + this.optionalPropertiesMap = optionalPropertiesMap; + this.metadataKeys = Collections.emptyList(); + } + + @Override + public DeserializationSchema<RowData> createRuntimeDecoder( + DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) { + physicalDataType = Projection.of(projections).project(physicalDataType); + + final List<ReadableMetadata> readableMetadata = + metadataKeys.stream() + .map( + k -> + Stream.of(ReadableMetadata.values()) + .filter(rm -> rm.key.equals(k)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .collect(Collectors.toList()); + final List<DataTypes.Field> metadataFields = + readableMetadata.stream() + .map(m -> DataTypes.FIELD(m.key, m.dataType)) + .collect(Collectors.toList()); + + final DataType producedDataType = + DataTypeUtils.appendRowFields(physicalDataType, metadataFields); + final TypeInformation<RowData> producedTypeInfo = + context.createTypeInformation(producedDataType); + + return new DebeziumAvroDeserializationSchema( + physicalDataType, + readableMetadata, + producedTypeInfo, + schemaRegistryURL, + schema, + optionalPropertiesMap); + } + + @Override + public Map<String, DataType> listReadableMetadata() { + final Map<String, DataType> metadataMap = new LinkedHashMap<>(); + Stream.of(ReadableMetadata.values()) + .forEachOrdered(m -> metadataMap.put(m.key, m.dataType)); + return metadataMap; + } + + @Override + public void applyReadableMetadata(List<String> metadataKeys) { + this.metadataKeys = metadataKeys; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_BEFORE) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + } + + // ---------------------------------------------------------------------------------------- + // Metadata handling + // ---------------------------------------------------------------------------------------- + + /** List of metadata that can be read with this format. */ + enum ReadableMetadata { Review Comment: `debezium-json` exposes a 7th metadata, `schema` (the inline Connect schema), which isn't in this enum. I'm assuming that's intentional because the Confluent format carries the schema in the registry rather than inline, so there's nothing to expose — is that the reasoning, or just out of scope for this PR? ########## flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDeserializationSchema.java: ########## @@ -189,22 +252,41 @@ public boolean equals(Object o) { } DebeziumAvroDeserializationSchema that = (DebeziumAvroDeserializationSchema) o; return Objects.equals(avroDeserializer, that.avroDeserializer) - && Objects.equals(producedTypeInfo, that.producedTypeInfo); + && Objects.equals(producedTypeInfo, that.producedTypeInfo) + && hasMetadata == that.hasMetadata; } @Override public int hashCode() { - return Objects.hash(avroDeserializer, producedTypeInfo); + return Objects.hash(avroDeserializer, producedTypeInfo, hasMetadata); } - public static RowType createDebeziumAvroRowType(DataType databaseSchema) { - // Debezium Avro contains other information, e.g. "source", "ts_ms" - // but we don't need them - return (RowType) + public static RowType createDebeziumAvroRowType( Review Comment: This comment moved into the parameter list and now reads the opposite of what the code does — it says "but we don't need them," yet the method now does append `source`/`ts_ms` to extract metadata. Worth updating so it describes the current behavior (and moving it back above the params for readability). ########## flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java: ########## @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.confluent.debezium; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.MetadataConverter; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.ProjectableDecodingFormat; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.GenericMapData; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.flink.types.RowKind; + +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +/** {@link DecodingFormat} for Debezium using Avro encoding. */ +public class DebeziumAvroDecodingFormat + implements ProjectableDecodingFormat<DeserializationSchema<RowData>> { + + // ---------------------------------------------------------------------------------------- + // Mutable attributes + // ---------------------------------------------------------------------------------------- + + private List<String> metadataKeys; + + // ---------------------------------------------------------------------------------------- + // Debezium-specific attributes + // ---------------------------------------------------------------------------------------- + + private final String schemaRegistryURL; + private final String schema; + private final Map<String, ?> optionalPropertiesMap; + + public DebeziumAvroDecodingFormat( + String schemaRegistryURL, String schema, Map<String, ?> optionalPropertiesMap) { + this.schemaRegistryURL = schemaRegistryURL; + this.schema = schema; + this.optionalPropertiesMap = optionalPropertiesMap; + this.metadataKeys = Collections.emptyList(); + } + + @Override + public DeserializationSchema<RowData> createRuntimeDecoder( + DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) { + physicalDataType = Projection.of(projections).project(physicalDataType); + + final List<ReadableMetadata> readableMetadata = + metadataKeys.stream() + .map( + k -> + Stream.of(ReadableMetadata.values()) + .filter(rm -> rm.key.equals(k)) + .findFirst() + .orElseThrow(IllegalStateException::new)) + .collect(Collectors.toList()); + final List<DataTypes.Field> metadataFields = + readableMetadata.stream() + .map(m -> DataTypes.FIELD(m.key, m.dataType)) + .collect(Collectors.toList()); + + final DataType producedDataType = + DataTypeUtils.appendRowFields(physicalDataType, metadataFields); + final TypeInformation<RowData> producedTypeInfo = + context.createTypeInformation(producedDataType); + + return new DebeziumAvroDeserializationSchema( + physicalDataType, + readableMetadata, + producedTypeInfo, + schemaRegistryURL, + schema, + optionalPropertiesMap); + } + + @Override + public Map<String, DataType> listReadableMetadata() { + final Map<String, DataType> metadataMap = new LinkedHashMap<>(); + Stream.of(ReadableMetadata.values()) + .forEachOrdered(m -> metadataMap.put(m.key, m.dataType)); + return metadataMap; + } + + @Override + public void applyReadableMetadata(List<String> metadataKeys) { + this.metadataKeys = metadataKeys; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_BEFORE) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + } + + // ---------------------------------------------------------------------------------------- + // Metadata handling + // ---------------------------------------------------------------------------------------- + + /** List of metadata that can be read with this format. */ + enum ReadableMetadata { + INGESTION_TIMESTAMP( + "ingestion-timestamp", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(), + DataTypes.FIELD("ts_ms", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int unused) { + return row; + } + }), + + SOURCE_TIMESTAMP( + "source.timestamp", + DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(), + SOURCE_FIELD, + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int unused) { + int pos = SOURCE_PROPERTY_POSITION.get("ts_ms"); + return row.getField(pos); + } + }), + + SOURCE_DATABASE( + "source.database", + DataTypes.STRING().nullable(), + SOURCE_FIELD, + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int unused) { + int pos = SOURCE_PROPERTY_POSITION.get("db"); + return row.getField(pos); + } + }), + + SOURCE_SCHEMA( + "source.schema", + DataTypes.STRING().nullable(), + SOURCE_FIELD, + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int unused) { + int pos = SOURCE_PROPERTY_POSITION.get("schema"); + return row.getField(pos); + } + }), + + SOURCE_TABLE( + "source.table", + DataTypes.STRING().nullable(), + SOURCE_FIELD, + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int unused) { + int pos = SOURCE_PROPERTY_POSITION.get("table"); + return row.getField(pos); + } + }), + + SOURCE_PROPERTIES( + "source.properties", + // key and value of the map are nullable to make handling easier in queries + DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable()) + .nullable(), + SOURCE_FIELD, + new MetadataConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(GenericRowData row, int unused) { + Map<StringData, StringData> result = new HashMap<>(); + for (int i = 0; i < SOURCE_PROPERTY_FIELDS.length; i++) { + Object value = row.getField(i); + result.put( + StringData.fromString(SOURCE_PROPERTY_FIELDS[i].getName()), + value == null ? null : StringData.fromString(value.toString())); + } + return new GenericMapData(result); + } + }); + + final String key; + final DataType dataType; + final DataTypes.Field requiredAvroField; + final MetadataConverter converter; + + ReadableMetadata( + String key, + DataType dataType, + DataTypes.Field requiredAvroField, + MetadataConverter converter) { + this.key = key; + this.dataType = dataType; + this.requiredAvroField = requiredAvroField; + this.converter = converter; + } + } + + private static final DataTypes.Field[] SOURCE_PROPERTY_FIELDS = { Review Comment: `debezium-json` models `source` as a connector-agnostic `MAP<STRING, STRING>` (`DebeziumJsonDecodingFormat`'s `SOURCE_*` entries all use `DataTypes.FIELD("source", DataTypes.MAP(...))`), so it reads whatever fields a given Debezium connector emits. Here `source` is a fixed-order ROW of 13 named fields, and several (`scn`, `commit_scn`, `lcr_position`) are Oracle/XStream-specific while `schema` is absent from MySQL's `source`. Since `AvroToRowDataConverters.createRowConverter` reads positionally (`record.get(i)`), this reader ROW has to line up — via Avro name resolution — with the actual writer schema in the registry. Different connectors (MySQL: `server_id`/`gtid`/`file`/`pos`; Postgres: `lsn`/`xmin`; MongoDB: different again) emit different `source` shapes. How do you see this behaving when the registered schema's `source` doesn't match this list — does Avro resolution fill the missing fields as null, or does it reject? I'm wondering whether mirroring the JSON side's flexible map would sidestep the per-connector coupling entirely. Asking rather than asserting here — you may have a constraint I'm not seeing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
