This is an automated email from the ASF dual-hosted git repository.

baomingyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 624a009c17 [INLONG-11966][Sort]The deserialization process supports 
returning the data byte size in one rowdata (#11967)
624a009c17 is described below

commit 624a009c171dfbf2af3afb74aee747044ba60576
Author: Mingyu Bao <[email protected]>
AuthorDate: Thu Aug 14 15:57:14 2025 +0800

    [INLONG-11966][Sort]The deserialization process supports returning the data 
byte size in one rowdata (#11967)
---
 .../apache/inlong/sort/formats/base/FormatMsg.java |  33 +++++
 .../inlong/sort/formats/base/TableFormatUtils.java |  30 +++++
 .../AbstractInLongMsgDeserializationSchema.java    |  14 ++
 .../AbstractInLongMsgFormatDeserializer.java       |  45 +++++++
 .../sort/formats/inlongmsg/InLongMsgUtils.java     |  12 ++
 .../InLongMsgBinlogFormatDeserializer.java         |  14 ++
 .../inlongmsgbinlog/InLongMsgBinlogUtils.java      | 147 +++++++++++++++++++++
 .../InLongMsgCsvFormatDeserializer.java            |  51 +++++++
 .../formats/inlongmsgcsv/InLongMsgCsvUtils.java    |  68 +++++++++-
 .../inlongmsgkv/InLongMsgKvFormatDeserializer.java |  23 ++++
 .../sort/formats/inlongmsgkv/InLongMsgKvUtils.java |  55 ++++++++
 .../InLongMsgPbDeserializationSchema.java          |  43 +++++-
 .../InLongMsgTlogCsvFormatDeserializer.java        |  46 +++++++
 .../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java    |  67 +++++++++-
 .../InLongMsgTlogKvFormatDeserializer.java         |  24 ++++
 .../inlongmsgtlogkv/InLongMsgTlogKvUtils.java      |  60 +++++++++
 .../formats/base/DefaultDeserializationSchema.java |  16 +++
 .../csv/CsvRowDataDeserializationSchema.java       |  55 +++++++-
 .../json/JsonRowDataDeserializationSchema.java     |  18 +++
 .../formats/kv/KvRowDataDeserializationSchema.java |  43 ++++++
 20 files changed, 847 insertions(+), 17 deletions(-)

diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/FormatMsg.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/FormatMsg.java
new file mode 100644
index 0000000000..752bdf860c
--- /dev/null
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/FormatMsg.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.base;
+
+import lombok.Data;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Pairs a deserialized {@link RowData} with a length value computed for that row.
+ *
+ * <p>Accessors, {@code equals}, {@code hashCode} and {@code toString} are generated by
+ * Lombok's {@code @Data}; both fields are mutable.
+ */
+@Data
+public class FormatMsg {
+
+    // The deserialized row.
+    private RowData rowData;
+    // Length value associated with the row, exactly as supplied by the creator of this message.
+    private long rowDataLength;
+
+    public FormatMsg(RowData rowData, long rowDataLength) {
+        this.rowData = rowData;
+        this.rowDataLength = rowDataLength;
+    }
+}
diff --git 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index 31e3cdaf09..dd1d3e1579 100644
--- 
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++ 
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -60,6 +60,7 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.VarBinaryFormatI
 import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.VarCharFormatInfo;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.table.api.DataTypes;
@@ -582,6 +583,35 @@ public class TableFormatUtils {
         return null;
     }
 
+    /**
+     * Estimates the length contributed by a single field value.
+     *
+     * <p>Boolean, byte, short and int formats count as 4; long, float, double, decimal, date,
+     * time and timestamp formats count as 8. These constants do not depend on
+     * {@code fieldText}. Every other format falls back to the length of the field text.
+     *
+     * <p>NOTE(review): the fallback uses {@link String#length()}, which counts UTF-16 code
+     * units rather than encoded bytes; the two differ for non-ASCII text. Confirm whether
+     * callers expect a byte count.
+     *
+     * @param fieldFormatInfo the format of the field
+     * @param fieldText the raw text of the field; may be {@code null} or empty
+     * @return the fixed constant for the listed formats, otherwise the text length, or
+     *         {@code 0} when there is no text
+     */
+    public static long getFormatValueLength(FormatInfo fieldFormatInfo, String fieldText) {
+        if (fieldFormatInfo instanceof BooleanFormatInfo
+                || fieldFormatInfo instanceof ByteFormatInfo
+                || fieldFormatInfo instanceof ShortFormatInfo
+                || fieldFormatInfo instanceof IntFormatInfo) {
+            return 4;
+        }
+        if (fieldFormatInfo instanceof LongFormatInfo
+                || fieldFormatInfo instanceof FloatFormatInfo
+                || fieldFormatInfo instanceof DoubleFormatInfo
+                || fieldFormatInfo instanceof DecimalFormatInfo
+                || fieldFormatInfo instanceof DateFormatInfo
+                || fieldFormatInfo instanceof TimeFormatInfo
+                || fieldFormatInfo instanceof TimestampFormatInfo) {
+            return 8;
+        }
+        // Variable-width formats: measured from the text itself.
+        return StringUtils.isNotEmpty(fieldText) ? fieldText.length() : 0L;
+    }
+
     /**
      * Serializes the basic field.
      */
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDeserializationSchema.java
index 4a404fe25d..c5cb220456 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDeserializationSchema.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.sort.formats.inlongmsg;
 
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.metrics.FormatMetricGroup;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -78,6 +79,14 @@ public abstract class AbstractInLongMsgDeserializationSchema 
implements Deserial
         }
     }
 
+    /**
+     * Deserializes {@code message} by delegating to
+     * {@code formatDeserializer.flatFormatMsgMap}, which emits its results to {@code out}.
+     * Any exception raised by the delegate is rethrown wrapped in a {@link RuntimeException}.
+     */
+    public void deserializeFormatMsg(byte[] message, Collector<FormatMsg> out) 
{
+        try {
+            formatDeserializer.flatFormatMsgMap(message, out);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public List<InLongMsgWrap> preParse(byte[] bytes) throws Exception {
         return formatDeserializer.preParse(bytes);
     }
@@ -88,6 +97,11 @@ public abstract class AbstractInLongMsgDeserializationSchema 
implements Deserial
         formatDeserializer.parse(inLongMsgWrap, collector);
     }
 
+    /**
+     * Parses one {@link InLongMsgWrap} into {@link FormatMsg}s by delegating to
+     * {@code formatDeserializer.parseFormatMsg}; results are emitted to {@code collector}.
+     */
+    public void parseFormatMsg(InLongMsgWrap inLongMsgWrap,
+            Collector<FormatMsg> collector) throws Exception {
+        formatDeserializer.parseFormatMsg(inLongMsgWrap, collector);
+    }
+
     @Override
     public boolean isEndOfStream(RowData rowData) {
         return false;
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
index b9611e9a17..43c937a5a7 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.formats.inlongmsg;
 
 import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.metrics.FormatMetricGroup;
 
 import org.apache.commons.lang3.StringUtils;
@@ -32,6 +33,7 @@ import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -45,6 +47,10 @@ public abstract class AbstractInLongMsgFormatDeserializer 
implements ResultTypeQ
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AbstractInLongMsgFormatDeserializer.class);
 
+    // Epoch-millisecond time at which needPrint() last answered true; 0 until that happens.
+    protected long lastPrintTimestamp = 0L;
+    // Minimum gap, in milliseconds, between two positive needPrint() answers (60 seconds).
+    protected long PRINT_TIMESTAMP_INTERVAL = 60 * 1000L;
+    // Expected number of fields; defaults to 0 and is presumably assigned by subclasses.
+    protected int fieldNameSize = 0;
+
     protected FailureHandler failureHandler;
 
     /**
@@ -76,6 +82,17 @@ public abstract class AbstractInLongMsgFormatDeserializer 
implements ResultTypeQ
      */
     protected abstract List<RowData> convertRowDataList(InLongMsgHead head, 
InLongMsgBody body) throws Exception;
 
+    /**
+     * Converts one message body, together with its head, into a list of {@link FormatMsg}.
+     * A {@code null} result is tolerated by {@code parseFormatMsg}, which then emits nothing
+     * for that body.
+     */
+    protected abstract List<FormatMsg> convertFormatMsgList(InLongMsgHead 
head, InLongMsgBody body) throws Exception;
+
+    /**
+     * Throttle gate: answers {@code true} only when more than
+     * {@code PRINT_TIMESTAMP_INTERVAL} milliseconds have passed since the last positive
+     * answer, and in that case records the current time as the new reference point.
+     */
+    protected boolean needPrint() {
+        final long currentMillis = Instant.now().toEpochMilli();
+        if (currentMillis - lastPrintTimestamp <= PRINT_TIMESTAMP_INTERVAL) {
+            // Still inside the quiet window.
+            return false;
+        }
+        lastPrintTimestamp = currentMillis;
+        return true;
+    }
+
     public void flatMap(
             byte[] bytes,
             Collector<RowData> collector) throws Exception {
@@ -84,6 +101,14 @@ public abstract class AbstractInLongMsgFormatDeserializer 
implements ResultTypeQ
         }
     }
 
