This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8a9e8add15 [Fix][Connector-V2] set default tableid in SeaTunnelRow for Debezium JSON deserialization (#9431)
8a9e8add15 is described below

commit 8a9e8add15a40c96ece5b4f337d84aa598630b73
Author: corgy-w <[email protected]>
AuthorDate: Fri Jun 13 10:08:29 2025 +0800

    [Fix][Connector-V2] set default tableid in SeaTunnelRow for Debezium JSON deserialization (#9431)
---
 ...ompatibleDebeziumJsonDeserializationSchema.java |  5 +-
 ...ompatibleDebeziumJsonDeserializationSchema.java | 68 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 1 deletion(-)

diff --git a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonDeserializationSchema.java
index a38f19c0e3..9a121657c6 100644
--- a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/main/java/org/apache/seatunnel/format/compatible/debezium/json/CompatibleDebeziumJsonDeserializationSchema.java
@@ -19,6 +19,7 @@
 package org.apache.seatunnel.format.compatible.debezium.json;
 
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -60,7 +61,9 @@ public class CompatibleDebeziumJsonDeserializationSchema
         String key = debeziumJsonConverter.serializeKey(record);
         String value = debeziumJsonConverter.serializeValue(record);
         Object[] fields = new Object[] {record.topic(), key, value};
-        return new SeaTunnelRow(fields);
+        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+        seaTunnelRow.setTableId(TablePath.DEFAULT.getFullName());
+        return seaTunnelRow;
     }
 
     @Override
diff --git a/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestCompatibleDebeziumJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestCompatibleDebeziumJsonDeserializationSchema.java
new file mode 100644
index 0000000000..ad374fdc20
--- /dev/null
+++ b/seatunnel-formats/seatunnel-format-compatible-debezium-json/src/test/java/org/apache/seatunnel/format/compatible/debezium/json/TestCompatibleDebeziumJsonDeserializationSchema.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.compatible.debezium.json;
+
+import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+
+public class TestCompatibleDebeziumJsonDeserializationSchema {
+
+    @Test
+    public void testDebeziumDeserializationSchema()
+            throws InvocationTargetException, IllegalAccessException {
+        SchemaBuilder schemaBuilder =
+                SchemaBuilder.struct()
+                        .name("test")
+                        .field("field", SchemaBuilder.string().optional().build());
+        Struct struct = new Struct(schemaBuilder.build()).put("field", "value");
+        SourceRecord record =
+                new SourceRecord(
+                        null,
+                        null,
+                        "test",
+                        schemaBuilder.build(),
+                        struct,
+                        schemaBuilder.build(),
+                        struct);
+
+        CompatibleDebeziumJsonDeserializationSchema compatibleDebeziumJsonDeserializationSchema =
+                new CompatibleDebeziumJsonDeserializationSchema(true, true);
+        SeaTunnelRow deserialize = compatibleDebeziumJsonDeserializationSchema.deserialize(record);
+        Assertions.assertNotNull(deserialize);
+        Assertions.assertEquals(TablePath.DEFAULT.getFullName(), deserialize.getTableId());
+        Assertions.assertIterableEquals(
+                Lists.newArrayList(
+                        "test",
+                        "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":true,\"field\":\"field\"}],\"optional\":false,\"name\":\"test\"},\"payload\":{\"field\":\"value\"}}",
+                        "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":true,\"field\":\"field\"}],\"optional\":false,\"name\":\"test\"},\"payload\":{\"field\":\"value\"}}"),
+                Arrays.asList(deserialize.getFields()));
+    }
+}

Reply via email to