This is an automated email from the ASF dual-hosted git repository.
baomingyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 624a009c17 [INLONG-11966][Sort]The deserialization process supports
returning the data byte size in one rowdata (#11967)
624a009c17 is described below
commit 624a009c171dfbf2af3afb74aee747044ba60576
Author: Mingyu Bao <[email protected]>
AuthorDate: Thu Aug 14 15:57:14 2025 +0800
[INLONG-11966][Sort]The deserialization process supports returning the data
byte size in one rowdata (#11967)
---
.../apache/inlong/sort/formats/base/FormatMsg.java | 33 +++++
.../inlong/sort/formats/base/TableFormatUtils.java | 30 +++++
.../AbstractInLongMsgDeserializationSchema.java | 14 ++
.../AbstractInLongMsgFormatDeserializer.java | 45 +++++++
.../sort/formats/inlongmsg/InLongMsgUtils.java | 12 ++
.../InLongMsgBinlogFormatDeserializer.java | 14 ++
.../inlongmsgbinlog/InLongMsgBinlogUtils.java | 147 +++++++++++++++++++++
.../InLongMsgCsvFormatDeserializer.java | 51 +++++++
.../formats/inlongmsgcsv/InLongMsgCsvUtils.java | 68 +++++++++-
.../inlongmsgkv/InLongMsgKvFormatDeserializer.java | 23 ++++
.../sort/formats/inlongmsgkv/InLongMsgKvUtils.java | 55 ++++++++
.../InLongMsgPbDeserializationSchema.java | 43 +++++-
.../InLongMsgTlogCsvFormatDeserializer.java | 46 +++++++
.../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java | 67 +++++++++-
.../InLongMsgTlogKvFormatDeserializer.java | 24 ++++
.../inlongmsgtlogkv/InLongMsgTlogKvUtils.java | 60 +++++++++
.../formats/base/DefaultDeserializationSchema.java | 16 +++
.../csv/CsvRowDataDeserializationSchema.java | 55 +++++++-
.../json/JsonRowDataDeserializationSchema.java | 18 +++
.../formats/kv/KvRowDataDeserializationSchema.java | 43 ++++++
20 files changed, 847 insertions(+), 17 deletions(-)
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/FormatMsg.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/FormatMsg.java
new file mode 100644
index 0000000000..752bdf860c
--- /dev/null
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/FormatMsg.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.base;
+
+import lombok.Data;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Carrier for one deserialized row together with a length value recorded for it.
+ * Both properties are mutable; the accessors are generated by Lombok's {@code @Data}.
+ */
+@Data
+public class FormatMsg {
+
+ // The deserialized row.
+ private RowData rowData;
+ // Length recorded for the row, exactly as supplied by whoever builds this object.
+ private long rowDataLength;
+
+ /**
+ * @param rowData the deserialized row
+ * @param rowDataLength the length value to record for that row
+ */
+ public FormatMsg(RowData rowData, long rowDataLength) {
+ this.rowData = rowData;
+ this.rowDataLength = rowDataLength;
+ }
+}
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
index 31e3cdaf09..dd1d3e1579 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java
@@ -60,6 +60,7 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.VarBinaryFormatI
import
org.apache.inlong.common.pojo.sort.dataflow.field.format.VarCharFormatInfo;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
@@ -582,6 +583,35 @@ public class TableFormatUtils {
return null;
}
+    /**
+     * Estimates how much a single field contributes to the length reported for a row.
+     *
+     * <p>Fixed-width types are charged a constant: 4 for boolean, byte, short and int;
+     * 8 for long, float, double, decimal, date, time and timestamp. Every other type is
+     * charged the number of characters in its raw text, or 0 when that text is null or empty.
+     *
+     * @param fieldFormatInfo the declared format of the field
+     * @param fieldText the raw text of the field; only consulted for non-fixed-width types
+     * @return the estimated length of the field
+     */
+    public static long getFormatValueLength(FormatInfo fieldFormatInfo, String fieldText) {
+        if (fieldFormatInfo instanceof BooleanFormatInfo
+                || fieldFormatInfo instanceof ByteFormatInfo
+                || fieldFormatInfo instanceof ShortFormatInfo
+                || fieldFormatInfo instanceof IntFormatInfo) {
+            return 4;
+        }
+        if (fieldFormatInfo instanceof LongFormatInfo
+                || fieldFormatInfo instanceof FloatFormatInfo
+                || fieldFormatInfo instanceof DoubleFormatInfo
+                || fieldFormatInfo instanceof DecimalFormatInfo
+                || fieldFormatInfo instanceof DateFormatInfo
+                || fieldFormatInfo instanceof TimeFormatInfo
+                || fieldFormatInfo instanceof TimestampFormatInfo) {
+            return 8;
+        }
+        // NOTE(review): String#length() counts UTF-16 code units, not encoded bytes -- confirm
+        // that a character count is what the consumers of this value expect.
+        return StringUtils.isNotEmpty(fieldText) ? fieldText.length() : 0L;
+    }
+
/**
* Serializes the basic field.
*/
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDeserializationSchema.java
index 4a404fe25d..c5cb220456 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDeserializationSchema.java
@@ -17,6 +17,7 @@
package org.apache.inlong.sort.formats.inlongmsg;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.metrics.FormatMetricGroup;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -78,6 +79,14 @@ public abstract class AbstractInLongMsgDeserializationSchema
implements Deserial
}
}
+ /**
+ * Deserializes a raw message into {@link FormatMsg} records by delegating to
+ * {@code formatDeserializer.flatFormatMsgMap}.
+ *
+ * @param message the raw bytes handed to the format deserializer
+ * @param out the collector passed through to the format deserializer
+ * @throws RuntimeException wrapping any exception raised by the delegate
+ */
+ public void deserializeFormatMsg(byte[] message, Collector<FormatMsg> out)
{
+ try {
+ formatDeserializer.flatFormatMsgMap(message, out);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public List<InLongMsgWrap> preParse(byte[] bytes) throws Exception {
return formatDeserializer.preParse(bytes);
}
@@ -88,6 +97,11 @@ public abstract class AbstractInLongMsgDeserializationSchema
implements Deserial
formatDeserializer.parse(inLongMsgWrap, collector);
}
+ /**
+ * Forwards an already pre-parsed message to {@code formatDeserializer.parseFormatMsg}.
+ *
+ * @param inLongMsgWrap the pre-parsed message to convert
+ * @param collector the collector passed through to the format deserializer
+ * @throws Exception whatever the format deserializer throws
+ */
+ public void parseFormatMsg(InLongMsgWrap inLongMsgWrap,
+ Collector<FormatMsg> collector) throws Exception {
+ formatDeserializer.parseFormatMsg(inLongMsgWrap, collector);
+ }
+
@Override
public boolean isEndOfStream(RowData rowData) {
return false;
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
index b9611e9a17..43c937a5a7 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.formats.inlongmsg;
import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.metrics.FormatMetricGroup;
import org.apache.commons.lang3.StringUtils;
@@ -32,6 +33,7 @@ import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.Serializable;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -45,6 +47,10 @@ public abstract class AbstractInLongMsgFormatDeserializer
implements ResultTypeQ
private static final Logger LOG =
LoggerFactory.getLogger(AbstractInLongMsgFormatDeserializer.class);
+    /** Minimum gap, in milliseconds, between two rate-limited log statements. */
+    protected static final long PRINT_TIMESTAMP_INTERVAL = 60 * 1000L;
+    /** Epoch-millisecond time at which the rate limiter last allowed a log statement. */
+    protected long lastPrintTimestamp = 0L;
+    /** Number of field names in the row schema; assigned by subclasses from their row format. */
+    protected int fieldNameSize = 0;
+
protected FailureHandler failureHandler;
/**
@@ -76,6 +82,17 @@ public abstract class AbstractInLongMsgFormatDeserializer
implements ResultTypeQ
*/
protected abstract List<RowData> convertRowDataList(InLongMsgHead head,
InLongMsgBody body) throws Exception;
+ /**
+ * Converts one message body into {@link FormatMsg} records.
+ *
+ * @param head the head of the message the body belongs to
+ * @param body the body to convert
+ * @return the converted records; the caller in this class tolerates a {@code null} result
+ * @throws Exception if the body cannot be converted
+ */
+ protected abstract List<FormatMsg> convertFormatMsgList(InLongMsgHead
head, InLongMsgBody body) throws Exception;
+
+ /**
+ * Rate limiter for log output: returns {@code true} when more than
+ * {@code PRINT_TIMESTAMP_INTERVAL} milliseconds have passed since the last time it
+ * returned {@code true}.
+ *
+ * <p>This method has a side effect: every {@code true} result records the current time
+ * and thereby starts a new quiet period, whether or not the caller goes on to log.
+ * No synchronization is performed here.
+ *
+ * @return whether the caller may log now
+ */
+ protected boolean needPrint() {
+ long now = Instant.now().toEpochMilli();
+ if (now - lastPrintTimestamp > PRINT_TIMESTAMP_INTERVAL) {
+ // Start the next quiet period from this moment.
+ lastPrintTimestamp = now;
+ return true;
+ }
+ return false;
+ }
+
public void flatMap(
byte[] bytes,
Collector<RowData> collector) throws Exception {
@@ -84,6 +101,14 @@ public abstract class AbstractInLongMsgFormatDeserializer
implements ResultTypeQ
}
}
+ /**
+ * Pre-parses the raw bytes with {@code preParse} and feeds every resulting
+ * {@link InLongMsgWrap} to {@code parseFormatMsg}.
+ *
+ * @param bytes the raw message bytes
+ * @param collector receives the {@link FormatMsg} records produced along the way
+ * @throws Exception if pre-parsing or parsing throws
+ */
+ public void flatFormatMsgMap(
+ byte[] bytes,
+ Collector<FormatMsg> collector) throws Exception {
+ for (InLongMsgWrap inLongMsgWrap : preParse(bytes)) {
+ parseFormatMsg(inLongMsgWrap, collector);
+ }
+ }
+
public List<InLongMsgWrap> preParse(byte[] bytes) throws Exception {
final List<InLongMsgWrap> result = new ArrayList<>();
@@ -160,6 +185,26 @@ public abstract class AbstractInLongMsgFormatDeserializer
implements ResultTypeQ
}
}
+ /**
+ * Converts every body of a pre-parsed message into {@link FormatMsg} records and emits them.
+ *
+ * <p>A body whose conversion throws is counted through
+ * {@code reportDeSerializeErrorMetrics()}, reported to the failure handler and then skipped;
+ * the remaining bodies are still processed.
+ *
+ * @param inLongMsgWrap the pre-parsed message (one head, a list of bodies)
+ * @param collector receives each converted record
+ * @throws Exception anything thrown outside the guarded conversion call, e.g. by the
+ * failure handler or the collector
+ */
+ public void parseFormatMsg(InLongMsgWrap inLongMsgWrap,
Collector<FormatMsg> collector) throws Exception {
+ InLongMsgHead inLongMsgHead = inLongMsgWrap.getInLongMsgHead();
+
+ for (InLongMsgBody inLongMsgBody :
inLongMsgWrap.getInLongMsgBodyList()) {
+ List<FormatMsg> formatMsgList;
+ try {
+ formatMsgList = convertFormatMsgList(inLongMsgHead,
inLongMsgBody);
+ } catch (Exception e) {
+ // Record the failure, let the handler decide what to do, and move on to the next body.
+ reportDeSerializeErrorMetrics();
+ failureHandler.onConvertingRowFailure(inLongMsgHead,
inLongMsgBody, e);
+ continue;
+ }
+ // A null list means the conversion produced nothing for this body.
+ if (formatMsgList != null) {
+ for (FormatMsg formatMsg : formatMsgList) {
+ collector.collect(formatMsg);
+ }
+ }
+ }
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index f30d916341..2b69a34c5e 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.formats.inlongmsg;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -400,6 +401,17 @@ public class InLongMsgUtils {
return producedRow;
}
+ /**
+ * Replaces the row inside {@code formatMsg} with the result of
+ * {@code decorateRowDataWithNeededHeadFields} and returns the same {@code formatMsg}
+ * instance. The recorded row length is left untouched.
+ *
+ * <p>The current row is cast to {@link GenericRowData}, so any other {@code RowData}
+ * implementation fails with a {@link ClassCastException}.
+ *
+ * @param timeFieldName passed through to the row decorator; may be {@code null}
+ * @param attributesFieldName passed through to the row decorator; may be {@code null}
+ * @param time passed through to the row decorator
+ * @param attributes passed through to the row decorator
+ * @param formatMsg the message whose row is decorated in place
+ * @return {@code formatMsg}, now holding the decorated row
+ */
+ public static FormatMsg decorateFormatMsgWithNeededHeadFields(
+ @Nullable String timeFieldName,
+ @Nullable String attributesFieldName,
+ Timestamp time,
+ Map<String, String> attributes,
+ FormatMsg formatMsg) {
+
formatMsg.setRowData(decorateRowDataWithNeededHeadFields(timeFieldName,
attributesFieldName, time, attributes,
+ (GenericRowData) formatMsg.getRowData()));
+ return formatMsg;
+ }
+
public static GenericRowData decorateRowDataWithNeededHeadFields(
@Nullable String timeFieldName,
@Nullable String attributesFieldName,
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
index fca7ac1cdc..4d491ae018 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.formats.inlongmsgbinlog;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
@@ -154,6 +155,19 @@ public final class InLongMsgBinlogFormatDeserializer
extends AbstractInLongMsgFo
failureHandler);
}
+ /**
+ * Converts one binlog body into {@link FormatMsg} records by delegating to
+ * {@code InLongMsgBinlogUtils.getFormatMsgData} with this deserializer's settings,
+ * the head's attributes and the body's raw data.
+ */
+ @Override
+ protected List<FormatMsg> convertFormatMsgList(InLongMsgHead head,
InLongMsgBody body) throws Exception {
+ return InLongMsgBinlogUtils.getFormatMsgData(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ metadataFieldName,
+ head.getAttributes(),
+ body.getData(),
+ includeUpdateBefore,
+ failureHandler);
+ }
+
@Override
public TypeInformation<RowData> getProducedType() {
return InLongMsgBinlogUtils.getRowType(
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 53c00d0daa..78612399ef 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.formats.inlongmsgbinlog;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.binlog.InLongBinlog;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
@@ -43,6 +44,7 @@ import java.util.List;
import java.util.Map;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static
org.apache.inlong.sort.formats.base.TableFormatUtils.getFormatValueLength;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_ID;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID;
@@ -281,4 +283,149 @@ public class InLongMsgBinlogUtils {
return row;
}
+
+ /**
+ * Parses {@code bytes} as an {@code InLongBinlog.RowData} event and turns it into
+ * {@link FormatMsg} records according to the event type:
+ * <ul>
+ * <li>INSERT: one record built from the after-columns;</li>
+ * <li>UPDATE: one record built from the after-columns, preceded by one built from the
+ * before-columns when {@code includeUpdateBefore} is set;</li>
+ * <li>DELETE: one record built from the before-columns.</li>
+ * </ul>
+ *
+ * @param rowFormatInfo the schema of the data fields
+ * @param timeFieldName non-null to add a time head field to each row
+ * @param attributesFieldName non-null to add an attributes head field to each row
+ * @param metadataFieldName non-null to add a metadata head field to each row
+ * @param attributes the attributes stored in the attributes head field
+ * @param bytes the serialized binlog event
+ * @param includeUpdateBefore whether UPDATE events also emit the before-image
+ * @param failureHandler passed through to the field deserialization
+ * @return the records for the event, or {@code null} for any other event type
+ * @throws Exception if the bytes cannot be parsed or a field cannot be deserialized
+ */
+ public static List<FormatMsg> getFormatMsgData(
+ RowFormatInfo rowFormatInfo,
+ String timeFieldName,
+ String attributesFieldName,
+ String metadataFieldName,
+ Map<String, String> attributes,
+ byte[] bytes,
+ boolean includeUpdateBefore, FailureHandler failureHandler) throws
Exception {
+
+ InLongBinlog.RowData rowData = InLongBinlog.RowData.parseFrom(bytes);
+
+ List<FormatMsg> rows = new ArrayList<>();
+
+ switch (rowData.getEventType()) {
+ case INSERT:
+ rows.add(
+ constructFormatMsgData(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ metadataFieldName,
+ attributes,
+ rowData,
+ DBSYNC_OPERATION_INERT,
+ rowData.getAfterColumnsList(),
+ failureHandler));
+ break;
+ case UPDATE:
+ // The before-image is optional and, when requested, is emitted ahead of the after-image.
+ if (includeUpdateBefore) {
+ rows.add(
+ constructFormatMsgData(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ metadataFieldName,
+ attributes,
+ rowData,
+ DBSYNC_OPERATION_UPDATE_BEFORE,
+ rowData.getBeforeColumnsList(),
+ failureHandler));
+ }
+ rows.add(
+ constructFormatMsgData(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ metadataFieldName,
+ attributes,
+ rowData,
+ DBSYNC_OPERATION_UPDATE,
+ rowData.getAfterColumnsList(),
+ failureHandler));
+ break;
+ case DELETE:
+ rows.add(
+ constructFormatMsgData(
+ rowFormatInfo,
+ timeFieldName,
+ attributesFieldName,
+ metadataFieldName,
+ attributes,
+ rowData,
+ DBSYNC_OPERATION_DELETE,
+ rowData.getBeforeColumnsList(),
+ failureHandler));
+ break;
+ default:
+ // Any other event type yields no result at all (null, not an empty list).
+ return null;
+ }
+
+ return rows;
+ }
+
+ /**
+ * Builds one {@link FormatMsg} from the given binlog columns.
+ *
+ * <p>The row starts with the requested head fields, in the order time, attributes,
+ * metadata, followed by one slot per schema field. Columns are matched to schema fields
+ * by name; a schema field with no column, or with a column flagged as null, becomes
+ * {@code null}.
+ *
+ * <p>The returned length is the sum of {@code getFormatValueLength} over the non-null
+ * data fields only; head fields and null fields add nothing.
+ *
+ * @param rowFormatInfo the schema of the data fields
+ * @param timeFieldName non-null to prepend the event's execute time
+ * @param attributesFieldName non-null to prepend {@code attributes}
+ * @param metadataFieldName non-null to prepend a metadata map describing the event
+ * @param attributes the attributes stored in the attributes head field
+ * @param rowData the parsed binlog event the columns belong to
+ * @param operation the operation label stored in the metadata map
+ * @param columns the columns to read field values from
+ * @param failureHandler passed through to the field deserialization
+ * @return the assembled row together with its accumulated length
+ * @throws Exception if a field cannot be deserialized
+ */
+ private static FormatMsg constructFormatMsgData(
+ RowFormatInfo rowFormatInfo,
+ String timeFieldName,
+ String attributesFieldName,
+ String metadataFieldName,
+ Map<String, String> attributes,
+ InLongBinlog.RowData rowData,
+ String operation,
+ List<InLongBinlog.Column> columns, FailureHandler failureHandler)
throws Exception {
+ List<Object> headFields = new ArrayList<>();
+ long rowDataLength = 0L;
+ if (timeFieldName != null) {
+ headFields.add(new Timestamp(rowData.getExecuteTime()));
+ }
+
+ if (attributesFieldName != null) {
+ headFields.add(attributes);
+ }
+
+ if (metadataFieldName != null) {
+ Map<String, String> metadata = new HashMap<>();
+
+ metadata.put(METADATA_INSTANCE_NAME, rowData.getInstanceName());
+ metadata.put(METADATA_SCHEMA_NAME, rowData.getSchemaName());
+ metadata.put(METADATA_TABLE_NAME, rowData.getTableName());
+ metadata.put(METADATA_OPERATION_TYPE, operation);
+ metadata.put(METADATA_EXECUTE_TIME,
Long.toString(rowData.getExecuteTime()));
+ metadata.put(METADATA_EXECUTE_ORDER,
Long.toString(rowData.getExecuteOrder()));
+ metadata.put(METADATA_TRANSFER_IP, rowData.getTransferIp());
+
+ headFields.add(metadata);
+ }
+
+ String[] dataFieldNames = rowFormatInfo.getFieldNames();
+ FormatInfo[] dataFieldFormatInfos =
rowFormatInfo.getFieldFormatInfos();
+
+ GenericRowData row = new GenericRowData(dataFieldNames.length +
headFields.size());
+
+ // Head fields occupy the leading positions of the row.
+ for (int i = 0; i < headFields.size(); ++i) {
+ row.setField(i, headFields.get(i));
+ }
+
+ // Index the columns by name; a column flagged as null is stored with a null value.
+ Map<String, String> columnMap = new HashMap<>(columns.size());
+ for (InLongBinlog.Column column : columns) {
+ if (column.getIsNull()) {
+ columnMap.put(column.getName(), null);
+ } else {
+ columnMap.put(column.getName(), column.getValue());
+ }
+ }
+
+ for (int i = 0; i < dataFieldNames.length; ++i) {
+ String fieldName = dataFieldNames[i];
+ String fieldText = columnMap.get(fieldName);
+
+ // Null here covers both a missing column and a column flagged as null.
+ if (fieldText == null) {
+ row.setField(i + headFields.size(), null);
+ } else {
+ Object field =
+ deserializeBasicField(
+ fieldName,
+ dataFieldFormatInfos[i],
+ fieldText,
+ null, failureHandler);
+ row.setField(i + headFields.size(), field);
+ rowDataLength += getFormatValueLength(dataFieldFormatInfos[i],
fieldText);
+ }
+ }
+
+ return new FormatMsg(row, rowDataLength);
+ }
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
index aac79533ef..5263e1334d 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvFormatDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgcsv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.base.TextFormatBuilder;
import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
@@ -31,6 +32,8 @@ import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -54,6 +57,8 @@ import static
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.DEFA
*/
public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgFormatDeserializer {
+ private static final Logger LOG =
LoggerFactory.getLogger(InLongMsgCsvFormatDeserializer.class);
+
private static final long serialVersionUID = 1L;
/**
@@ -221,6 +226,9 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
this.isDeleteEscapeChar = isDeleteEscapeChar;
this.retainPredefinedField = retainPredefinedField;
+ String[] fieldNames = rowFormatInfo.getFieldNames();
+ this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
+
converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
.map(formatInfo -> FieldToRowDataConverters.createConverter(
TableFormatUtils.deriveLogicalType(formatInfo)))
@@ -256,6 +264,16 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
@Override
protected List<RowData> convertRowDataList(InLongMsgHead head,
InLongMsgBody body) throws Exception {
+ List<String> predefinedFields = head.getPredefinedFields();
+ List<String> fields = body.getFields();
+ int actualNumFields = (predefinedFields == null ? 0 :
predefinedFields.size())
+ + (fields == null ? 0 : fields.size());
+ if (needPrint() && actualNumFields != fieldNameSize) {
+ LOG.warn("The number of fields mismatches: expected={}, actual={}.
" +
+ "PredefinedFields=[{}], Fields=[{}]", fieldNameSize,
actualNumFields,
+ predefinedFields, fields);
+ }
+
GenericRowData genericRowData = InLongMsgCsvUtils.deserializeRowData(
rowFormatInfo,
nullLiteral,
@@ -275,6 +293,39 @@ public final class InLongMsgCsvFormatDeserializer extends
AbstractInLongMsgForma
return
Collections.singletonList(InLongMsgUtils.decorateRowWithMetaData(genericRowData,
head, metadataKeys));
}
+    /**
+     * Converts one CSV body into a single {@link FormatMsg}: the fields are deserialized,
+     * the row is decorated with the time/attributes head fields and with the requested
+     * metadata, and the length computed during deserialization is carried over unchanged.
+     *
+     * <p>A rate-limited warning is logged when the number of fields handed to the
+     * deserializer differs from the number of schema fields.
+     *
+     * @param head the head of the message the body belongs to
+     * @param body the CSV body to convert
+     * @return a singleton list holding the converted record
+     * @throws Exception if a field cannot be deserialized
+     */
+    @Override
+    protected List<FormatMsg> convertFormatMsgList(InLongMsgHead head, InLongMsgBody body) throws Exception {
+        // Count only the predefined fields that are actually handed to the deserializer;
+        // counting discarded ones reports a mismatch that does not exist.
+        List<String> predefinedFields =
+                retainPredefinedField ? head.getPredefinedFields() : Collections.emptyList();
+        List<String> fields = body.getFields();
+        int actualNumFields = (predefinedFields == null ? 0 : predefinedFields.size())
+                + (fields == null ? 0 : fields.size());
+
+        // Compare before asking needPrint(): needPrint() starts a new quiet period whenever
+        // it returns true, so asking it first lets well-formed records silence later warnings.
+        if (actualNumFields != fieldNameSize && needPrint()) {
+            LOG.warn("The number of fields mismatches: expected={}, actual={}. " +
+                    "PredefinedFields=[{}], Fields=[{}]", fieldNameSize, actualNumFields,
+                    predefinedFields, fields);
+        }
+
+        FormatMsg formatMsg = InLongMsgCsvUtils.deserializeFormatMsgData(
+                rowFormatInfo,
+                nullLiteral,
+                predefinedFields,
+                fields,
+                converters, failureHandler);
+
+        // Decorate result with time and attributes fields if needed
+        GenericRowData genericRowData = InLongMsgUtils.decorateRowDataWithNeededHeadFields(
+                timeFieldName,
+                attributesFieldName,
+                head.getTime(),
+                head.getAttributes(),
+                (GenericRowData) formatMsg.getRowData());
+
+        // Decorate result with metadata if needed
+        genericRowData = InLongMsgUtils.decorateRowWithMetaData(genericRowData, head, metadataKeys);
+        return Collections.singletonList(new FormatMsg(genericRowData, formatMsg.getRowDataLength()));
+    }
+
/**
* The builder for {@link InLongMsgCsvFormatDeserializer}.
*/
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index 99a44d33c0..212086a89b 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -21,6 +21,7 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
@@ -38,6 +39,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static
org.apache.inlong.sort.formats.base.TableFormatUtils.getFormatValueLength;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_ID;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID;
@@ -129,7 +131,7 @@ public class InLongMsgCsvUtils {
}).collect(Collectors.toList());
}
- public static GenericRowData deserializeRowData(
+ public static FormatMsg deserializeFormatMsgData(
RowFormatInfo rowFormatInfo,
String nullLiteral,
List<String> predefinedFields,
@@ -139,13 +141,67 @@ public class InLongMsgCsvUtils {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
- int actualNumFields = predefinedFields.size() + fields.size();
- if (actualNumFields != fieldNames.length) {
- LOG.warn("The number of fields mismatches: expected={}, actual={}.
" +
- "PredefinedFields=[{}], Fields=[{}]", fieldNames.length,
actualNumFields,
- predefinedFields, fields);
+ GenericRowData rowData = new GenericRowData(fieldNames.length);
+ long rowDataLength = 0L;
+ // Deserialize pre-defined fields
+ for (int i = 0; i < predefinedFields.size(); ++i) {
+ if (i >= fieldNames.length) {
+ break;
+ }
+
+ String fieldName = fieldNames[i];
+ FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+ FieldToRowDataConverter converter = converters[i];
+ String fieldText = predefinedFields.get(i);
+
+ Object field = converter.convert(deserializeBasicField(
+ fieldName,
+ fieldFormatInfo,
+ fieldText,
+ nullLiteral, failureHandler));
+ rowData.setField(i, field);
+ rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
+ }
+
+ // Deserialize fields
+ for (int i = 0; i < fields.size(); ++i) {
+
+ if (i + predefinedFields.size() >= fieldNames.length) {
+ break;
+ }
+
+ String fieldName = fieldNames[i + predefinedFields.size()];
+ FormatInfo fieldFormatInfo = fieldFormatInfos[i +
predefinedFields.size()];
+ FieldToRowDataConverter converter = converters[i +
predefinedFields.size()];
+ String fieldText = fields.get(i);
+
+ Object field = converter.convert(deserializeBasicField(
+ fieldName,
+ fieldFormatInfo,
+ fieldText,
+ nullLiteral, failureHandler));
+ rowData.setField(i + predefinedFields.size(), field);
+ rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
+ }
+
+ // If schema length is larger than fields' length, use `null` to fill
in the blanks
+ for (int i = predefinedFields.size() + fields.size(); i <
fieldNames.length; ++i) {
+ rowData.setField(i, null);
}
+ return new FormatMsg(rowData, rowDataLength);
+ }
+
+ public static GenericRowData deserializeRowData(
+ RowFormatInfo rowFormatInfo,
+ String nullLiteral,
+ List<String> predefinedFields,
+ List<String> fields,
+ FieldToRowDataConverters.FieldToRowDataConverter[] converters,
+ FailureHandler failureHandler) throws Exception {
+ String[] fieldNames = rowFormatInfo.getFieldNames();
+ FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
GenericRowData rowData = new GenericRowData(fieldNames.length);
// Deserialize pre-defined fields
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
index a9caf3bac5..7cb030471c 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvFormatDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgkv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.base.TextFormatBuilder;
import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
@@ -211,6 +212,9 @@ public final class InLongMsgKvFormatDeserializer extends
AbstractInLongMsgFormat
this.nullLiteral = nullLiteral;
this.isDeleteEscapeChar = isDeleteEscapeChar;
+ String[] fieldNames = rowFormatInfo.getFieldNames();
+ this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
+
converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
.map(formatInfo -> FieldToRowDataConverters.createConverter(
TableFormatUtils.deriveLogicalType(formatInfo)))
@@ -264,6 +268,25 @@ public final class InLongMsgKvFormatDeserializer extends
AbstractInLongMsgFormat
genericRowData));
}
+    /**
+     * Converts one KV body into a single {@link FormatMsg}: the entries are deserialized
+     * and the resulting row is decorated in place with the time/attributes head fields.
+     *
+     * @param head the head of the message the body belongs to
+     * @param body the KV body to convert
+     * @return a singleton list holding the converted record
+     * @throws Exception if a field cannot be deserialized
+     */
+    @Override
+    protected List<FormatMsg> convertFormatMsgList(InLongMsgHead head, InLongMsgBody body) throws Exception {
+        FormatMsg formatMsg = InLongMsgKvUtils.deserializeFormatMsgData(
+                rowFormatInfo,
+                nullLiteral,
+                retainPredefinedField ? head.getPredefinedFields() : Collections.emptyList(),
+                body.getEntries(),
+                converters,
+                failureHandler);
+
+        // Decorate result with time and attributes fields if needed
+        formatMsg.setRowData(InLongMsgUtils.decorateRowDataWithNeededHeadFields(
+                timeFieldName,
+                attributesFieldName,
+                head.getTime(),
+                head.getAttributes(),
+                (GenericRowData) formatMsg.getRowData()));
+        return Collections.singletonList(formatMsg);
+    }
+
/**
* The builder for {@link InLongMsgKvFormatDeserializer}.
*/
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
index 703a064384..772e5619d7 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgkv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
@@ -34,6 +35,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static
org.apache.inlong.sort.formats.base.TableFormatUtils.getFormatValueLength;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_NAME;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_INTERFACE_TID;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_STREAM_ID;
@@ -172,4 +174,57 @@ public class InLongMsgKvUtils {
return row;
}
+
+    /**
+     * Deserializes a row from predefined fields and key-value entries, and accumulates the
+     * length reported by {@code getFormatValueLength} for every field text that is read.
+     *
+     * <p>Leading columns are filled positionally from {@code predefinedFields}; the remaining
+     * columns are looked up in {@code entries} by field name.
+     *
+     * @param rowFormatInfo The format information of the row.
+     * @param nullLiteral The literal for null values.
+     * @param predefinedFields The predefined fields.
+     * @param entries The entries.
+     * @param converters The per-column converters applied to each deserialized field.
+     * @param failureHandler The handler passed to the basic field deserializer.
+     * @return The {@link FormatMsg} holding the row and its accumulated length.
+     */
+    public static FormatMsg deserializeFormatMsgData(
+            RowFormatInfo rowFormatInfo,
+            String nullLiteral,
+            List<String> predefinedFields,
+            Map<String, String> entries,
+            FieldToRowDataConverter[] converters,
+            FailureHandler failureHandler) throws Exception {
+        final String[] names = rowFormatInfo.getFieldNames();
+        final FormatInfo[] formats = rowFormatInfo.getFieldFormatInfos();
+
+        final GenericRowData result = new GenericRowData(names.length);
+        final int predefinedCount = predefinedFields.size();
+        long totalLength = 0L;
+
+        for (int column = 0; column < names.length; column++) {
+            final String name = names[column];
+            final FormatInfo format = formats[column];
+            final FieldToRowDataConverter converter = converters[column];
+
+            // Columns covered by the predefined fields are positional; the rest are keyed by name.
+            final String text = column < predefinedCount
+                    ? predefinedFields.get(column)
+                    : entries.get(name);
+
+            final Object basicValue = deserializeBasicField(name, format, text, nullLiteral, failureHandler);
+            result.setField(column, converter.convert(basicValue));
+            totalLength += getFormatValueLength(format, text);
+        }
+
+        return new FormatMsg(result, totalLength);
+    }
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
index ce446bf13f..fbbfb1e7c3 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-pb/src/main/java/org/apache/inlong/sort/formats/inlongmsgpb/InLongMsgPbDeserializationSchema.java
@@ -20,6 +20,8 @@ package org.apache.inlong.sort.formats.inlongmsgpb;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObj;
import org.apache.inlong.sdk.commons.protocol.ProxySdk.MessageObjs;
+import org.apache.inlong.sort.formats.base.DefaultDeserializationSchema;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import com.google.common.base.Objects;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -82,7 +84,39 @@ public class InLongMsgPbDeserializationSchema implements
DeserializationSchema<R
List<MessageObj> msgList = msgObjs.getMsgsList();
for (MessageObj msg : msgList) {
RowData row =
deserializationSchema.deserialize(msg.getBody().toByteArray());
- this.emitRow(msg, (GenericRowData) row, out);
+ RowData newRow = emitRow(msg, (GenericRowData) row);
+ if (newRow == null) {
+ out.collect(row);
+ continue;
+ }
+ out.collect(newRow);
+ }
+ }
+
+    /**
+     * Decompresses and parses a PB-encoded batch, then emits one {@link FormatMsg} per contained
+     * message, with metadata columns appended when metadata converters are configured.
+     *
+     * <p>When the wrapped schema is a {@link DefaultDeserializationSchema}, the byte size comes
+     * from its {@code deserializeFormatMsg}; otherwise the raw body length is used.
+     *
+     * @param message the compressed PB payload
+     * @param out the collector receiving the produced messages
+     * @throws Exception if decompression, PB parsing or the wrapped schema fails
+     */
+    public void deserializeFormatMsg(byte[] message, Collector<FormatMsg> out) throws Exception {
+        byte[] decompressed = decompressor.decompress(message);
+        MessageObjs msgObjs = MessageObjs.parseFrom(decompressed);
+        for (MessageObj msg : msgObjs.getMsgsList()) {
+            byte[] body = msg.getBody().toByteArray();
+
+            FormatMsg formatMsg;
+            // The check must target DefaultDeserializationSchema: the field is always a
+            // DeserializationSchema, so testing for that type would make the cast below unsafe
+            // and the fallback branch unreachable.
+            if (deserializationSchema instanceof DefaultDeserializationSchema) {
+                formatMsg = ((DefaultDeserializationSchema<?>) deserializationSchema)
+                        .deserializeFormatMsg(body);
+            } else {
+                formatMsg = new FormatMsg(deserializationSchema.deserialize(body), body.length);
+            }
+
+            // A null result means the wrapped schema produced nothing for this body
+            // (for example a parsing failure swallowed by its failure handler).
+            if (formatMsg == null) {
+                continue;
+            }
+
+            RowData physicalRow = formatMsg.getRowData();
+            if (physicalRow != null) {
+                // emitRow returns null when no metadata columns are configured.
+                RowData newRow = emitRow(msg, (GenericRowData) physicalRow);
+                if (newRow != null) {
+                    formatMsg.setRowData(newRow);
+                }
+            }
+            out.collect(formatMsg);
         }
     }
@@ -130,10 +164,9 @@ public class InLongMsgPbDeserializationSchema implements
DeserializationSchema<R
}
/** add metadata column */
- private void emitRow(MessageObj message, GenericRowData physicalRow,
Collector<RowData> out) {
+ private RowData emitRow(MessageObj message, GenericRowData physicalRow) {
if (metadataConverters.length == 0) {
- out.collect(physicalRow);
- return;
+ return null;
}
final int physicalArity = physicalRow.getArity();
final int metadataArity = metadataConverters.length;
@@ -146,6 +179,6 @@ public class InLongMsgPbDeserializationSchema implements
DeserializationSchema<R
producedRow.setField(
physicalArity + metadataPos,
metadataConverters[metadataPos].read(message));
}
- out.collect(producedRow);
+ return producedRow;
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
index 415c1efd64..ab095ddaa8 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvFormatDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.base.TextFormatBuilder;
import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
@@ -31,6 +32,8 @@ import
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -52,6 +55,8 @@ import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TI
*/
public final class InLongMsgTlogCsvFormatDeserializer extends
AbstractInLongMsgFormatDeserializer {
+ private static final Logger LOG =
LoggerFactory.getLogger(InLongMsgTlogCsvFormatDeserializer.class);
+
private static final long serialVersionUID = 1L;
/**
@@ -226,6 +231,10 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
this.isIncludeFirstSegment = isIncludeFirstSegment;
this.isDeleteHeadDelimiter = isDeleteHeadDelimiter;
this.isDeleteEscapeChar = isDeleteEscapeChar;
+
+ String[] fieldNames = rowFormatInfo.getFieldNames();
+ this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
+
converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
.map(formatInfo -> FieldToRowDataConverters.createConverter(
TableFormatUtils.deriveLogicalType(formatInfo)))
@@ -254,6 +263,14 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
@Override
protected List<RowData> convertRowDataList(InLongMsgHead head,
InLongMsgBody body) throws Exception {
+ List<String> predefinedFields = head.getPredefinedFields();
+ List<String> fields = body.getFields();
+ int actualNumFields = (predefinedFields == null ? 0 :
predefinedFields.size())
+ + (fields == null ? 0 : fields.size());
+ if (needPrint() && actualNumFields != fieldNameSize) {
+ LOG.warn("The number of fields mismatches: " + fieldNameSize +
+ " expected, but was " + actualNumFields + ".");
+ }
GenericRowData dataRow =
InLongMsgTlogCsvUtils.deserializeRowData(
rowFormatInfo,
@@ -272,6 +289,35 @@ public final class InLongMsgTlogCsvFormatDeserializer
extends AbstractInLongMsgF
return
Collections.singletonList(InLongMsgUtils.decorateRowWithMetaData(genericRowData,
head, metadataKeys));
}
+    /**
+     * Converts one InLongMsg tlog-csv record into a single-element list of {@link FormatMsg}.
+     *
+     * <p>A rate-limited warning is logged when the number of predefined plus body fields differs
+     * from the number of declared field names. The deserialized row is then decorated with the
+     * time/attributes head fields and the requested metadata.
+     *
+     * @param head the parsed head of the InLongMsg record
+     * @param body the parsed body of the InLongMsg record
+     * @return a singleton list holding the decorated message
+     * @throws Exception if deserialization of a field fails and the failure handler rethrows
+     */
+    @Override
+    protected List<FormatMsg> convertFormatMsgList(InLongMsgHead head, InLongMsgBody body) throws Exception {
+        List<String> predefinedFields = head.getPredefinedFields();
+        List<String> fields = body.getFields();
+        int actualNumFields = (predefinedFields == null ? 0 : predefinedFields.size())
+                + (fields == null ? 0 : fields.size());
+
+        // Test the mismatch first: needPrint() resets the rate-limit timestamp whenever it
+        // returns true, so calling it for well-formed records would suppress real warnings.
+        if (actualNumFields != fieldNameSize && needPrint()) {
+            LOG.warn("The number of fields mismatches: {} expected, but was {}.",
+                    fieldNameSize, actualNumFields);
+        }
+
+        FormatMsg formatMsg =
+                InLongMsgTlogCsvUtils.deserializeFormatMsgData(
+                        rowFormatInfo,
+                        nullLiteral,
+                        predefinedFields,
+                        fields,
+                        converters, failureHandler);
+
+        GenericRowData genericRowData = InLongMsgUtils.decorateRowDataWithNeededHeadFields(
+                timeFieldName,
+                attributesFieldName,
+                head.getTime(),
+                head.getAttributes(),
+                (GenericRowData) formatMsg.getRowData());
+        formatMsg.setRowData(InLongMsgUtils.decorateRowWithMetaData(genericRowData, head, metadataKeys));
+        return Collections.singletonList(formatMsg);
+    }
+
/**
* The builder for {@link InLongMsgTlogCsvFormatDeserializer}.
*/
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index 1e72a668b6..db4787f83b 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgtlogcsv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
@@ -36,6 +37,7 @@ import java.util.List;
import java.util.Map;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static
org.apache.inlong.sort.formats.base.TableFormatUtils.getFormatValueLength;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
@@ -132,13 +134,66 @@ public class InLongMsgTlogCsvUtils {
String[] fieldNames = rowFormatInfo.getFieldNames();
FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
- int actualNumFields = predefinedFields.size() + fields.size();
- if (actualNumFields != fieldNames.length) {
- LOG.warn("The number of fields mismatches: " + fieldNames.length +
- " expected, but was " + actualNumFields + ".");
+ GenericRowData rowData = new GenericRowData(fieldNames.length);
+
+ for (int i = 0; i < predefinedFields.size(); ++i) {
+
+ if (i >= fieldNames.length) {
+ break;
+ }
+
+ String fieldName = fieldNames[i];
+ FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+
+ String fieldText = predefinedFields.get(i);
+
+ Object field =
+ converters[i].convert(deserializeBasicField(
+ fieldName,
+ fieldFormatInfo,
+ fieldText,
+ nullLiteral, failureHandler));
+ rowData.setField(i, field);
+ }
+
+ for (int i = 0; i < fields.size(); ++i) {
+
+ if (i + predefinedFields.size() >= fieldNames.length) {
+ break;
+ }
+
+ String fieldName = fieldNames[i + predefinedFields.size()];
+ FormatInfo fieldFormatInfo = fieldFormatInfos[i +
predefinedFields.size()];
+
+ String fieldText = fields.get(i);
+
+ Object field =
+ converters[i +
predefinedFields.size()].convert(deserializeBasicField(
+ fieldName,
+ fieldFormatInfo,
+ fieldText,
+ nullLiteral, failureHandler));
+ rowData.setField(i + predefinedFields.size(), field);
+ }
+
+ for (int i = predefinedFields.size() + fields.size(); i <
fieldNames.length; ++i) {
+ rowData.setField(i, null);
}
+ return rowData;
+ }
+
+ public static FormatMsg deserializeFormatMsgData(RowFormatInfo
rowFormatInfo,
+ String nullLiteral,
+ List<String> predefinedFields,
+ List<String> fields,
+ FieldToRowDataConverters.FieldToRowDataConverter[] converters,
+ FailureHandler failureHandler) throws Exception {
+ String[] fieldNames = rowFormatInfo.getFieldNames();
+ FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
GenericRowData rowData = new GenericRowData(fieldNames.length);
+ long rowDataLength = 0L;
for (int i = 0; i < predefinedFields.size(); ++i) {
@@ -158,6 +213,7 @@ public class InLongMsgTlogCsvUtils {
fieldText,
nullLiteral, failureHandler));
rowData.setField(i, field);
+ rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
}
for (int i = 0; i < fields.size(); ++i) {
@@ -178,12 +234,13 @@ public class InLongMsgTlogCsvUtils {
fieldText,
nullLiteral, failureHandler));
rowData.setField(i + predefinedFields.size(), field);
+ rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
}
for (int i = predefinedFields.size() + fields.size(); i <
fieldNames.length; ++i) {
rowData.setField(i, null);
}
- return rowData;
+ return new FormatMsg(rowData, rowDataLength);
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
index 2d54f9db13..dff67d924f 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvFormatDeserializer.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgtlogkv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.base.TextFormatBuilder;
import
org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
@@ -189,6 +190,9 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
this.nullLiteral = nullLiteral;
this.isDeleteEscapeChar = isDeleteEscapeChar;
+ String[] fieldNames = rowFormatInfo.getFieldNames();
+ this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
+
this.converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
.map(formatInfo -> FieldToRowDataConverters.createConverter(
TableFormatUtils.deriveLogicalType(formatInfo)))
@@ -237,6 +241,26 @@ public final class InLongMsgTlogKvFormatDeserializer
extends AbstractInLongMsgFo
return Collections.singletonList(rowData);
}
+    /**
+     * Converts one InLongMsg tlog-kv record into a single-element list of {@link FormatMsg},
+     * replacing the deserialized row with its head-field-decorated version.
+     */
+    @Override
+    protected List<FormatMsg> convertFormatMsgList(InLongMsgHead head, InLongMsgBody body) throws Exception {
+        final FormatMsg parsed = InLongMsgTlogKvUtils.deserializeFormatMsgData(
+                rowFormatInfo,
+                nullLiteral,
+                head.getPredefinedFields(),
+                body.getEntries(),
+                converters,
+                failureHandler);
+
+        // Swap the raw row for the one carrying the time and attributes fields.
+        parsed.setRowData(InLongMsgUtils.decorateRowWithNeededHeadFields(
+                timeFieldName,
+                attributesFieldName,
+                head.getTime(),
+                head.getAttributes(),
+                (GenericRowData) parsed.getRowData()));
+
+        return Collections.singletonList(parsed);
+    }
+
/**
* The builder for {@link InLongMsgTlogKvFormatDeserializer}.
*/
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index f42caf3053..57893cbde3 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.formats.inlongmsgtlogkv;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
@@ -34,6 +35,7 @@ import java.util.List;
import java.util.Map;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static
org.apache.inlong.sort.formats.base.TableFormatUtils.getFormatValueLength;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_DT;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.INLONGMSG_ATTR_TIME_T;
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.getPredefinedFields;
@@ -159,4 +161,62 @@ public class InLongMsgTlogKvUtils {
return rowData;
}
+    /**
+     * Deserializes the row from the given predefined fields and entries, accumulating the
+     * length reported by {@code getFormatValueLength} for every field text that is read.
+     *
+     * @param rowFormatInfo The format information of the row.
+     * @param nullLiteral The literal for null values.
+     * @param predefinedFields The predefined fields, mapped positionally to the leading columns.
+     * @param entries The entries, looked up by field name for the remaining columns.
+     * @param converters The per-column converters applied to each deserialized field.
+     * @param failureHandler The handler passed to the basic field deserializer.
+     * @return The row FormatMsg from the given entries.
+     */
+    public static FormatMsg deserializeFormatMsgData(
+            RowFormatInfo rowFormatInfo,
+            String nullLiteral,
+            List<String> predefinedFields,
+            Map<String, String> entries,
+            FieldToRowDataConverter[] converters,
+            FailureHandler failureHandler) throws Exception {
+        final String[] names = rowFormatInfo.getFieldNames();
+        final FormatInfo[] formats = rowFormatInfo.getFieldFormatInfos();
+
+        final GenericRowData row = new GenericRowData(names.length);
+        final int predefinedCount = predefinedFields.size();
+        // Predefined fields beyond the declared columns are ignored.
+        final int positionalEnd = Math.min(predefinedCount, names.length);
+        long totalLength = 0L;
+
+        int index;
+        for (index = 0; index < positionalEnd; index++) {
+            final String text = predefinedFields.get(index);
+            final Object basicValue =
+                    deserializeBasicField(names[index], formats[index], text, nullLiteral, failureHandler);
+            row.setField(index, converters[index].convert(basicValue));
+            totalLength += getFormatValueLength(formats[index], text);
+        }
+
+        for (index = predefinedCount; index < names.length; index++) {
+            final String text = entries.get(names[index]);
+            final Object basicValue =
+                    deserializeBasicField(names[index], formats[index], text, nullLiteral, failureHandler);
+            row.setField(index, converters[index].convert(basicValue));
+            totalLength += getFormatValueLength(formats[index], text);
+        }
+
+        return new FormatMsg(row, totalLength);
+    }
+
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
index c2d55af15d..c0525efa99 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-base/src/main/java/org/apache/inlong/sort/formats/base/DefaultDeserializationSchema.java
@@ -29,6 +29,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.time.Instant;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -42,6 +43,10 @@ public abstract class DefaultDeserializationSchema<T>
implements Deserialization
private static final Logger LOG =
LoggerFactory.getLogger(DefaultDeserializationSchema.class);
+ protected long lastPrintTimestamp = 0L;
+ protected long PRINT_TIMESTAMP_INTERVAL = 60 * 1000L;
+ protected int fieldNameSize = 0;
+
protected FailureHandler failureHandler;
/**
@@ -107,8 +112,19 @@ public abstract class DefaultDeserializationSchema<T>
implements Deserialization
}
}
+    /**
+     * Rate limiter for diagnostic logging.
+     *
+     * <p>Returns {@code true} when more than {@code PRINT_TIMESTAMP_INTERVAL} milliseconds have
+     * passed since the last time this method returned {@code true}, and records the current
+     * time as the new reference point in that case.
+     *
+     * @return whether the caller may print now
+     */
+    protected boolean needPrint() {
+        final long currentMillis = Instant.now().toEpochMilli();
+        if (currentMillis - lastPrintTimestamp <= PRINT_TIMESTAMP_INTERVAL) {
+            return false;
+        }
+        lastPrintTimestamp = currentMillis;
+        return true;
+    }
+
protected abstract T deserializeInternal(byte[] bytes) throws Exception;
+ /**
+  * Deserializes the given bytes into a {@link FormatMsg}.
+  *
+  * NOTE(review): the implementations added in this change pair the row with a length value,
+  * and some return {@code null} for null input or a handled parsing failure; confirm
+  * whether that is the intended contract for all subclasses.
+  *
+  * @param bytes the serialized message
+  * @return the deserialized message
+  * @throws Exception if deserialization fails
+  */
+ public abstract FormatMsg deserializeFormatMsg(byte[] bytes) throws
Exception;
+
@Override
public boolean equals(Object object) {
if (this == object) {
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
index 665b1b2fc5..c705c4344f 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-csv/src/main/java/org/apache/inlong/sort/formats/csv/CsvRowDataDeserializationSchema.java
@@ -22,6 +22,7 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.DefaultDeserializationSchema;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
import
org.apache.inlong.sort.formats.base.FieldToRowDataConverters.FieldToRowDataConverter;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
@@ -48,6 +49,7 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_E
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_IGNORE_ERRORS;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER;
+import static
org.apache.inlong.sort.formats.base.TableFormatUtils.getFormatValueLength;
import static org.apache.inlong.sort.formats.util.StringUtils.splitCsv;
/**
@@ -127,6 +129,9 @@ public final class CsvRowDataDeserializationSchema extends
DefaultDeserializatio
this.quoteChar = quoteChar;
this.nullLiteral = nullLiteral;
+ String[] fieldNames = rowFormatInfo.getFieldNames();
+ this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
+
converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
.map(formatInfo -> FieldToRowDataConverters.createConverter(
TableFormatUtils.deriveLogicalType(formatInfo)))
@@ -151,6 +156,9 @@ public final class CsvRowDataDeserializationSchema extends
DefaultDeserializatio
this.quoteChar = quoteChar;
this.nullLiteral = nullLiteral;
+ String[] fieldNames = rowFormatInfo.getFieldNames();
+ this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
+
converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
.map(formatInfo -> FieldToRowDataConverters.createConverter(
TableFormatUtils.deriveLogicalType(formatInfo)))
@@ -246,10 +254,12 @@ public final class CsvRowDataDeserializationSchema
extends DefaultDeserializatio
FormatInfo[] fieldFormatInfos =
rowFormatInfo.getFieldFormatInfos();
String[] fieldTexts = splitCsv(text, delimiter, escapeChar,
quoteChar);
- if (fieldTexts.length != fieldNames.length) {
+
+ if (needPrint() && fieldTexts.length != fieldNameSize) {
LOG.warn("The number of fields mismatches: expected=[{}],
actual=[{}]. Text=[{}].",
fieldNames.length, fieldTexts.length, text);
}
+
GenericRowData rowData = new GenericRowData(fieldNames.length);
for (int i = 0; i < fieldNames.length; ++i) {
@@ -274,6 +284,49 @@ public final class CsvRowDataDeserializationSchema extends
DefaultDeserializatio
return null;
}
+    /**
+     * Deserializes a CSV message into a {@link FormatMsg} holding the row and the accumulated
+     * length reported by {@code getFormatValueLength} for every field text that is present.
+     *
+     * <p>Columns without a corresponding field text are set to {@code null}. A rate-limited
+     * warning is logged when the number of field texts differs from the declared field count.
+     *
+     * @param message the serialized CSV text, may be {@code null}
+     * @return the deserialized message, or {@code null} when {@code message} is {@code null} or
+     *         parsing failed and the failure handler did not rethrow
+     * @throws Exception if the failure handler rethrows a parsing failure
+     */
+    @Override
+    public FormatMsg deserializeFormatMsg(byte[] message) throws Exception {
+        if (message == null) {
+            return null;
+        }
+        String text = new String(message, Charset.forName(charset));
+        try {
+            String[] fieldNames = rowFormatInfo.getFieldNames();
+            FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
+            String[] fieldTexts = splitCsv(text, delimiter, escapeChar, quoteChar);
+
+            // Test the mismatch first: needPrint() resets the rate-limit timestamp whenever it
+            // returns true, so calling it for well-formed records would suppress real warnings.
+            if (fieldTexts.length != fieldNameSize && needPrint()) {
+                LOG.warn("The number of fields mismatches: expected=[{}], actual=[{}]. Text=[{}].",
+                        fieldNames.length, fieldTexts.length, text);
+            }
+
+            GenericRowData rowData = new GenericRowData(fieldNames.length);
+            long rowDataLength = 0L;
+
+            for (int i = 0; i < fieldNames.length; ++i) {
+                if (i >= fieldTexts.length) {
+                    rowData.setField(i, null);
+                } else {
+                    Object field =
+                            TableFormatUtils.deserializeBasicField(
+                                    fieldNames[i],
+                                    fieldFormatInfos[i],
+                                    fieldTexts[i],
+                                    nullLiteral, failureHandler);
+
+                    rowData.setField(i, converters[i].convert(field));
+                    rowDataLength += getFormatValueLength(fieldFormatInfos[i], fieldTexts[i]);
+                }
+            }
+            return new FormatMsg(rowData, rowDataLength);
+        } catch (Throwable t) {
+            // String.format needs %s; the SLF4J-style {} placeholder is not substituted.
+            failureHandler.onParsingMsgFailure(text, new RuntimeException(
+                    String.format("Could not properly deserialize csv. Text=[%s].", text), t));
+        }
+        return null;
+    }
+
@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
index 386f75f998..9037470c20 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-json/src/main/java/org/apache/inlong/sort/formats/json/JsonRowDataDeserializationSchema.java
@@ -18,6 +18,7 @@
package org.apache.inlong.sort.formats.json;
import org.apache.inlong.sort.formats.base.DefaultDeserializationSchema;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.base.TextFormatOptions.TimestampFormat;
import
org.apache.inlong.sort.formats.json.FieldToRowDataConverters.FieldToRowDataConverter;
@@ -119,6 +120,23 @@ public class JsonRowDataDeserializationSchema extends
DefaultDeserializationSche
return rowData;
}
+    /**
+     * Deserializes a JSON message into a {@link FormatMsg} whose length is the size of the
+     * input byte array.
+     *
+     * <p>When conversion fails and the failure handler does not rethrow, the returned message
+     * carries a {@code null} row.
+     *
+     * @param message the serialized JSON text, may be {@code null}
+     * @return the deserialized message, or {@code null} when {@code message} is {@code null}
+     */
+    @Override
+    public FormatMsg deserializeFormatMsg(byte[] message) throws Exception {
+        if (message == null) {
+            return null;
+        }
+
+        final String text = new String(message, charset);
+        RowData converted;
+        try {
+            converted = (RowData) runtimeConverter.convert(text);
+        } catch (Throwable t) {
+            converted = null;
+            failureHandler.onParsingMsgFailure(text, new RuntimeException(
+                    String.format("Could not properly deserialize json. Text=[%s].", text), t));
+        }
+        return new FormatMsg(converted, message.length);
+    }
+
@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
index 771cc5bd17..200719d145 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-rowdata-kv/src/main/java/org/apache/inlong/sort/formats/kv/KvRowDataDeserializationSchema.java
@@ -21,6 +21,7 @@ import
org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
import org.apache.inlong.sort.formats.base.DefaultDeserializationSchema;
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
+import org.apache.inlong.sort.formats.base.FormatMsg;
import org.apache.inlong.sort.formats.base.TableFormatForRowDataUtils;
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
@@ -46,6 +47,7 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_K
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_NULL_LITERAL;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.DEFAULT_QUOTE_CHARACTER;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.deserializeBasicField;
+import static
org.apache.inlong.sort.formats.base.TableFormatUtils.getFormatValueLength;
import static org.apache.inlong.sort.formats.util.StringUtils.splitKv;
/**
@@ -139,6 +141,8 @@ public class KvRowDataDeserializationSchema extends
DefaultDeserializationSchema
this.escapeChar = escapeChar;
this.quoteChar = quoteChar;
this.nullLiteral = nullLiteral;
+ String[] fieldNames = rowFormatInfo.getFieldNames();
+ this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
.map(formatInfo -> FieldToRowDataConverters.createConverter(
@@ -166,6 +170,9 @@ public class KvRowDataDeserializationSchema extends
DefaultDeserializationSchema
this.quoteChar = quoteChar;
this.nullLiteral = nullLiteral;
+ String[] fieldNames = rowFormatInfo.getFieldNames();
+ this.fieldNameSize = (fieldNames == null ? 0 : fieldNames.length);
+
converters = Arrays.stream(rowFormatInfo.getFieldFormatInfos())
.map(formatInfo -> FieldToRowDataConverters.createConverter(
TableFormatForRowDataUtils.deriveLogicalType(formatInfo)))
@@ -206,6 +213,42 @@ public class KvRowDataDeserializationSchema extends
DefaultDeserializationSchema
return null;
}
+    /**
+     * Deserializes a key-value message into a {@link FormatMsg} holding the row and the
+     * accumulated length reported by {@code getFormatValueLength} for every field text.
+     *
+     * <p>Only the first map produced by {@code splitKv} is used; each declared field is looked
+     * up in it by name.
+     *
+     * @param bytes the serialized key-value text, may be {@code null}
+     * @return the deserialized message, or {@code null} when {@code bytes} is {@code null} or
+     *         parsing failed and the failure handler did not rethrow
+     * @throws Exception if the failure handler rethrows a parsing failure
+     */
+    @Override
+    public FormatMsg deserializeFormatMsg(byte[] bytes) throws Exception {
+        // Match the CSV and JSON implementations: null input yields a null message.
+        if (bytes == null) {
+            return null;
+        }
+        String text = new String(bytes, Charset.forName(charset));
+        try {
+            List<Map<String, String>> fieldTexts =
+                    splitKv(text, entryDelimiter, kvDelimiter, escapeChar, quoteChar, null,
+                            true);
+
+            String[] fieldNames = rowFormatInfo.getFieldNames();
+            FormatInfo[] fieldFormatInfos = rowFormatInfo.getFieldFormatInfos();
+
+            GenericRowData rowData = new GenericRowData(fieldFormatInfos.length);
+            long rowDataLength = 0L;
+            for (int i = 0; i < fieldFormatInfos.length; i++) {
+                String fieldName = fieldNames[i];
+                FormatInfo fieldFormatInfo = fieldFormatInfos[i];
+
+                String fieldText = fieldTexts.get(0).get(fieldName);
+
+                Object field = deserializeBasicField(
+                        fieldName,
+                        fieldFormatInfo,
+                        fieldText,
+                        nullLiteral, failureHandler);
+                rowData.setField(i, converters[i].convert(field));
+                rowDataLength += getFormatValueLength(fieldFormatInfo, fieldText);
+            }
+            return new FormatMsg(rowData, rowDataLength);
+        } catch (Throwable t) {
+            // String.format needs %s; the SLF4J-style {} placeholder is not substituted.
+            failureHandler.onParsingMsgFailure(text, new RuntimeException(
+                    String.format("Could not properly deserialize kv. Text=[%s].", text), t));
+        }
+        return null;
+    }
+
@Override
public boolean isEndOfStream(RowData rowData) {
return false;