+    /**
+     * Splits {@code bytes} into {@link InLongMsgWrap}s with {@code preParse} and passes each
+     * one to {@code parseFormatMsg}, which emits the resulting messages to {@code collector}.
+     */
+    public void flatFormatMsgMap(
+            byte[] bytes,
+            Collector<FormatMsg> collector) throws Exception {
+        for (InLongMsgWrap inLongMsgWrap : preParse(bytes)) {
+            parseFormatMsg(inLongMsgWrap, collector);
+        }
+    }
+
     public List<InLongMsgWrap> preParse(byte[] bytes) throws Exception {
         final List<InLongMsgWrap> result = new ArrayList<>();
 
@@ -160,6 +185,26 @@ public abstract class AbstractInLongMsgFormatDeserializer 
implements ResultTypeQ
         }
     }
 
+    /**
+     * Converts every body of the given wrap into {@link FormatMsg}s and emits them.
+     *
+     * <p>A body whose conversion throws is reported through the error metrics and the
+     * failure handler and then skipped; the remaining bodies are still processed.
+     */
+    public void parseFormatMsg(InLongMsgWrap inLongMsgWrap, Collector<FormatMsg> collector) throws Exception {
+        InLongMsgHead head = inLongMsgWrap.getInLongMsgHead();
+
+        for (InLongMsgBody body : inLongMsgWrap.getInLongMsgBodyList()) {
+            List<FormatMsg> converted;
+            try {
+                converted = convertFormatMsgList(head, body);
+            } catch (Exception e) {
+                reportDeSerializeErrorMetrics();
+                failureHandler.onConvertingRowFailure(head, body, e);
+                continue;
+            }
+            if (converted == null) {
+                // Nothing produced for this body.
+                continue;
+            }
+            converted.forEach(collector::collect);
+        }
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index f30d916341..2b69a34c5e 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.formats.inlongmsg;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -400,6 +401,17 @@ public class InLongMsgUtils {
         return producedRow;
     }
 
+    /**
+     * Replaces the row inside {@code formatMsg} with the result of
+     * {@code decorateRowDataWithNeededHeadFields} and returns the same (mutated) instance.
+     * The current row is cast to {@link GenericRowData}, so any other row type fails with a
+     * {@link ClassCastException}.
+     */
+    public static FormatMsg decorateFormatMsgWithNeededHeadFields(
+            @Nullable String timeFieldName,
+            @Nullable String attributesFieldName,
+            Timestamp time,
+            Map<String, String> attributes,
+            FormatMsg formatMsg) {
+        
formatMsg.setRowData(decorateRowDataWithNeededHeadFields(timeFieldName, 
attributesFieldName, time, attributes,
+                (GenericRowData) formatMsg.getRowData()));
+        return formatMsg;
+    }
+
     public static GenericRowData decorateRowDataWithNeededHeadFields(
             @Nullable String timeFieldName,
             @Nullable String attributesFieldName,
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
index fca7ac1cdc..4d491ae018 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.formats.inlongmsgbinlog;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
@@ -154,6 +155,19 @@ public final class InLongMsgBinlogFormatDeserializer 
extends AbstractInLongMsgFo
                 failureHandler);
     }
 
