This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8a9e8add15 [Fix][Connector-V2] set default tableid in SeaTunnelRow for
Debezium JSON deserialization (#9431)
8a9e8add15 is described below
commit 8a9e8add15a40c96ece5b4f337d84aa598630b73
Author: corgy-w <[email protected]>
AuthorDate: Fri Jun 13 10:08:29 2025 +0800
[Fix][Connector-V2] set default tableid in SeaTunnelRow for Debezium JSON
deserialization (#9431)
---
...ompatibleDebeziumJsonDeserializationSchema.java | 5 +-
...ompatibleDebeziumJsonDeserializationSchema.java | 68 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 1 deletion(-)
diff --git a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonDeserializationSchema.java
index a38f19c0e3..9a121657c6 100644
--- a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonDeserializationSchema.java
@@ -19,6 +19,7 @@
package org.apache.seatunnel.format.compatible.debezium.json;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -60,7 +61,9 @@ public class CompatibleDebeziumJsonDeserializationSchema
String key = debeziumJsonConverter.serializeKey(record);
String value = debeziumJsonConverter.serializeValue(record);
Object[] fields = new Object[] {record.topic(), key, value};
- return new SeaTunnelRow(fields);
+ SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+ seaTunnelRow.setTableId(TablePath.DEFAULT.getFullName());
+ return seaTunnelRow;
}
@Override
diff --git a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestCompatibleDebeziumJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestCompatibleDebeziumJsonDeserializationSchema.java
new file mode 100644
index 0000000000..ad374fdc20
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestCompatibleDebeziumJsonDeserializationSchema.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.compatible.debezium.json;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+
+public class TestCompatibleDebeziumJsonDeserializationSchema {
+
+ @Test
+ public void testDebeziumDeserializationSchema()
+ throws InvocationTargetException, IllegalAccessException {
+ SchemaBuilder schemaBuilder =
+ SchemaBuilder.struct()
+ .name("test")
+ .field("field", SchemaBuilder.string().optional().build());
+ Struct struct = new Struct(schemaBuilder.build()).put("field", "value");
+ SourceRecord record =
+ new SourceRecord(
+ null,
+ null,
+ "test",
+ schemaBuilder.build(),
+ struct,
+ schemaBuilder.build(),
+ struct);
+
+ CompatibleDebeziumJsonDeserializationSchema compatibleDebeziumJsonDeserializationSchema =
+ new CompatibleDebeziumJsonDeserializationSchema(true, true);
+ SeaTunnelRow deserialize = compatibleDebeziumJsonDeserializationSchema.deserialize(record);
+ Assertions.assertNotNull(deserialize);
+ Assertions.assertEquals(TablePath.DEFAULT.getFullName(), deserialize.getTableId());
+ Assertions.assertIterableEquals(
+ Lists.newArrayList(
+ "test",
+ "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":true,\"field\":\"field\"}],\"optional\":false,\"name\":\"test\"},\"payload\":{\"field\":\"value\"}}",
+ "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":true,\"field\":\"field\"}],\"optional\":false,\"name\":\"test\"},\"payload\":{\"field\":\"value\"}}"),
+ Arrays.asList(deserialize.getFields()));
+ }
+}