weiqingy commented on code in PR #28498:
URL: https://github.com/apache/flink/pull/28498#discussion_r3448964529


##########
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent.debezium;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** {@link DecodingFormat} for Debezium using Avro encoding. */
+public class DebeziumAvroDecodingFormat
+        implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {
+
+    // ----------------------------------------------------------------------------------------
+    // Mutable attributes
+    // ----------------------------------------------------------------------------------------
+
+    private List<String> metadataKeys;
+
+    // ----------------------------------------------------------------------------------------
+    // Debezium-specific attributes
+    // ----------------------------------------------------------------------------------------
+
+    private final String schemaRegistryURL;
+    private final String schema;
+    private final Map<String, ?> optionalPropertiesMap;
+
+    public DebeziumAvroDecodingFormat(
+            String schemaRegistryURL, String schema, Map<String, ?> optionalPropertiesMap) {
+        this.schemaRegistryURL = schemaRegistryURL;
+        this.schema = schema;
+        this.optionalPropertiesMap = optionalPropertiesMap;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) {
+        physicalDataType = Projection.of(projections).project(physicalDataType);
+
+        final List<ReadableMetadata> readableMetadata =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                .orElseThrow(IllegalStateException::new))
+                        .collect(Collectors.toList());
+        final List<DataTypes.Field> metadataFields =
+                readableMetadata.stream()
+                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                        .collect(Collectors.toList());
+
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(physicalDataType, metadataFields);
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        return new DebeziumAvroDeserializationSchema(
+                physicalDataType,
+                readableMetadata,
+                producedTypeInfo,
+                schemaRegistryURL,
+                schema,
+                optionalPropertiesMap);
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys) {
+        this.metadataKeys = metadataKeys;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    // ----------------------------------------------------------------------------------------
+    // Metadata handling
+    // ----------------------------------------------------------------------------------------
+
+    /** List of metadata that can be read with this format. */
+    enum ReadableMetadata {
+        INGESTION_TIMESTAMP(
+                "ingestion-timestamp",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                DataTypes.FIELD("ts_ms", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        return row;
+                    }
+                }),
+
+        SOURCE_TIMESTAMP(
+                "source.timestamp",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        int pos = SOURCE_PROPERTY_POSITION.get("ts_ms");
+                        return row.getField(pos);
+                    }
+                }),
+
+        SOURCE_DATABASE(
+                "source.database",
+                DataTypes.STRING().nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        int pos = SOURCE_PROPERTY_POSITION.get("db");
+                        return row.getField(pos);
+                    }
+                }),
+
+        SOURCE_SCHEMA(
+                "source.schema",
+                DataTypes.STRING().nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        int pos = SOURCE_PROPERTY_POSITION.get("schema");
+                        return row.getField(pos);
+                    }
+                }),
+
+        SOURCE_TABLE(
+                "source.table",
+                DataTypes.STRING().nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        int pos = SOURCE_PROPERTY_POSITION.get("table");
+                        return row.getField(pos);
+                    }
+                }),
+
+        SOURCE_PROPERTIES(
+                "source.properties",
+                // key and value of the map are nullable to make handling easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.STRING().nullable())
+                        .nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        Map<StringData, StringData> result = new HashMap<>();
+                        for (int i = 0; i < SOURCE_PROPERTY_FIELDS.length; i++) {
+                            Object value = row.getField(i);
+                            result.put(
+                                    StringData.fromString(SOURCE_PROPERTY_FIELDS[i].getName()),
+                                    value == null ? null : StringData.fromString(value.toString()));
+                        }
+                        return new GenericMapData(result);
+                    }
+                });
+
+        final String key;
+        final DataType dataType;
+        final DataTypes.Field requiredAvroField;
+        final MetadataConverter converter;
+
+        ReadableMetadata(
+                String key,
+                DataType dataType,
+                DataTypes.Field requiredAvroField,
+                MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.requiredAvroField = requiredAvroField;
+            this.converter = converter;
+        }
+    }
+
+    private static final DataTypes.Field[] SOURCE_PROPERTY_FIELDS = {

Review Comment:
   Thanks for digging into this — the record-vs-map constraint is real and 
correctly diagnosed. The converter switch only wires `record → ROW` 
(`AvroToRowDataConverters.java:148-149`) and `map → MAP` (`:150-152`), so 
declaring `source` as `MAP` would have `AvroSchemaConverter` build a valid Avro 
*map* schema that then can't resolve against the registry's *record* — per 
Avro's resolution rules a writer record and a reader map aren't compatible, so 
it fails at decode. Good catch.
   
   To close the open question from my first comment (does a mismatched `source` 
yield null or get rejected?): it can do either, depending on nullability. Fields 
are resolved by name: `RegistryAvroDeserializationSchema` sets both writer and 
reader schema on the datum reader 
(`RegistryAvroDeserializationSchema.java:102-103`), so the record comes back in 
reader order and the positional reads stay aligned. The connector-specific 
fields are nullable → `withDefault(null)` (`AvroSchemaConverter.java:554-557`), 
so `scn`/`commit_scn`/`lcr_position` come back null when a connector omits 
them. But `version`/`connector`/`name`/`db`/`table` are non-nullable → 
`noDefault()`, and a reader field with no default that the writer lacks makes 
Avro throw and fail the whole message. So the fixed ROW has two limitations: it 
can't surface fields it doesn't list (MySQL `gtid`, Postgres `lsn`), and, 
latently, the non-nullable `version`/`connector`/`name`/`db`/`table` would 
hard-fail any connector whose `source` doesn't carry those exact names. A 
connector-agnostic map sidesteps both.
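
   As a toy illustration of that nullable vs. non-nullable split: the sketch below is plain Java and deliberately not the Avro API. `ResolutionSketch`, `ReaderField`, and `resolve` are hypothetical names made up here, and a `Map` stands in for the writer record. It only models the rule as I understand it (match by name, fall back to a null default, reject when there is no default).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of by-name schema resolution; none of this is the Avro API. */
final class ResolutionSketch {

    /** Hypothetical reader-field descriptor: a name plus whether a null default exists. */
    static final class ReaderField {
        final String name;
        final boolean hasNullDefault; // true ~ withDefault(null), false ~ noDefault()

        ReaderField(String name, boolean hasNullDefault) {
            this.name = name;
            this.hasNullDefault = hasNullDefault;
        }
    }

    /** Matches reader fields against the writer record by name; result is in reader order. */
    static Map<String, Object> resolve(
            Map<String, Object> writerRecord, List<ReaderField> readerFields) {
        Map<String, Object> resolved = new LinkedHashMap<>();
        for (ReaderField field : readerFields) {
            if (writerRecord.containsKey(field.name)) {
                resolved.put(field.name, writerRecord.get(field.name));
            } else if (field.hasNullDefault) {
                // Nullable reader field the writer lacks: filled from the null default.
                resolved.put(field.name, null);
            } else {
                // Default-less reader field the writer lacks: the whole record is rejected.
                throw new IllegalStateException("no default for missing field: " + field.name);
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        List<ReaderField> reader =
                Arrays.asList(new ReaderField("db", false), new ReaderField("scn", true));

        Map<String, Object> writer = new LinkedHashMap<>();
        writer.put("gtid", "abc"); // not listed by the reader, so it is dropped
        writer.put("db", "inventory");

        System.out.println(resolve(writer, reader));
    }
}
```

   Passing a writer record without `db` to `resolve` throws instead, which is the latent hard-fail case described above.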
   
   I'd lean to option (2). Both options stringify the source fields into a 
`MAP<STRING,STRING>` — that lossiness is inherent to record→map either way — so 
the deciding factor is blast radius: (1) puts an opinionated record→map 
coercion into shared `AvroToRowDataConverters`, where any Avro user declaring a 
`MAP` target over a record field would hit it implicitly, to serve a 
Debezium-specific need; (2) keeps it local and explicit. And the registered 
`source` schema isn't available at planning time anyway (the `schema` option is 
the optional user override, normally null; the real one resolves from the 
registry by ID at runtime), so a runtime handler in 
`DebeziumAvroDeserializationSchema` is its natural home.
   
   One note if you take (2): to actually gain agnosticism it should build the 
map from the connector's *writer* `source` record — before it's projected onto 
the 13-field reader ROW — otherwise the connector-only fields are already 
dropped and it's just the same fixed list re-shaped as a map. The typed 
accessors (`source.database`, …) could then key into that map by name, and the 
positional `SOURCE_PROPERTY_POSITION` assumption goes away.
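
   A minimal sketch of what I mean, assuming the writer-side `source` record can be walked field by field before projection. Here a plain `Map<String, Object>` stands in for that record, and `SourceMapSketch`, `toSourceProperties`, and `sourceDatabase` are hypothetical names for illustration, not existing Flink or Avro API:

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: a Map stands in for the connector's writer-side source record. */
final class SourceMapSketch {

    /** Stringifies every writer field, whatever the connector, keeping writer order. */
    static Map<String, String> toSourceProperties(Map<String, Object> writerSource) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : writerSource.entrySet()) {
            Object value = entry.getValue();
            result.put(entry.getKey(), value == null ? null : value.toString());
        }
        return result;
    }

    /** Typed accessor keyed by name, not position; null when the connector omits the field. */
    static String sourceDatabase(Map<String, String> sourceProperties) {
        return sourceProperties.get("db");
    }

    public static void main(String[] args) {
        Map<String, Object> mysqlSource = new LinkedHashMap<>();
        mysqlSource.put("connector", "mysql");
        mysqlSource.put("db", "inventory");
        mysqlSource.put("table", "orders");
        mysqlSource.put("gtid", null); // connector-only field survives as a key
        mysqlSource.put("ts_ms", 1700000000000L);

        Map<String, String> props = toSourceProperties(mysqlSource);
        System.out.println(props);
        System.out.println(sourceDatabase(props));
    }
}
```

   The point of the sketch is only the ordering: the map is built from whatever the writer carries, and the accessors look fields up by name afterwards.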
   
   On scope, my read is that the current typed set is enough to land this PR 
for the initial parity — assuming the supported connectors are documented — 
with the connector-agnostic map as a follow-up; that keeps the change focused 
and unblocks the relational connectors now. The final scope and the merge call 
are, of course, the committers' to make.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