+    /**
+     * Converts one binlog body into {@link FormatMsg}s by delegating to
+     * {@code InLongMsgBinlogUtils.getFormatMsgData} with this deserializer's configuration,
+     * the head attributes and the raw body bytes.
+     */
+    @Override
+    protected List<FormatMsg> convertFormatMsgList(InLongMsgHead head, 
InLongMsgBody body) throws Exception {
+        return InLongMsgBinlogUtils.getFormatMsgData(
+                rowFormatInfo,
+                timeFieldName,
+                attributesFieldName,
+                metadataFieldName,
+                head.getAttributes(),
+                body.getData(),
+                includeUpdateBefore,
+                failureHandler);
+    }
+
     @Override
     public TypeInformation<RowData> getProducedType() {
         return InLongMsgBinlogUtils.getRowType(
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 53c00d0daa..78612399ef 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.formats.inlongmsgbinlog;
 
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.binlog.InLongBinlog;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
@@ -43,6 +44,7 @@ import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.getFormatValueLength;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_ID;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID;
@@ -281,4 +283,149 @@ public class InLongMsgBinlogUtils {
 
         return row;
     }
+
+    /**
+     * Parses {@code bytes} as an {@code InLongBinlog.RowData} and turns it into
+     * {@link FormatMsg}s according to its event type.
+     *
+     * <ul>
+     * <li>INSERT: one message built from the after-columns.</li>
+     * <li>UPDATE: an optional message from the before-columns (only when
+     * {@code includeUpdateBefore} is set), followed by one from the after-columns.</li>
+     * <li>DELETE: one message built from the before-columns.</li>
+     * </ul>
+     *
+     * @return the produced messages, or {@code null} for any other event type
+     * @throws Exception if the bytes cannot be parsed or a message cannot be constructed
+     */
+    public static List<FormatMsg> getFormatMsgData(
+            RowFormatInfo rowFormatInfo,
+            String timeFieldName,
+            String attributesFieldName,
+            String metadataFieldName,
+            Map<String, String> attributes,
+            byte[] bytes,
+            boolean includeUpdateBefore, FailureHandler failureHandler) throws 
Exception {
+
+        InLongBinlog.RowData rowData = InLongBinlog.RowData.parseFrom(bytes);
+
+        List<FormatMsg> rows = new ArrayList<>();
+
+        switch (rowData.getEventType()) {
+            case INSERT:
+                rows.add(
+                        constructFormatMsgData(
+                                rowFormatInfo,
+                                timeFieldName,
+                                attributesFieldName,
+                                metadataFieldName,
+                                attributes,
+                                rowData,
+                                DBSYNC_OPERATION_INERT,
+                                rowData.getAfterColumnsList(),
+                                failureHandler));
+                break;
+            case UPDATE:
+                // The before-image, when requested, is emitted ahead of the after-image.
+                if (includeUpdateBefore) {
+                    rows.add(
+                            constructFormatMsgData(
+                                    rowFormatInfo,
+                                    timeFieldName,
+                                    attributesFieldName,
+                                    metadataFieldName,
+                                    attributes,
+                                    rowData,
+                                    DBSYNC_OPERATION_UPDATE_BEFORE,
+                                    rowData.getBeforeColumnsList(),
+                                    failureHandler));
+                }
+                rows.add(
+                        constructFormatMsgData(
+                                rowFormatInfo,
+                                timeFieldName,
+                                attributesFieldName,
+                                metadataFieldName,
+                                attributes,
+                                rowData,
+                                DBSYNC_OPERATION_UPDATE,
+                                rowData.getAfterColumnsList(),
+                                failureHandler));
+                break;
+            case DELETE:
+                rows.add(
+                        constructFormatMsgData(
+                                rowFormatInfo,
+                                timeFieldName,
+                                attributesFieldName,
+                                metadataFieldName,
+                                attributes,
+                                rowData,
+                                DBSYNC_OPERATION_DELETE,
+                                rowData.getBeforeColumnsList(),
+                                failureHandler));
+                break;
+            default:
+                // Other event types produce no rows; callers must handle the null.
+                return null;
+        }
+
+        return rows;
+    }
+
+    /**
+     * Builds one {@link FormatMsg} from a binlog row and the given column list.
+     *
+     * <p>The row starts with the optional head fields (execute time, attributes, metadata map,
+     * in that order, each present only when its field name is non-null), followed by one slot
+     * per configured data field. Data fields are looked up by column name; a missing or null
+     * column yields a null slot. The reported length sums {@code getFormatValueLength} over the
+     * non-null data fields only; head fields do not contribute.
+     */
+    private static FormatMsg constructFormatMsgData(
+            RowFormatInfo rowFormatInfo,
+            String timeFieldName,
+            String attributesFieldName,
+            String metadataFieldName,
+            Map<String, String> attributes,
+            InLongBinlog.RowData rowData,
+            String operation,
+            List<InLongBinlog.Column> columns, FailureHandler failureHandler) throws Exception {
+        List<Object> leading = new ArrayList<>();
+        if (timeFieldName != null) {
+            leading.add(new Timestamp(rowData.getExecuteTime()));
+        }
+        if (attributesFieldName != null) {
+            leading.add(attributes);
+        }
+        if (metadataFieldName != null) {
+            Map<String, String> meta = new HashMap<>();
+            meta.put(METADATA_INSTANCE_NAME, rowData.getInstanceName());
+            meta.put(METADATA_SCHEMA_NAME, rowData.getSchemaName());
+            meta.put(METADATA_TABLE_NAME, rowData.getTableName());
+            meta.put(METADATA_OPERATION_TYPE, operation);
+            meta.put(METADATA_EXECUTE_TIME, Long.toString(rowData.getExecuteTime()));
+            meta.put(METADATA_EXECUTE_ORDER, Long.toString(rowData.getExecuteOrder()));
+            meta.put(METADATA_TRANSFER_IP, rowData.getTransferIp());
+            leading.add(meta);
+        }
+
+        String[] names = rowFormatInfo.getFieldNames();
+        FormatInfo[] formats = rowFormatInfo.getFieldFormatInfos();
+
+        // Data fields are placed right after the head fields.
+        int offset = leading.size();
+        GenericRowData row = new GenericRowData(names.length + offset);
+
+        int slot = 0;
+        for (Object headValue : leading) {
+            row.setField(slot++, headValue);
+        }
+
+        // Index the column texts by name; SQL NULL columns are stored as null.
+        Map<String, String> textByColumn = new HashMap<>(columns.size());
+        for (InLongBinlog.Column column : columns) {
+            textByColumn.put(column.getName(), column.getIsNull() ? null : column.getValue());
+        }
+
+        long totalLength = 0L;
+        for (int idx = 0; idx < names.length; ++idx) {
+            String text = textByColumn.get(names[idx]);
+            if (text == null) {
+                row.setField(offset + idx, null);
+                continue;
+            }
+            row.setField(
+                    offset + idx,
+                    deserializeBasicField(names[idx], formats[idx], text, null, failureHandler));
+            totalLength += getFormatValueLength(formats[idx], text);
+        }
+
+        return new FormatMsg(row, totalLength);
+    }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
index aac79533ef..5263e1334d 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgcsv;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
 import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.base.TextFormatBuilder;
 import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
@@ -31,6 +32,8 @@ import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -54,6 +57,8 @@ import static 
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.DEFA
  */
 public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgFormatDeserializer {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(InLongMsgCsvFormatDeserializer.class);
+
     private static final long serialVersionUID = 1L;
 
     /**
@@ -221,6 +226,9 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
         this.isDeleteEscapeChar = isDeleteEscapeChar;
         this.retainPredefinedField = retainPredefinedField;
 
+        String[] fieldNames = rowFormatInfo.getFieldNames();
+        this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
+
         converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
                 .map(formatInfo -> FieldToRowDataConverters.createConverter(
                         TableFormatUtils.deriveLogicalType(formatInfo)))
@@ -256,6 +264,16 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
 
     @Override
     protected List<RowData> convertRowDataList(InLongMsgHead head, 
InLongMsgBody body) throws Exception {
+        List<String> predefinedFields = head.getPredefinedFields();
+        List<String> fields = body.getFields();
+        int actualNumFields = (predefinedFields == null ? 0 : 
predefinedFields.size())
+                + (fields == null ? 0 : fields.size());
+        if (needPrint() && actualNumFields != fieldNameSize) {
+            LOG.warn("The number of fields mismatches: expected={}, actual={}. 
" +
+                    "PredefinedFields=[{}], Fields=[{}]", fieldNameSize, 
actualNumFields,
+                    predefinedFields, fields);
+        }
+
         GenericRowData genericRowData = InLongMsgCsvUtils.deserializeRowData(
                 rowFormatInfo,
                 nullLiteral,
@@ -275,6 +293,39 @@ public final class InLongMsgCsvFormatDeserializer extends 
AbstractInLongMsgForma
         return 
Collections.singletonList(InLongMsgUtils.decorateRowWithMetaData(genericRowData,
 head, metadataKeys));
     }
 
+    /**
+     * Converts one CSV body into a single {@link FormatMsg}.
+     *
+     * <p>Logs a throttled warning when the number of predefined plus body fields differs from
+     * {@code fieldNameSize}, deserializes the fields, then decorates the row with the time and
+     * attributes head fields and with metadata. The returned message carries the decorated row
+     * and the length computed during deserialization.
+     *
+     * @return a singleton list holding the decorated message
+     * @throws Exception if deserialization or decoration fails
+     */
+    @Override
+    protected List<FormatMsg> convertFormatMsgList(InLongMsgHead head, InLongMsgBody body) throws Exception {
+        List<String> predefinedFields = head.getPredefinedFields();
+        List<String> fields = body.getFields();
+        int actualNumFields = (predefinedFields == null ? 0 : predefinedFields.size())
+                + (fields == null ? 0 : fields.size());
+
+        // Test the mismatch before asking needPrint(): needPrint() restarts the throttle window
+        // whenever it answers true, so consulting it for well-formed records would use up the
+        // window and suppress the warning for later malformed ones.
+        if (actualNumFields != fieldNameSize && needPrint()) {
+            LOG.warn("The number of fields mismatches: expected={}, actual={}. " +
+                    "PredefinedFields=[{}], Fields=[{}]", fieldNameSize, actualNumFields,
+                    predefinedFields, fields);
+        }
+
+        FormatMsg formatMsg = InLongMsgCsvUtils.deserializeFormatMsgData(
+                rowFormatInfo,
+                nullLiteral,
+                retainPredefinedField ? head.getPredefinedFields() : Collections.emptyList(),
+                body.getFields(),
+                converters, failureHandler);
+
+        // Decorate result with time and attributes fields if needed
+        GenericRowData genericRowData = InLongMsgUtils.decorateRowDataWithNeededHeadFields(
+                timeFieldName,
+                attributesFieldName,
+                head.getTime(),
+                head.getAttributes(),
+                (GenericRowData) formatMsg.getRowData());
+
+        // Decorate result with metadata if needed
+        genericRowData = InLongMsgUtils.decorateRowWithMetaData(genericRowData, head, metadataKeys);
+        return Collections.singletonList(new FormatMsg(genericRowData, formatMsg.getRowDataLength()));
+    }
+
     /**
      * The builder for {@link InLongMsgCsvFormatDeserializer}.
      */
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index 99a44d33c0..212086a89b 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -21,6 +21,7 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
 import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
@@ -38,6 +39,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.getFormatValueLength;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_ID;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID;
@@ -129,7 +131,7 @@ public class InLongMsgCsvUtils {
                 }).collect(Collectors.toList());
     }
 
-    public static GenericRowData deserializeRowData(
+    public static FormatMsg deserializeFormatMsgData(
             RowFormatInfo rowFormatInfo,
             String nullLiteral,
             List<String> predefinedFields,
@@ -139,13 +141,67 @@ public class InLongMsgCsvUtils {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
-        int actualNumFields = predefinedFields.size() + fields.size();
-        if (actualNumFields != fieldNames.length) {
-            LOG.warn("The number of fields mismatches: expected={}, actual={}. 
" +
-                    "PredefinedFields=[{}], Fields=[{}]", fieldNames.length, 
actualNumFields,
-                    predefinedFields, fields);
+        GenericRowData rowData = new GenericRowData(fieldNames.length);
+        long rowDataLength = 0L;
+        // Deserialize pre-defined fields
+        for (int i = 0; i < predefinedFields.size(); ++i) {
+            if (i >= fieldNames.length) {
+                break;
+            }
+
+            String fieldName = fieldNames[i];
+            FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+            FieldToRowDataConverter converter = converters[i];
+            String fieldText = predefinedFields.get(i);
+
+            Object field = converter.convert(deserializeBasicField(
+                    fieldName,
+                    fieldFormatInfo,
+                    fieldText,
+                    nullLiteral, failureHandler));
+            rowData.setField(i, field);
+            rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
+        }
+
+        // Deserialize fields
+        for (int i = 0; i < fields.size(); ++i) {
+
+            if (i + predefinedFields.size() >= fieldNames.length) {
+                break;
+            }
+
+            String fieldName = fieldNames[i + predefinedFields.size()];
+            FormatInfo fieldFormatInfo = fieldFormatInfos[i + 
predefinedFields.size()];
+            FieldToRowDataConverter converter = converters[i + 
predefinedFields.size()];
+            String fieldText = fields.get(i);
+
+            Object field = converter.convert(deserializeBasicField(
+                    fieldName,
+                    fieldFormatInfo,
+                    fieldText,
+                    nullLiteral, failureHandler));
+            rowData.setField(i + predefinedFields.size(), field);
+            rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
+        }
+
+        // If schema length is larger than fields' length, use `null` to fill 
in the blanks
+        for (int i = predefinedFields.size() + fields.size(); i < 
fieldNames.length; ++i) {
+            rowData.setField(i, null);
         }
 
+        return new FormatMsg(rowData, rowDataLength);
+    }
+
+    public static GenericRowData deserializeRowData(
+            RowFormatInfo rowFormatInfo,
+            String nullLiteral,
+            List<String> predefinedFields,
+            List<String> fields,
+            FieldToRowDataConverters.FieldToRowDataConverter[] converters,
+            FailureHandler failureHandler) throws Exception {
+        String[] fieldNames = rowFormatInfo.getFieldNames();
+        FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
         GenericRowData rowData = new GenericRowData(fieldNames.length);
 
         // Deserialize pre-defined fields
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
index a9caf3bac5..7cb030471c 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgkv;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
 import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.base.TextFormatBuilder;
 import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
@@ -211,6 +212,9 @@ public final class InLongMsgKvFormatDeserializer extends 
AbstractInLongMsgFormat
         this.nullLiteral = nullLiteral;
         this.isDeleteEscapeChar = isDeleteEscapeChar;
 
+        String[] fieldNames = rowFormatInfo.getFieldNames();
+        this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
+
         converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
                 .map(formatInfo -> FieldToRowDataConverters.createConverter(
                         TableFormatUtils.deriveLogicalType(formatInfo)))
@@ -264,6 +268,25 @@ public final class InLongMsgKvFormatDeserializer extends 
AbstractInLongMsgFormat
                 genericRowData));
     }
 
+    /**
+     * Converts one InLongMsg head/body pair into a single {@link FormatMsg}.
+     *
+     * <p>The body entries are deserialized through
+     * {@code InLongMsgKvUtils.deserializeFormatMsgData}; the predefined fields of the head are
+     * handed over only when {@code retainPredefinedField} is set, otherwise an empty list is
+     * used. The row of the result is then replaced by the output of
+     * {@code InLongMsgUtils.decorateRowDataWithNeededHeadFields}.
+     *
+     * @param head the message head supplying predefined fields, time and attributes
+     * @param body the message body supplying the key-value entries
+     * @return a singleton list holding the deserialized message
+     * @throws Exception propagated from the deserialization or decoration helpers
+     */
+    @Override
+    protected List<FormatMsg> convertFormatMsgList(InLongMsgHead head, InLongMsgBody body) throws Exception {
+        FormatMsg formatMsg = InLongMsgKvUtils.deserializeFormatMsgData(
+                rowFormatInfo,
+                nullLiteral,
+                retainPredefinedField ? head.getPredefinedFields() : Collections.emptyList(),
+                body.getEntries(),
+                converters,
+                failureHandler);
+
+        // Decorate result with time and attributes fields if needed
+        formatMsg.setRowData(InLongMsgUtils.decorateRowDataWithNeededHeadFields(
+                timeFieldName,
+                attributesFieldName,
+                head.getTime(),
+                head.getAttributes(),
+                (GenericRowData) formatMsg.getRowData()));
+        return Collections.singletonList(formatMsg);
+    }
+
     /**
      * The builder for {@link InLongMsgKvFormatDeserializer}.
      */
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
index 703a064384..772e5619d7 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgkv;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
@@ -34,6 +35,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.getFormatValueLength;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
@@ -172,4 +174,57 @@ public class InLongMsgKvUtils {
 
         return row;
     }
+
+    /**
+     * Builds a row from the predefined fields and the key-value entries of a message and sums
+     * up the length reported by {@code getFormatValueLength} for every field text.
+     *
+     * <p>The leading columns of the schema are filled positionally from
+     * {@code predefinedFields}; every remaining column is looked up in {@code entries} by its
+     * field name.
+     *
+     * @param rowFormatInfo the schema (field names and field formats) of the row
+     * @param nullLiteral the null literal handed to {@code deserializeBasicField}
+     * @param predefinedFields the texts of the predefined fields
+     * @param entries the texts of the remaining fields, keyed by field name
+     * @param converters the per-column converters applied to each deserialized value
+     * @param failureHandler the failure handler handed to {@code deserializeBasicField}
+     * @return the row together with the accumulated length
+     * @throws Exception propagated from field deserialization or conversion
+     */
+    public static FormatMsg deserializeFormatMsgData(
+            RowFormatInfo rowFormatInfo,
+            String nullLiteral,
+            List<String> predefinedFields,
+            Map<String, String> entries,
+            FieldToRowDataConverter[] converters,
+            FailureHandler failureHandler) throws Exception {
+        final String[] names = rowFormatInfo.getFieldNames();
+        final FormatInfo[] formats = rowFormatInfo.getFieldFormatInfos();
+        final int columnCount = names.length;
+
+        final GenericRowData result = new GenericRowData(columnCount);
+        final int predefinedCount = predefinedFields.size();
+        long totalLength = 0L;
+
+        for (int pos = 0; pos < columnCount; pos++) {
+            String name = names[pos];
+            FormatInfo format = formats[pos];
+            FieldToRowDataConverter converter = converters[pos];
+            // Columns covered by the predefined fields take their text by position,
+            // all later columns are resolved by name from the entries.
+            String text = pos < predefinedCount ? predefinedFields.get(pos) : entries.get(name);
+
+            Object value = converter.convert(
+                    deserializeBasicField(name, format, text, nullLiteral, failureHandler));
+            result.setField(pos, value);
+            totalLength += getFormatValueLength(format, text);
+        }
+
+        return new FormatMsg(result, totalLength);
+    }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
index ce446bf13f..fbbfb1e7c3 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
@@ -20,6 +20,8 @@ package org.apache.inlong.sort.formats.inlongmsgpb;
 import org.apache.inlong.common.msg.InLongMsg;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
 import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
+import org.apache.inlong.sort.formats.base.DefaultDeserializationSchema;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 
 import com.google.common.base.Objects;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -82,7 +84,39 @@ public class InLongMsgPbDeserializationSchema implements 
DeserializationSchema<R
         List<MessageObj> msgList = msgObjs.getMsgsList();
         for (MessageObj msg : msgList) {
             RowData row = 
deserializationSchema.deserialize(msg.getBody().toByteArray());
-            this.emitRow(msg, (GenericRowData) row, out);
+            RowData newRow = emitRow(msg, (GenericRowData) row);
+            if (newRow == null) {
+                out.collect(row);
+                continue;
+            }
+            out.collect(newRow);
+        }
+    }
+
+    /**
+     * Deserializes a compressed batch of messages and emits one {@link FormatMsg} per message.
+     *
+     * <p>When the wrapped schema is a {@link DefaultDeserializationSchema}, the emitted
+     * {@link FormatMsg} is the one produced by that schema's {@code deserializeFormatMsg};
+     * otherwise the row comes from {@code deserialize} and the size of the raw message body is
+     * used as length. In both cases the row is replaced by the row returned from
+     * {@code emitRow} whenever that method returns a non-null result.
+     *
+     * @param message the compressed bytes of a serialized {@code MessageObjs}
+     * @param out the collector receiving the deserialized messages
+     * @throws Exception propagated from decompression, parsing or deserialization
+     */
+    public void deserializeFormatMsg(byte[] message, Collector<FormatMsg> out) throws Exception {
+        byte[] decompressed = decompressor.decompress(message);
+        MessageObjs msgObjs = MessageObjs.parseFrom(decompressed);
+        List<MessageObj> msgList = msgObjs.getMsgsList();
+        for (MessageObj msg : msgList) {
+            byte[] body = msg.getBody().toByteArray();
+            // deserializeFormatMsg is declared on DefaultDeserializationSchema, so that is the
+            // type to test for; checking against the DeserializationSchema interface would
+            // accept every schema and let the cast below fail for other implementations.
+            if (deserializationSchema instanceof DefaultDeserializationSchema) {
+                FormatMsg formatMsg = ((DefaultDeserializationSchema<?>) deserializationSchema)
+                        .deserializeFormatMsg(body);
+                RowData newRow = emitRow(msg, (GenericRowData) formatMsg.getRowData());
+                if (newRow != null) {
+                    formatMsg.setRowData(newRow);
+                }
+                out.collect(formatMsg);
+            } else {
+                // Reuse the body extracted above instead of copying it out of the message again.
+                RowData row = deserializationSchema.deserialize(body);
+                RowData newRow = emitRow(msg, (GenericRowData) row);
+                long bodyLength = (body == null ? 0 : body.length);
+                out.collect(new FormatMsg(newRow == null ? row : newRow, bodyLength));
+            }
         }
     }
 
@@ -130,10 +164,9 @@ public class InLongMsgPbDeserializationSchema implements 
DeserializationSchema<R
     }
 
     /** add metadata column */
