weiqingy commented on code in PR #28498:
URL: https://github.com/apache/flink/pull/28498#discussion_r3447627918


##########
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent.debezium;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** {@link DecodingFormat} for Debezium using Avro encoding. */
+public class DebeziumAvroDecodingFormat
+        implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {
+
+    // 
----------------------------------------------------------------------------------------
+    // Mutable attributes
+    // 
----------------------------------------------------------------------------------------
+
+    private List<String> metadataKeys;
+
+    // 
----------------------------------------------------------------------------------------
+    // Debezium-specific attributes
+    // 
----------------------------------------------------------------------------------------
+
+    private final String schemaRegistryURL;
+    private final String schema;
+    private final Map<String, ?> optionalPropertiesMap;
+
+    public DebeziumAvroDecodingFormat(
+            String schemaRegistryURL, String schema, Map<String, ?> 
optionalPropertiesMap) {
+        this.schemaRegistryURL = schemaRegistryURL;
+        this.schema = schema;
+        this.optionalPropertiesMap = optionalPropertiesMap;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType, 
int[][] projections) {
+        physicalDataType = 
Projection.of(projections).project(physicalDataType);
+
+        final List<ReadableMetadata> readableMetadata =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                
.orElseThrow(IllegalStateException::new))
+                        .collect(Collectors.toList());
+        final List<DataTypes.Field> metadataFields =
+                readableMetadata.stream()
+                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                        .collect(Collectors.toList());
+
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(physicalDataType, 
metadataFields);
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        return new DebeziumAvroDeserializationSchema(
+                physicalDataType,
+                readableMetadata,
+                producedTypeInfo,
+                schemaRegistryURL,
+                schema,
+                optionalPropertiesMap);
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys) {
+        this.metadataKeys = metadataKeys;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    // 
----------------------------------------------------------------------------------------
+    // Metadata handling
+    // 
----------------------------------------------------------------------------------------
+
+    /** List of metadata that can be read with this format. */
+    enum ReadableMetadata {
+        INGESTION_TIMESTAMP(

Review Comment:
   Minor: this inner `convert` returning `row` looks unreachable — the wrapper 
in `DebeziumAvroDeserializationSchema` only calls `m.converter.convert(...)` 
when the resolved field is a `GenericRowData`, and the top-level `ts_ms` is a 
timestamp scalar, so this branch never fires. Could reduce the body to a 
no-op/`null` to avoid implying it runs.



##########
flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroSerDeSchemaTest.java:
##########
@@ -146,10 +151,42 @@ void testDeleteDataDeserialization() throws Exception {
         assertThat(actual).isEqualTo(expected);
     }
 
+    @Test
+    void testDeserializationWithMetadata() throws Exception {
+        testDeserializationWithMetadata(
+                "debezium-avro-insert.avro",
+                row -> {
+                    // Physical columns
+                    assertThat(row.getLong(0)).isEqualTo(1L);
+                    assertThat(row.getString(1).toString()).isEqualTo("lisi");
+                    assertThat(row.getString(2).toString()).isEqualTo("test 
debezium avro data");
+                    assertThat(row.getDouble(3)).isEqualTo(21.799999237060547);
+
+                    // Metadata: ingestion-timestamp (field index 4)
+                    assertThat(row.getTimestamp(4, 3)).isNotNull();

Review Comment:
   The two timestamp metadata fields are asserted only with `isNotNull()`, so a 
converter that extracted the wrong field (or a constant) would still pass — 
could these assert the exact expected values from the fixture, the way the 
physical columns do? Relatedly, `source.schema` isn't asserted at all (the 
comment notes it may be null for MySQL), and the fixture covers only a single connector. 
Given the `source` shape is the riskiest part of this change, a second 
connector's fixture would buy a lot of confidence — though I'd understand 
deferring that if test data is hard to generate.



##########
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent.debezium;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** {@link DecodingFormat} for Debezium using Avro encoding. */
+public class DebeziumAvroDecodingFormat
+        implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {
+
+    // 
----------------------------------------------------------------------------------------
+    // Mutable attributes
+    // 
----------------------------------------------------------------------------------------
+
+    private List<String> metadataKeys;
+
+    // 
----------------------------------------------------------------------------------------
+    // Debezium-specific attributes
+    // 
----------------------------------------------------------------------------------------
+
+    private final String schemaRegistryURL;
+    private final String schema;
+    private final Map<String, ?> optionalPropertiesMap;
+
+    public DebeziumAvroDecodingFormat(
+            String schemaRegistryURL, String schema, Map<String, ?> 
optionalPropertiesMap) {
+        this.schemaRegistryURL = schemaRegistryURL;
+        this.schema = schema;
+        this.optionalPropertiesMap = optionalPropertiesMap;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType, 
int[][] projections) {
+        physicalDataType = 
Projection.of(projections).project(physicalDataType);
+
+        final List<ReadableMetadata> readableMetadata =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                
.orElseThrow(IllegalStateException::new))
+                        .collect(Collectors.toList());
+        final List<DataTypes.Field> metadataFields =
+                readableMetadata.stream()
+                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                        .collect(Collectors.toList());
+
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(physicalDataType, 
metadataFields);
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        return new DebeziumAvroDeserializationSchema(
+                physicalDataType,
+                readableMetadata,
+                producedTypeInfo,
+                schemaRegistryURL,
+                schema,
+                optionalPropertiesMap);
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys) {
+        this.metadataKeys = metadataKeys;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    // 
----------------------------------------------------------------------------------------
+    // Metadata handling
+    // 
----------------------------------------------------------------------------------------
+
+    /** List of metadata that can be read with this format. */
+    enum ReadableMetadata {

Review Comment:
   `debezium-json` exposes a 7th metadata key, `schema` (the inline Connect 
schema), which isn't in this enum. I'm assuming that's intentional because the 
Confluent format carries the schema in the registry rather than inline, so 
there's nothing to expose — is that the reasoning, or just out of scope for 
this PR?



##########
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDeserializationSchema.java:
##########
@@ -189,22 +252,41 @@ public boolean equals(Object o) {
         }
         DebeziumAvroDeserializationSchema that = 
(DebeziumAvroDeserializationSchema) o;
         return Objects.equals(avroDeserializer, that.avroDeserializer)
-                && Objects.equals(producedTypeInfo, that.producedTypeInfo);
+                && Objects.equals(producedTypeInfo, that.producedTypeInfo)
+                && hasMetadata == that.hasMetadata;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(avroDeserializer, producedTypeInfo);
+        return Objects.hash(avroDeserializer, producedTypeInfo, hasMetadata);
     }
 
-    public static RowType createDebeziumAvroRowType(DataType databaseSchema) {
-        // Debezium Avro contains other information, e.g. "source", "ts_ms"
-        // but we don't need them
-        return (RowType)
+    public static RowType createDebeziumAvroRowType(

Review Comment:
   This comment moved into the parameter list and now reads the opposite of 
what the code does — it says "but we don't need them," yet the method now does 
append `source`/`ts_ms` to extract metadata. Worth updating so it describes the 
current behavior (and moving it back above the params for readability).



##########
flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/debezium/DebeziumAvroDecodingFormat.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent.debezium;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.formats.avro.registry.confluent.debezium.DebeziumAvroDeserializationSchema.MetadataConverter;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.Projection;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.types.RowKind;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+/** {@link DecodingFormat} for Debezium using Avro encoding. */
+public class DebeziumAvroDecodingFormat
+        implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {
+
+    // 
----------------------------------------------------------------------------------------
+    // Mutable attributes
+    // 
----------------------------------------------------------------------------------------
+
+    private List<String> metadataKeys;
+
+    // 
----------------------------------------------------------------------------------------
+    // Debezium-specific attributes
+    // 
----------------------------------------------------------------------------------------
+
+    private final String schemaRegistryURL;
+    private final String schema;
+    private final Map<String, ?> optionalPropertiesMap;
+
+    public DebeziumAvroDecodingFormat(
+            String schemaRegistryURL, String schema, Map<String, ?> 
optionalPropertiesMap) {
+        this.schemaRegistryURL = schemaRegistryURL;
+        this.schema = schema;
+        this.optionalPropertiesMap = optionalPropertiesMap;
+        this.metadataKeys = Collections.emptyList();
+    }
+
+    @Override
+    public DeserializationSchema<RowData> createRuntimeDecoder(
+            DynamicTableSource.Context context, DataType physicalDataType, 
int[][] projections) {
+        physicalDataType = 
Projection.of(projections).project(physicalDataType);
+
+        final List<ReadableMetadata> readableMetadata =
+                metadataKeys.stream()
+                        .map(
+                                k ->
+                                        Stream.of(ReadableMetadata.values())
+                                                .filter(rm -> rm.key.equals(k))
+                                                .findFirst()
+                                                
.orElseThrow(IllegalStateException::new))
+                        .collect(Collectors.toList());
+        final List<DataTypes.Field> metadataFields =
+                readableMetadata.stream()
+                        .map(m -> DataTypes.FIELD(m.key, m.dataType))
+                        .collect(Collectors.toList());
+
+        final DataType producedDataType =
+                DataTypeUtils.appendRowFields(physicalDataType, 
metadataFields);
+        final TypeInformation<RowData> producedTypeInfo =
+                context.createTypeInformation(producedDataType);
+
+        return new DebeziumAvroDeserializationSchema(
+                physicalDataType,
+                readableMetadata,
+                producedTypeInfo,
+                schemaRegistryURL,
+                schema,
+                optionalPropertiesMap);
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        final Map<String, DataType> metadataMap = new LinkedHashMap<>();
+        Stream.of(ReadableMetadata.values())
+                .forEachOrdered(m -> metadataMap.put(m.key, m.dataType));
+        return metadataMap;
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys) {
+        this.metadataKeys = metadataKeys;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    // 
----------------------------------------------------------------------------------------
+    // Metadata handling
+    // 
----------------------------------------------------------------------------------------
+
+    /** List of metadata that can be read with this format. */
+    enum ReadableMetadata {
+        INGESTION_TIMESTAMP(
+                "ingestion-timestamp",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                DataTypes.FIELD("ts_ms", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        return row;
+                    }
+                }),
+
+        SOURCE_TIMESTAMP(
+                "source.timestamp",
+                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        int pos = SOURCE_PROPERTY_POSITION.get("ts_ms");
+                        return row.getField(pos);
+                    }
+                }),
+
+        SOURCE_DATABASE(
+                "source.database",
+                DataTypes.STRING().nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        int pos = SOURCE_PROPERTY_POSITION.get("db");
+                        return row.getField(pos);
+                    }
+                }),
+
+        SOURCE_SCHEMA(
+                "source.schema",
+                DataTypes.STRING().nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        int pos = SOURCE_PROPERTY_POSITION.get("schema");
+                        return row.getField(pos);
+                    }
+                }),
+
+        SOURCE_TABLE(
+                "source.table",
+                DataTypes.STRING().nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        int pos = SOURCE_PROPERTY_POSITION.get("table");
+                        return row.getField(pos);
+                    }
+                }),
+
+        SOURCE_PROPERTIES(
+                "source.properties",
+                // key and value of the map are nullable to make handling 
easier in queries
+                DataTypes.MAP(DataTypes.STRING().nullable(), 
DataTypes.STRING().nullable())
+                        .nullable(),
+                SOURCE_FIELD,
+                new MetadataConverter() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(GenericRowData row, int unused) {
+                        Map<StringData, StringData> result = new HashMap<>();
+                        for (int i = 0; i < SOURCE_PROPERTY_FIELDS.length; 
i++) {
+                            Object value = row.getField(i);
+                            result.put(
+                                    
StringData.fromString(SOURCE_PROPERTY_FIELDS[i].getName()),
+                                    value == null ? null : 
StringData.fromString(value.toString()));
+                        }
+                        return new GenericMapData(result);
+                    }
+                });
+
+        final String key;
+        final DataType dataType;
+        final DataTypes.Field requiredAvroField;
+        final MetadataConverter converter;
+
+        ReadableMetadata(
+                String key,
+                DataType dataType,
+                DataTypes.Field requiredAvroField,
+                MetadataConverter converter) {
+            this.key = key;
+            this.dataType = dataType;
+            this.requiredAvroField = requiredAvroField;
+            this.converter = converter;
+        }
+    }
+
+    private static final DataTypes.Field[] SOURCE_PROPERTY_FIELDS = {

Review Comment:
   `debezium-json` models `source` as a connector-agnostic `MAP<STRING, 
STRING>` (`DebeziumJsonDecodingFormat`'s `SOURCE_*` entries all use 
`DataTypes.FIELD("source", DataTypes.MAP(...))`), so it reads whatever fields a 
given Debezium connector emits. Here `source` is a fixed-order ROW of 13 named 
fields, and several (`scn`, `commit_scn`, `lcr_position`) are 
Oracle/XStream-specific while `schema` is absent from MySQL's `source`.
   
   Since `AvroToRowDataConverters.createRowConverter` reads positionally 
(`record.get(i)`), this reader ROW has to line up — via Avro name resolution — 
with the actual writer schema in the registry. Different connectors (MySQL: 
`server_id`/`gtid`/`file`/`pos`; Postgres: `lsn`/`xmin`; MongoDB: different 
again) emit different `source` shapes. How do you see this behaving when the 
registered schema's `source` doesn't match this list — does Avro resolution 
fill the missing fields as null, or does it reject? I'm wondering whether 
mirroring the JSON side's flexible map would sidestep the per-connector 
coupling entirely. Asking rather than asserting here — you may have a 
constraint I'm not seeing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to