This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 09fa669fcf [INLONG-11803][Sort] Solve the NPE problem when parsing
InLongMsg (#11804)
09fa669fcf is described below
commit 09fa669fcfceca2b7305b6a1b4f1c88730023078
Author: Mingyu Bao <[email protected]>
AuthorDate: Mon Mar 17 16:11:45 2025 +0800
[INLONG-11803][Sort] Solve the NPE problem when parsing InLongMsg (#11804)
---
.../AbstractInLongMsgFormatDeserializer.java | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
index 9f63185102..b9611e9a17 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java
@@ -20,24 +20,31 @@ package org.apache.inlong.sort.formats.inlongmsg;
import org.apache.inlong.common.msg.InLongMsg;
import org.apache.inlong.sort.formats.metrics.FormatMetricGroup;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
/**
* The base for all InLongMsg format deserializers.
*/
public abstract class AbstractInLongMsgFormatDeserializer implements
ResultTypeQueryable<RowData>, Serializable {
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractInLongMsgFormatDeserializer.class);
+
protected FailureHandler failureHandler;
/**
@@ -81,6 +88,19 @@ public abstract class AbstractInLongMsgFormatDeserializer
implements ResultTypeQ
final List<InLongMsgWrap> result = new ArrayList<>();
InLongMsg inLongMsg = InLongMsg.parseFrom(bytes);
+ if (inLongMsg == null) {
+ failureHandler.onParsingMsgFailure(bytes, new IOException(
+ String.format("Could not parse InLongMsg from bytes.
bytes={}.",
+ StringUtils.join(bytes))));
+ return result;
+ }
+ try {
+ Set<String> set = inLongMsg.getAttrs();
+ } catch (Exception e) {
+ failureHandler.onParsingMsgFailure(bytes,
+ new IOException("Parse InLongMsg from bytes has
exception.", e));
+ return result;
+ }
for (String attr : inLongMsg.getAttrs()) {
Iterator<byte[]> iterator = inLongMsg.getIterator(attr);
if (iterator == null) {