-    private void emitRow(MessageObj message, GenericRowData physicalRow, 
Collector<RowData> out) {
+    private RowData emitRow(MessageObj message, GenericRowData physicalRow) {
         if (metadataConverters.length == 0) {
-            out.collect(physicalRow);
-            return;
+            return null;
         }
         final int physicalArity = physicalRow.getArity();
         final int metadataArity = metadataConverters.length;
@@ -146,6 +179,6 @@ public class InLongMsgPbDeserializationSchema implements 
DeserializationSchema<R
             producedRow.setField(
                     physicalArity + metadataPos, 
metadataConverters[metadataPos].read(message));
         }
-        out.collect(producedRow);
+        return producedRow;
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
index 415c1efd64..ab095ddaa8 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
 import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.base.TextFormatBuilder;
 import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
@@ -31,6 +32,8 @@ import 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -52,6 +55,8 @@ import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TI
  */
 public final class InLongMsgTlogCsvFormatDeserializer extends 
AbstractInLongMsgFormatDeserializer {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(InLongMsgTlogCsvFormatDeserializer.class);
+
     private static final long serialVersionUID = 1L;
 
     /**
@@ -226,6 +231,10 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
         this.isIncludeFirstSegment = isIncludeFirstSegment;
         this.isDeleteHeadDelimiter = isDeleteHeadDelimiter;
         this.isDeleteEscapeChar = isDeleteEscapeChar;
+
+        String[] fieldNames = rowFormatInfo.getFieldNames();
+        this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
+
         converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
                 .map(formatInfo -> FieldToRowDataConverters.createConverter(
                         TableFormatUtils.deriveLogicalType(formatInfo)))
@@ -254,6 +263,14 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
 
     @Override
     protected List<RowData> convertRowDataList(InLongMsgHead head, 
InLongMsgBody body) throws Exception {
+        List<String> predefinedFields = head.getPredefinedFields();
+        List<String> fields = body.getFields();
+        int actualNumFields = (predefinedFields == null ? 0 : 
predefinedFields.size())
+                + (fields == null ? 0 : fields.size());
+        if (needPrint() && actualNumFields != fieldNameSize) {
+            LOG.warn("The number of fields mismatches: " + fieldNameSize +
+                    " expected, but was " + actualNumFields + ".");
+        }
         GenericRowData dataRow =
                 InLongMsgTlogCsvUtils.deserializeRowData(
                         rowFormatInfo,
@@ -272,6 +289,35 @@ public final class InLongMsgTlogCsvFormatDeserializer 
extends AbstractInLongMsgF
         return 
Collections.singletonList(InLongMsgUtils.decorateRowWithMetaData(genericRowData,
 head, metadataKeys));
     }
 
+    /**
+     * Converts one InLongMsg head/body pair into a single {@link FormatMsg}.
+     *
+     * <p>A warning is logged when the number of predefined fields plus body fields differs from
+     * {@code fieldNameSize} and {@code needPrint()} allows it. The fields are deserialized
+     * through {@code InLongMsgTlogCsvUtils.deserializeFormatMsgData}; the row of the result is
+     * then passed through {@code InLongMsgUtils.decorateRowDataWithNeededHeadFields} and
+     * {@code InLongMsgUtils.decorateRowWithMetaData}.
+     *
+     * @param head the message head supplying predefined fields, time and attributes
+     * @param body the message body supplying the field texts
+     * @return a singleton list holding the deserialized message
+     * @throws Exception propagated from the deserialization or decoration helpers
+     */
+    @Override
+    protected List<FormatMsg> convertFormatMsgList(InLongMsgHead head, InLongMsgBody body) throws Exception {
+        List<String> predefinedFields = head.getPredefinedFields();
+        List<String> fields = body.getFields();
+        int actualNumFields = (predefinedFields == null ? 0 : predefinedFields.size())
+                + (fields == null ? 0 : fields.size());
+
+        // Test the cheap comparison first so that needPrint() is consulted only for actual
+        // mismatches. NOTE(review): assuming needPrint() is a time-based throttle, this also
+        // keeps well-formed records from using up the logging slot - confirm.
+        if (actualNumFields != fieldNameSize && needPrint()) {
+            LOG.warn("The number of fields mismatches: {} expected, but was {}.",
+                    fieldNameSize, actualNumFields);
+        }
+
+        FormatMsg formatMsg =
+                InLongMsgTlogCsvUtils.deserializeFormatMsgData(
+                        rowFormatInfo,
+                        nullLiteral,
+                        predefinedFields,
+                        fields,
+                        converters, failureHandler);
+
+        GenericRowData genericRowData = InLongMsgUtils.decorateRowDataWithNeededHeadFields(
+                timeFieldName,
+                attributesFieldName,
+                head.getTime(),
+                head.getAttributes(),
+                (GenericRowData) formatMsg.getRowData());
+        formatMsg.setRowData(InLongMsgUtils.decorateRowWithMetaData(genericRowData, head, metadataKeys));
+        return Collections.singletonList(formatMsg);
+    }
+
     /**
      * The builder for {@link InLongMsgTlogCsvFormatDeserializer}.
      */
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index 1e72a668b6..db4787f83b 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
@@ -36,6 +37,7 @@ import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.getFormatValueLength;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
@@ -132,13 +134,66 @@ public class InLongMsgTlogCsvUtils {
         String[] fieldNames = rowFormatInfo.getFieldNames();
         FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
 
-        int actualNumFields = predefinedFields.size() + fields.size();
-        if (actualNumFields != fieldNames.length) {
-            LOG.warn("The number of fields mismatches: " + fieldNames.length +
-                    " expected, but was " + actualNumFields + ".");
+        GenericRowData rowData = new GenericRowData(fieldNames.length);
+
+        for (int i = 0; i < predefinedFields.size(); ++i) {
+
+            if (i >= fieldNames.length) {
+                break;
+            }
+
+            String fieldName = fieldNames[i];
+            FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+
+            String fieldText = predefinedFields.get(i);
+
+            Object field =
+                    converters[i].convert(deserializeBasicField(
+                            fieldName,
+                            fieldFormatInfo,
+                            fieldText,
+                            nullLiteral, failureHandler));
+            rowData.setField(i, field);
+        }
+
+        for (int i = 0; i < fields.size(); ++i) {
+
+            if (i + predefinedFields.size() >= fieldNames.length) {
+                break;
+            }
+
+            String fieldName = fieldNames[i + predefinedFields.size()];
+            FormatInfo fieldFormatInfo = fieldFormatInfos[i + 
predefinedFields.size()];
+
+            String fieldText = fields.get(i);
+
+            Object field =
+                    converters[i + 
predefinedFields.size()].convert(deserializeBasicField(
+                            fieldName,
+                            fieldFormatInfo,
+                            fieldText,
+                            nullLiteral, failureHandler));
+            rowData.setField(i + predefinedFields.size(), field);
+        }
+
+        for (int i = predefinedFields.size() + fields.size(); i < 
fieldNames.length; ++i) {
+            rowData.setField(i, null);
         }
 
+        return rowData;
+    }
+
+    public static FormatMsg deserializeFormatMsgData(RowFormatInfo 
rowFormatInfo,
+            String nullLiteral,
+            List<String> predefinedFields,
+            List<String> fields,
+            FieldToRowDataConverters.FieldToRowDataConverter[] converters,
+            FailureHandler failureHandler) throws Exception {
+        String[] fieldNames = rowFormatInfo.getFieldNames();
+        FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
         GenericRowData rowData = new GenericRowData(fieldNames.length);
+        long rowDataLength = 0L;
 
         for (int i = 0; i < predefinedFields.size(); ++i) {
 
@@ -158,6 +213,7 @@ public class InLongMsgTlogCsvUtils {
                             fieldText,
                             nullLiteral, failureHandler));
             rowData.setField(i, field);
+            rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
         }
 
         for (int i = 0; i < fields.size(); ++i) {
@@ -178,12 +234,13 @@ public class InLongMsgTlogCsvUtils {
                             fieldText,
                             nullLiteral, failureHandler));
             rowData.setField(i + predefinedFields.size(), field);
+            rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
         }
 
         for (int i = predefinedFields.size() + fields.size(); i < 
fieldNames.length; ++i) {
             rowData.setField(i, null);
         }
 
-        return rowData;
+        return new FormatMsg(rowData, rowDataLength);
     }
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
index 2d54f9db13..dff67d924f 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgtlogkv;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
 import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.base.TextFormatBuilder;
 import 
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
@@ -189,6 +190,9 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
         this.nullLiteral = nullLiteral;
         this.isDeleteEscapeChar = isDeleteEscapeChar;
 
+        String[] fieldNames = rowFormatInfo.getFieldNames();
+        this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
+
         this.converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
                 .map(formatInfo -> FieldToRowDataConverters.createConverter(
                         TableFormatUtils.deriveLogicalType(formatInfo)))
@@ -237,6 +241,26 @@ public final class InLongMsgTlogKvFormatDeserializer 
extends AbstractInLongMsgFo
         return Collections.singletonList(rowData);
     }
 
+    /**
+     * Converts one InLongMsg head/body pair into a single {@link FormatMsg}.
+     *
+     * <p>The predefined fields of the head and the entries of the body are deserialized through
+     * {@code InLongMsgTlogKvUtils.deserializeFormatMsgData}; the row of the result is then
+     * replaced by the output of {@code InLongMsgUtils.decorateRowWithNeededHeadFields}.
+     *
+     * @param head the message head supplying predefined fields, time and attributes
+     * @param body the message body supplying the key-value entries
+     * @return a singleton list holding the deserialized message
+     * @throws Exception propagated from the deserialization or decoration helpers
+     */
+    @Override
+    protected List<FormatMsg> convertFormatMsgList(InLongMsgHead head, InLongMsgBody body) throws Exception {
+        final FormatMsg result = InLongMsgTlogKvUtils.deserializeFormatMsgData(
+                rowFormatInfo, nullLiteral, head.getPredefinedFields(), body.getEntries(),
+                converters, failureHandler);
+
+        // Swap the plain row for the one returned by the head-field decoration.
+        result.setRowData(InLongMsgUtils.decorateRowWithNeededHeadFields(
+                timeFieldName, attributesFieldName, head.getTime(), head.getAttributes(),
+                (GenericRowData) result.getRowData()));
+
+        return Collections.singletonList(result);
+    }
+
     /**
      * The builder for {@link InLongMsgTlogKvFormatDeserializer}.
      */
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index f42caf3053..57893cbde3 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgtlogkv;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
 import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
@@ -34,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.getFormatValueLength;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
 import static 
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
@@ -159,4 +161,62 @@ public class InLongMsgTlogKvUtils {
         return rowData;
     }
 
+    /**
+     * Builds the row for a message and sums up the length reported by
+     * {@code getFormatValueLength} for every field text.
+     *
+     * <p>Columns are filled positionally from {@code predefinedFields} for as long as that list
+     * lasts; all following columns are looked up in {@code entries} by their field name.
+     *
+     * @param rowFormatInfo The format information of the row.
+     * @param nullLiteral The literal for null values.
+     * @param predefinedFields The predefined fields.
+     * @param entries The entries.
+     * @param converters The per-column converters applied to each deserialized value.
+     * @param failureHandler The failure handler handed to {@code deserializeBasicField}.
+     * @return The row and its accumulated length, wrapped in a FormatMsg.
+     * @throws Exception propagated from field deserialization or conversion.
+     */
+    public static FormatMsg deserializeFormatMsgData(
+            RowFormatInfo rowFormatInfo,
+            String nullLiteral,
+            List<String> predefinedFields,
+            Map<String, String> entries,
+            FieldToRowDataConverter[] converters,
+            FailureHandler failureHandler) throws Exception {
+        final String[] columnNames = rowFormatInfo.getFieldNames();
+        final FormatInfo[] columnFormats = rowFormatInfo.getFieldFormatInfos();
+
+        final GenericRowData row = new GenericRowData(columnNames.length);
+        final int predefinedCount = predefinedFields.size();
+        long accumulatedLength = 0L;
+
+        for (int col = 0; col < columnNames.length; col++) {
+            final String columnName = columnNames[col];
+            final FormatInfo columnFormat = columnFormats[col];
+
+            final String rawText;
+            if (col < predefinedCount) {
+                // Still within the predefined part of the schema: take the text by position.
+                rawText = predefinedFields.get(col);
+            } else {
+                // Past the predefined fields: resolve the text by column name.
+                rawText = entries.get(columnName);
+            }
+
+            final FieldToRowDataConverter converter = converters[col];
+            final Object converted = converter.convert(
+                    deserializeBasicField(columnName, columnFormat, rawText, nullLiteral, failureHandler));
+            row.setField(col, converted);
+            accumulatedLength += getFormatValueLength(columnFormat, rawText);
+        }
+
+        return new FormatMsg(row, accumulatedLength);
+    }
+
 }
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
index c2d55af15d..c0525efa99 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.Objects;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -42,6 +43,10 @@ public abstract class DefaultDeserializationSchema<T> 
implements Deserialization
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultDeserializationSchema.class);
 
+    /** Epoch-millisecond time at which {@link #needPrint()} last returned {@code true}; starts at 0. */
+    protected long lastPrintTimestamp = 0L;
+    /** Minimum gap, in milliseconds, that {@link #needPrint()} requires between two positive answers. */
+    protected static final long PRINT_TIMESTAMP_INTERVAL = 60 * 1000L;
+    /** Number of configured field names; stays 0 until a subclass assigns it. */
+    protected int fieldNameSize = 0;
+
     protected FailureHandler failureHandler;
 
     /**
@@ -107,8 +112,19 @@ public abstract class DefaultDeserializationSchema<T> 
implements Deserialization
         }
     }
 
+    /**
+     * Throttle check for periodic log output.
+     *
+     * <p>Answers {@code true} when more than {@code PRINT_TIMESTAMP_INTERVAL} milliseconds
+     * have passed since the previous positive answer, and in that case records the current
+     * time as the new reference point. A negative answer leaves the state untouched.
+     *
+     * @return {@code true} if the interval has elapsed, {@code false} otherwise
+     */
+    protected boolean needPrint() {
+        final long currentMillis = Instant.now().toEpochMilli();
+        final boolean intervalElapsed = currentMillis - lastPrintTimestamp > PRINT_TIMESTAMP_INTERVAL;
+        if (intervalElapsed) {
+            lastPrintTimestamp = currentMillis;
+        }
+        return intervalElapsed;
+    }
+
     protected abstract T deserializeInternal(byte[] bytes) throws Exception;
 
+    /**
+     * Deserializes one raw message into a {@link FormatMsg}.
+     *
+     * <p>How {@code null} input and parsing failures are reported is left to each
+     * implementation.
+     *
+     * @param bytes the raw message bytes
+     * @return the deserialized message wrapped in a {@link FormatMsg}
+     * @throws Exception if the implementation fails to deserialize the message
+     */
+    public abstract FormatMsg deserializeFormatMsg(byte[] bytes) throws 
Exception;
+
     @Override
     public boolean equals(Object object) {
         if (this == object) {
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
index 665b1b2fc5..c705c4344f 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
@@ -22,6 +22,7 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.DefaultDeserializationSchema;
 import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
 import 
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 
@@ -48,6 +49,7 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_E
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_IGNORE_ERRORS;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER;
+import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.getFormatValueLength;
 import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
 
 /**
@@ -127,6 +129,9 @@ public final class CsvRowDataDeserializationSchema extends 
DefaultDeserializatio
         this.quoteChar = quoteChar;
         this.nullLiteral = nullLiteral;
 
+        String[] fieldNames = rowFormatInfo.getFieldNames();
+        this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
+
         converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
                 .map(formatInfo -> FieldToRowDataConverters.createConverter(
                         TableFormatUtils.deriveLogicalType(formatInfo)))
@@ -151,6 +156,9 @@ public final class CsvRowDataDeserializationSchema extends 
DefaultDeserializatio
         this.quoteChar = quoteChar;
         this.nullLiteral = nullLiteral;
 
+        String[] fieldNames = rowFormatInfo.getFieldNames();
+        this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
+
         converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
                 .map(formatInfo -> FieldToRowDataConverters.createConverter(
                         TableFormatUtils.deriveLogicalType(formatInfo)))
@@ -246,10 +254,12 @@ public final class CsvRowDataDeserializationSchema 
extends DefaultDeserializatio
             FormatInfo[] fieldFormatInfos = 
rowFormatInfo.getFieldFormatInfos();
 
             String[] fieldTexts = splitCsv(text, delimiter, escapeChar, 
quoteChar);
-            if (fieldTexts.length != fieldNames.length) {
+
+            if (needPrint() && fieldTexts.length != fieldNameSize) {
                 LOG.warn("The number of fields mismatches: expected=[{}], 
actual=[{}]. Text=[{}].",
                         fieldNames.length, fieldTexts.length, text);
             }
+
             GenericRowData rowData = new GenericRowData(fieldNames.length);
 
             for (int i = 0; i < fieldNames.length; ++i) {
@@ -274,6 +284,49 @@ public final class CsvRowDataDeserializationSchema extends 
DefaultDeserializatio
         return null;
     }
 
+    /**
+     * Deserializes one CSV record into a {@link FormatMsg} holding the converted row and the
+     * sum of {@code getFormatValueLength} over the fields that are present in the record.
+     *
+     * <p>Configured fields beyond the end of the record are set to {@code null} and add nothing
+     * to the length. A field-count mismatch is only logged, and at most once per throttle window.
+     * Any failure while parsing is handed to {@code failureHandler}; if the handler returns
+     * normally, this method returns {@code null}.
+     *
+     * @param message the raw record bytes, decoded with the configured charset; may be {@code null}
+     * @return the row with its length, or {@code null} for a {@code null} message or a handled failure
+     * @throws Exception whatever {@code failureHandler} raises while handling a parsing failure
+     */
+    @Override
+    public FormatMsg deserializeFormatMsg(byte[] message) throws Exception {
+        if (message == null) {
+            return null;
+        }
+        String text = new String(message, Charset.forName(charset));
+        long rowDataLength = 0L;
+        try {
+            String[] fieldNames = rowFormatInfo.getFieldNames();
+            FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
+            String[] fieldTexts = splitCsv(text, delimiter, escapeChar, quoteChar);
+
+            // Test for the mismatch first: needPrint() consumes the throttle window as a side
+            // effect, so it must only run when there is actually something to log.
+            if (fieldTexts.length != fieldNameSize && needPrint()) {
+                LOG.warn("The number of fields mismatches: expected=[{}], actual=[{}]. Text=[{}].",
+                        fieldNames.length, fieldTexts.length, text);
+            }
+
+            GenericRowData rowData = new GenericRowData(fieldNames.length);
+
+            for (int i = 0; i < fieldNames.length; ++i) {
+                if (i >= fieldTexts.length) {
+                    // The record is shorter than the schema: pad with null, no length added.
+                    rowData.setField(i, null);
+                } else {
+                    Object field =
+                            TableFormatUtils.deserializeBasicField(
+                                    fieldNames[i],
+                                    fieldFormatInfos[i],
+                                    fieldTexts[i],
+                                    nullLiteral, failureHandler);
+
+                    rowData.setField(i, converters[i].convert(field));
+                    rowDataLength += getFormatValueLength(fieldFormatInfos[i], fieldTexts[i]);
+                }
+            }
+            return new FormatMsg(rowData, rowDataLength);
+        } catch (Throwable t) {
+            // String.format expects %s; an SLF4J-style {} placeholder is never substituted.
+            failureHandler.onParsingMsgFailure(text, new RuntimeException(
+                    String.format("Could not properly deserialize csv. Text=[%s].", text), t));
+        }
+        return null;
+    }
+
     @Override
     public boolean isEndOfStream(RowData nextElement) {
         return false;
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
index 386f75f998..9037470c20 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.formats.json;
 
 import org.apache.inlong.sort.formats.base.DefaultDeserializationSchema;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.base.TextFormatOptions.TimestampFormat;
 import 
org.apache.inlong.sort.formats.json.FieldToRowDataConverters.FieldToRowDataConverter;
 
@@ -119,6 +120,23 @@ public class JsonRowDataDeserializationSchema extends 
DefaultDeserializationSche
         return rowData;
     }
 
+    /**
+     * Deserializes one JSON message into a {@link FormatMsg} whose length is the size of the
+     * raw byte array.
+     *
+     * <p>A conversion failure is handed to {@code failureHandler}; if the handler returns
+     * normally, the resulting {@link FormatMsg} carries a {@code null} row.
+     *
+     * @param message the raw message bytes; may be {@code null}
+     * @return the converted row with {@code message.length}, or {@code null} for a {@code null} message
+     * @throws Exception whatever {@code failureHandler} raises while handling a conversion failure
+     */
+    @Override
+    public FormatMsg deserializeFormatMsg(byte[] message) throws Exception {
+        if (message == null) {
+            return null;
+        }
+
+        final String payload = new String(message, charset);
+        RowData converted;
+        try {
+            converted = (RowData) runtimeConverter.convert(payload);
+        } catch (Throwable cause) {
+            // The row stays null when the handler decides not to propagate the failure.
+            converted = null;
+            final String detail =
+                    String.format("Could not properly deserialize json. Text=[%s].", payload);
+            failureHandler.onParsingMsgFailure(payload, new RuntimeException(detail, cause));
+        }
+        return new FormatMsg(converted, message.length);
+    }
+
     @Override
     public boolean isEndOfStream(RowData nextElement) {
         return false;
diff --git 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
index 771cc5bd17..200719d145 100644
--- 
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
+++ 
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
@@ -21,6 +21,7 @@ import 
org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
 import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
 import org.apache.inlong.sort.formats.base.DefaultDeserializationSchema;
 import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.base.FormatMsg;
 import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
 import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
 
@@ -46,6 +47,7 @@ import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_K
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL;
 import static 
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER;
 import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static 
org.apache.inlong.sort.formats.base.TableFormatUtils.getFormatValueLength;
 import static org.apache.inlong.sort.formats.util.StringUtils.splitKv;
 
 /**
@@ -139,6 +141,8 @@ public class KvRowDataDeserializationSchema extends 
DefaultDeserializationSchema
         this.escapeChar = escapeChar;
         this.quoteChar = quoteChar;
         this.nullLiteral = nullLiteral;
+        String[] fieldNames = rowFormatInfo.getFieldNames();
+        this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
 
         converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
                 .map(formatInfo -> FieldToRowDataConverters.createConverter(
@@ -166,6 +170,9 @@ public class KvRowDataDeserializationSchema extends 
DefaultDeserializationSchema
         this.quoteChar = quoteChar;
         this.nullLiteral = nullLiteral;
 
+        String[] fieldNames = rowFormatInfo.getFieldNames();
+        this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
+
         converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
                 .map(formatInfo -> FieldToRowDataConverters.createConverter(
                         
TableFormatForRowDataUtils.deriveLogicalType(formatInfo)))
@@ -206,6 +213,42 @@ public class KvRowDataDeserializationSchema extends 
DefaultDeserializationSchema
         return null;
     }
 
+    /**
+     * Deserializes one key-value record into a {@link FormatMsg} holding the converted row and
+     * the sum of {@code getFormatValueLength} over all configured fields.
+     *
+     * <p>Only the first map returned by {@code splitKv} is consulted; each configured field is
+     * looked up in it by name. Any failure while parsing is handed to {@code failureHandler};
+     * if the handler returns normally, this method returns {@code null}.
+     *
+     * @param bytes the raw record bytes, decoded with the configured charset; may be {@code null}
+     * @return the row with its length, or {@code null} for {@code null} input or a handled failure
+     * @throws Exception whatever {@code failureHandler} raises while handling a parsing failure
+     */
+    @Override
+    public FormatMsg deserializeFormatMsg(byte[] bytes) throws Exception {
+        // A null message carries no row; return null instead of failing in the decode below.
+        if (bytes == null) {
+            return null;
+        }
+        String text = new String(bytes, Charset.forName(charset));
+        long rowDataLength = 0L;
+        try {
+            List<Map<String, String>> fieldTexts =
+                    splitKv(text, entryDelimiter, kvDelimiter, escapeChar, quoteChar, null,
+                            true);
+
+            String[] fieldNames = rowFormatInfo.getFieldNames();
+            FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
+            GenericRowData rowData = new GenericRowData(fieldFormatInfos.length);
+            for (int i = 0; i < fieldFormatInfos.length; i++) {
+                String fieldName = fieldNames[i];
+                FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+
+                String fieldText = fieldTexts.get(0).get(fieldName);
+
+                Object field = deserializeBasicField(
+                        fieldName,
+                        fieldFormatInfo,
+                        fieldText,
+                        nullLiteral, failureHandler);
+                rowData.setField(i, converters[i].convert(field));
+                rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
+            }
+            return new FormatMsg(rowData, rowDataLength);
+        } catch (Throwable t) {
+            // String.format expects %s; an SLF4J-style {} placeholder is never substituted.
+            failureHandler.onParsingMsgFailure(text, new RuntimeException(
+                    String.format("Could not properly deserialize kv. Text=[%s].", text), t));
+        }
+        return null;
+    }
+
     @Override
     public boolean isEndOfStream(RowData rowData) {
         return false;

Reply via email to