This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 2379582867b [FLINK-39401][table-runtime] Extend raw format to support
line-delimiter option (#27897)
2379582867b is described below
commit 2379582867b37e499c8b6680d8efed42c60f665b
Author: Feat Zhang <[email protected]>
AuthorDate: Fri Apr 17 09:43:42 2026 +0800
[FLINK-39401][table-runtime] Extend raw format to support line-delimiter
option (#27897)
Co-authored-by: Yuepeng Pan <[email protected]>
---
.../docs/connectors/table/formats/raw.md | 11 +
docs/content/docs/connectors/table/formats/raw.md | 13 +
.../raw/RawFormatDeserializationSchema.java | 65 ++++-
.../apache/flink/formats/raw/RawFormatFactory.java | 15 +-
.../apache/flink/formats/raw/RawFormatOptions.java | 9 +
.../formats/raw/RawFormatSerializationSchema.java | 32 ++-
.../table/formats/raw/RawFormatFactoryTest.java | 23 ++
.../formats/raw/RawFormatLineDelimiterTest.java | 298 +++++++++++++++++++++
8 files changed, 459 insertions(+), 7 deletions(-)
diff --git a/docs/content.zh/docs/connectors/table/formats/raw.md
b/docs/content.zh/docs/connectors/table/formats/raw.md
index ebe756c56e8..69b4acfa1a0 100644
--- a/docs/content.zh/docs/connectors/table/formats/raw.md
+++ b/docs/content.zh/docs/connectors/table/formats/raw.md
@@ -105,6 +105,17 @@ Format 参数
<td>指定字节序来编码数字值的字节。有效值为'big-endian'和'little-endian'。
更多细节可查阅 <a href="https://zh.wikipedia.org/wiki/字节序">字节序</a>。</td>
</tr>
+ <tr>
+ <td><h5>raw.line-delimiter</h5></td>
+ <td>可选</td>
+ <td style="word-wrap: break-word;">(无)</td>
+ <td>String</td>
+ <td>指定行分隔符,用于在反序列化时将一条消息拆分为多行。设置后,每条消息将使用
+ 'raw.charset' 解码,并按此分隔符切分,每段输出一条数据行。在序列化时,分隔符字节会被
+ 追加到每条序列化值的末尾。常用值为 '\n'(换行符)或 '||'。
+ <br><strong>注意:</strong>当序列化与反序列化使用相同的分隔符配置时,两者具有
+ round-trip 兼容性:序列化器追加的末尾分隔符会在反序列化时自动被去除。</td>
+ </tr>
</tbody>
</table>
diff --git a/docs/content/docs/connectors/table/formats/raw.md
b/docs/content/docs/connectors/table/formats/raw.md
index 9596eb07fcd..1efdd2b40ee 100644
--- a/docs/content/docs/connectors/table/formats/raw.md
+++ b/docs/content/docs/connectors/table/formats/raw.md
@@ -105,6 +105,19 @@ Format Options
<td>Specify the endianness to encode the bytes of numeric value. Valid values are 'big-endian' and 'little-endian'.
See more details of <a href="https://en.wikipedia.org/wiki/Endianness">endianness</a>.</td>
</tr>
+ <tr>
+ <td><h5>raw.line-delimiter</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify the line delimiter for splitting incoming messages into multiple rows during
+ deserialization. When set, each incoming message is decoded using 'raw.charset' and then split
+ by this delimiter; one row is emitted per segment. During serialization, the delimiter bytes
+ are appended to each serialized value. Common values are '\n' (newline) or '||'.
+ <br><strong>Note:</strong> When the same delimiter is configured for both serialization and
+ deserialization, the two are round-trip compatible: a trailing delimiter appended by the
+ serializer is automatically stripped during deserialization.</td>
+ </tr>
</tbody>
</table>
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java
index 94d1cb30a8e..4ac4f82f464 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java
@@ -29,12 +29,16 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.DeserializationException;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
+import java.util.regex.Pattern;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -55,6 +59,14 @@ public class RawFormatDeserializationSchema implements
DeserializationSchema<Row
private final boolean isBigEndian;
+ @Nullable private final String lineDelimiter;
+
+ /**
+ * Pre-compiled pattern for splitting by {@link #lineDelimiter}, or {@code
null} if no
+ * delimiter. Note: this field and {@link #lineDelimiter} are either both
null or both non-null.
+ */
+ @Nullable private final Pattern lineDelimiterPattern;
+
private final DeserializationRuntimeConverter converter;
private final DataLengthValidator validator;
@@ -64,12 +76,24 @@ public class RawFormatDeserializationSchema implements
DeserializationSchema<Row
TypeInformation<RowData> producedTypeInfo,
String charsetName,
boolean isBigEndian) {
+ this(deserializedType, producedTypeInfo, charsetName, isBigEndian,
null);
+ }
+
+ public RawFormatDeserializationSchema(
+ LogicalType deserializedType,
+ TypeInformation<RowData> producedTypeInfo,
+ String charsetName,
+ boolean isBigEndian,
+ @Nullable String lineDelimiter) {
this.deserializedType = checkNotNull(deserializedType);
this.producedTypeInfo = checkNotNull(producedTypeInfo);
this.converter = createConverter(deserializedType, charsetName,
isBigEndian);
this.validator = createDataLengthValidator(deserializedType);
this.charsetName = charsetName;
this.isBigEndian = isBigEndian;
+ this.lineDelimiter = lineDelimiter;
+ this.lineDelimiterPattern =
+ lineDelimiter != null ?
Pattern.compile(Pattern.quote(lineDelimiter)) : null;
}
@Override
@@ -92,6 +116,41 @@ public class RawFormatDeserializationSchema implements
DeserializationSchema<Row
return rowData;
}
+ @Override
+ public void deserialize(byte[] message, Collector<RowData> out) throws
IOException {
+ if (lineDelimiter == null) {
+ // no delimiter: default single-record behavior
+ RowData row = deserialize(message);
+ if (row != null) {
+ out.collect(row);
+ }
+ return;
+ }
+
+ if (message == null) {
+ return;
+ }
+
+ Charset charset = Charset.forName(charsetName);
+ String decoded = new String(message, charset);
+ // Use pre-compiled pattern. Split with -1 to keep intentional empty
middle segments,
+ // but strip the single trailing empty string produced when the
message ends with the
+ // delimiter (e.g. a serializer that appends one delimiter per row).
+ String[] parts = lineDelimiterPattern.split(decoded, -1);
+ int count = parts.length;
+ if (count > 0 && parts[count - 1].isEmpty()) {
+ count--;
+ }
+ for (int i = 0; i < count; i++) {
+ byte[] partBytes = parts[i].getBytes(charset);
+ validator.validate(partBytes);
+ Object field = converter.convert(partBytes);
+ GenericRowData rowData = new GenericRowData(1);
+ rowData.setField(0, field);
+ out.collect(rowData);
+ }
+ }
+
@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
@@ -114,12 +173,14 @@ public class RawFormatDeserializationSchema implements
DeserializationSchema<Row
return producedTypeInfo.equals(that.producedTypeInfo)
&& deserializedType.equals(that.deserializedType)
&& charsetName.equals(that.charsetName)
- && isBigEndian == that.isBigEndian;
+ && isBigEndian == that.isBigEndian
+ && Objects.equals(lineDelimiter, that.lineDelimiter);
}
@Override
public int hashCode() {
- return Objects.hash(producedTypeInfo, deserializedType, charsetName,
isBigEndian);
+ return Objects.hash(
+ producedTypeInfo, deserializedType, charsetName, isBigEndian,
lineDelimiter);
}
// ------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java
index 5c9dfe3cb19..05b52cf2c94 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java
@@ -45,6 +45,7 @@ import
org.apache.flink.shaded.guava33.com.google.common.collect.Sets;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -72,6 +73,7 @@ public class RawFormatFactory implements
DeserializationFormatFactory, Serializa
Set<ConfigOption<?>> options = new HashSet<>();
options.add(RawFormatOptions.ENDIANNESS);
options.add(RawFormatOptions.CHARSET);
+ options.add(RawFormatOptions.LINE_DELIMITER);
return options;
}
@@ -81,6 +83,8 @@ public class RawFormatFactory implements
DeserializationFormatFactory, Serializa
FactoryUtil.validateFactoryOptions(this, formatOptions);
final String charsetName = validateAndGetCharsetName(formatOptions);
final boolean isBigEndian = isBigEndian(formatOptions);
+ final Optional<String> lineDelimiter =
+ formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER);
return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
@@ -91,7 +95,11 @@ public class RawFormatFactory implements
DeserializationFormatFactory, Serializa
final TypeInformation<RowData> producedTypeInfo =
context.createTypeInformation(producedDataType);
return new RawFormatDeserializationSchema(
- fieldType, producedTypeInfo, charsetName, isBigEndian);
+ fieldType,
+ producedTypeInfo,
+ charsetName,
+ isBigEndian,
+ lineDelimiter.orElse(null));
}
@Override
@@ -107,6 +115,8 @@ public class RawFormatFactory implements
DeserializationFormatFactory, Serializa
FactoryUtil.validateFactoryOptions(this, formatOptions);
final String charsetName = validateAndGetCharsetName(formatOptions);
final boolean isBigEndian = isBigEndian(formatOptions);
+ final Optional<String> lineDelimiter =
+ formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER);
return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
@@ -114,7 +124,8 @@ public class RawFormatFactory implements
DeserializationFormatFactory, Serializa
DynamicTableSink.Context context, DataType
consumedDataType) {
final RowType physicalRowType = (RowType)
consumedDataType.getLogicalType();
final LogicalType fieldType =
validateAndExtractSingleField(physicalRowType);
- return new RawFormatSerializationSchema(fieldType,
charsetName, isBigEndian);
+ return new RawFormatSerializationSchema(
+ fieldType, charsetName, isBigEndian,
lineDelimiter.orElse(null));
}
@Override
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java
index aa88d6765ea..7fa674b7f98 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java
@@ -43,5 +43,14 @@ public class RawFormatOptions {
.defaultValue(StandardCharsets.UTF_8.displayName())
.withDescription("Defines the string charset.");
+ public static final ConfigOption<String> LINE_DELIMITER =
+ ConfigOptions.key("line-delimiter")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "Optional line delimiter. Supports Java escape
sequences (e.g. '\\n', '\\r\\n'). "
+ + "When set, deserialization splits each
message by this delimiter and emits "
+ + "one RowData per part. Serialization
appends the delimiter after each row's value.");
+
private RawFormatOptions() {}
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java
index 839cc66abf1..2f16dee5caa 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java
@@ -27,10 +27,13 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RawType;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Objects;
/** Serialization schema that serializes an {@link RowData} object into raw
(byte based) value. */
@@ -47,12 +50,28 @@ public class RawFormatSerializationSchema implements
SerializationSchema<RowData
private final boolean isBigEndian;
+ @Nullable private final String lineDelimiter;
+
+ /** Pre-computed delimiter bytes, or {@code null} if no delimiter is set.
*/
+ @Nullable private final byte[] delimiterBytes;
+
public RawFormatSerializationSchema(
LogicalType serializedType, String charsetName, boolean
isBigEndian) {
+ this(serializedType, charsetName, isBigEndian, null);
+ }
+
+ public RawFormatSerializationSchema(
+ LogicalType serializedType,
+ String charsetName,
+ boolean isBigEndian,
+ @Nullable String lineDelimiter) {
this.serializedType = serializedType;
this.converter = createConverter(serializedType, charsetName,
isBigEndian);
this.charsetName = charsetName;
this.isBigEndian = isBigEndian;
+ this.lineDelimiter = lineDelimiter;
+ this.delimiterBytes =
+ lineDelimiter != null ?
lineDelimiter.getBytes(Charset.forName(charsetName)) : null;
}
@Override
@@ -63,7 +82,13 @@ public class RawFormatSerializationSchema implements
SerializationSchema<RowData
@Override
public byte[] serialize(RowData row) {
try {
- return converter.convert(row);
+ byte[] valueBytes = converter.convert(row);
+ if (delimiterBytes == null || valueBytes == null) {
+ return valueBytes;
+ }
+ byte[] result = Arrays.copyOf(valueBytes, valueBytes.length +
delimiterBytes.length);
+ System.arraycopy(delimiterBytes, 0, result, valueBytes.length,
delimiterBytes.length);
+ return result;
} catch (IOException e) {
throw new RuntimeException("Could not serialize row '" + row + "'.
", e);
}
@@ -80,12 +105,13 @@ public class RawFormatSerializationSchema implements
SerializationSchema<RowData
RawFormatSerializationSchema that = (RawFormatSerializationSchema) o;
return serializedType.equals(that.serializedType)
&& charsetName.equals(that.charsetName)
- && isBigEndian == that.isBigEndian;
+ && isBigEndian == that.isBigEndian
+ && Objects.equals(lineDelimiter, that.lineDelimiter);
}
@Override
public int hashCode() {
- return Objects.hash(serializedType, charsetName, isBigEndian);
+ return Objects.hash(serializedType, charsetName, isBigEndian,
lineDelimiter);
}
// ------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatFactoryTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatFactoryTest.java
index 805d88117f7..17ad29f9225 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatFactoryTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatFactoryTest.java
@@ -175,6 +175,29 @@ class RawFormatFactoryTest {
.hasMessage("The 'raw' format doesn't supports 'MAP<INT,
STRING>' as column type.");
}
+ @Test
+ void testLineDelimiterOption() {
+ final Map<String, String> tableOptions =
+ getModifiedOptions(
+ options -> {
+ options.put("raw.line-delimiter", "\n");
+ });
+
+ // test deserialization schema contains line delimiter
+ final RawFormatDeserializationSchema expectedDeser =
+ new RawFormatDeserializationSchema(
+ ROW_TYPE.getTypeAt(0), InternalTypeInfo.of(ROW_TYPE),
"UTF-8", true, "\n");
+ DeserializationSchema<RowData> actualDeser =
+ createDeserializationSchema(SCHEMA, tableOptions);
+ assertThat(actualDeser).isEqualTo(expectedDeser);
+
+ // test serialization schema contains line delimiter
+ final RawFormatSerializationSchema expectedSer =
+ new RawFormatSerializationSchema(ROW_TYPE.getTypeAt(0),
"UTF-8", true, "\n");
+ SerializationSchema<RowData> actualSer =
createSerializationSchema(SCHEMA, tableOptions);
+ assertThat(actualSer).isEqualTo(expectedSer);
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatLineDelimiterTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatLineDelimiterTest.java
new file mode 100644
index 00000000000..52652121698
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatLineDelimiterTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.raw;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.raw.RawFormatDeserializationSchema;
+import org.apache.flink.formats.raw.RawFormatSerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link RawFormatDeserializationSchema} and {@link
RawFormatSerializationSchema} with
+ * the {@code raw.line-delimiter} option.
+ */
+class RawFormatLineDelimiterTest {
+
+ private static final VarCharType STRING_TYPE = VarCharType.STRING_TYPE;
+
+ // -----------------------------------------------------------------------
+ // Deserialization tests
+ // -----------------------------------------------------------------------
+
+ @Test
+ void testDeserializeWithoutDelimiter_singleRow() throws Exception {
+ RawFormatDeserializationSchema schema =
+ new RawFormatDeserializationSchema(
+ STRING_TYPE,
+ TypeInformation.of(RowData.class),
+ StandardCharsets.UTF_8.name(),
+ true,
+ null);
+ openDeser(schema);
+
+ List<RowData> rows = collectRows(schema,
"hello".getBytes(StandardCharsets.UTF_8));
+ assertThat(rows).hasSize(1);
+ assertThat(rows.get(0).getString(0)).hasToString("hello");
+ }
+
+ @Test
+ void testDeserializeWithNewlineDelimiter_multipleRows() throws Exception {
+ RawFormatDeserializationSchema schema =
+ new RawFormatDeserializationSchema(
+ STRING_TYPE,
+ TypeInformation.of(RowData.class),
+ StandardCharsets.UTF_8.name(),
+ true,
+ "\n");
+ openDeser(schema);
+
+ byte[] message =
"line1\nline2\nline3".getBytes(StandardCharsets.UTF_8);
+ List<RowData> rows = collectRows(schema, message);
+ assertThat(rows).hasSize(3);
+ assertThat(rows.get(0).getString(0)).hasToString("line1");
+ assertThat(rows.get(1).getString(0)).hasToString("line2");
+ assertThat(rows.get(2).getString(0)).hasToString("line3");
+ }
+
+ @Test
+ void testDeserializeWithCustomMultiCharDelimiter() throws Exception {
+ RawFormatDeserializationSchema schema =
+ new RawFormatDeserializationSchema(
+ STRING_TYPE,
+ TypeInformation.of(RowData.class),
+ StandardCharsets.UTF_8.name(),
+ true,
+ "||");
+ openDeser(schema);
+
+ byte[] message =
"record1||record2||record3".getBytes(StandardCharsets.UTF_8);
+ List<RowData> rows = collectRows(schema, message);
+ assertThat(rows).hasSize(3);
+ assertThat(rows.get(0).getString(0)).hasToString("record1");
+ assertThat(rows.get(1).getString(0)).hasToString("record2");
+ assertThat(rows.get(2).getString(0)).hasToString("record3");
+ }
+
+ @Test
+ void testDeserializeWithNullMessage_noOutput() throws Exception {
+ RawFormatDeserializationSchema schema =
+ new RawFormatDeserializationSchema(
+ STRING_TYPE,
+ TypeInformation.of(RowData.class),
+ StandardCharsets.UTF_8.name(),
+ true,
+ "\n");
+ openDeser(schema);
+
+ List<RowData> rows = collectRows(schema, null);
+ assertThat(rows).isEmpty();
+ }
+
+ @Test
+ void testDeserializeWithGbkCharset() throws Exception {
+ Charset gbk = Charset.forName("GBK");
+ String original = "你好\n世界";
+ byte[] message = original.getBytes(gbk);
+
+ RawFormatDeserializationSchema schema =
+ new RawFormatDeserializationSchema(
+ STRING_TYPE, TypeInformation.of(RowData.class), "GBK",
true, "\n");
+ openDeser(schema);
+
+ List<RowData> rows = collectRows(schema, message);
+ assertThat(rows).hasSize(2);
+ assertThat(rows.get(0).getString(0)).hasToString("你好");
+ assertThat(rows.get(1).getString(0)).hasToString("世界");
+ }
+
+ // -----------------------------------------------------------------------
+ // Serialization tests
+ // -----------------------------------------------------------------------
+
+ @Test
+ void testSerializeWithoutDelimiter_noAppend() throws Exception {
+ RawFormatSerializationSchema schema =
+ new RawFormatSerializationSchema(
+ STRING_TYPE, StandardCharsets.UTF_8.name(), true,
null);
+ openSer(schema);
+
+ RowData row = buildStringRow("hello");
+ byte[] result = schema.serialize(row);
+ assertThat(result).isEqualTo("hello".getBytes(StandardCharsets.UTF_8));
+ }
+
+ @Test
+ void testSerializeWithNewlineDelimiter_appendsDelimiter() throws Exception
{
+ RawFormatSerializationSchema schema =
+ new RawFormatSerializationSchema(
+ STRING_TYPE, StandardCharsets.UTF_8.name(), true,
"\n");
+ openSer(schema);
+
+ RowData row = buildStringRow("hello");
+ byte[] result = schema.serialize(row);
+
assertThat(result).isEqualTo("hello\n".getBytes(StandardCharsets.UTF_8));
+ }
+
+ @Test
+ void testSerializeWithCustomDelimiter_appendsDelimiter() throws Exception {
+ RawFormatSerializationSchema schema =
+ new RawFormatSerializationSchema(
+ STRING_TYPE, StandardCharsets.UTF_8.name(), true,
"||");
+ openSer(schema);
+
+ RowData row = buildStringRow("record1");
+ byte[] result = schema.serialize(row);
+
assertThat(result).isEqualTo("record1||".getBytes(StandardCharsets.UTF_8));
+ }
+
+ @Test
+ void testSerializeNullRow_returnsNull() throws Exception {
+ RawFormatSerializationSchema schema =
+ new RawFormatSerializationSchema(
+ STRING_TYPE, StandardCharsets.UTF_8.name(), true,
"\n");
+ openSer(schema);
+
+ GenericRowData nullRow = new GenericRowData(1);
+ nullRow.setField(0, null);
+ byte[] result = schema.serialize(nullRow);
+ assertThat(result).isNull();
+ }
+
+ @Test
+ void testDeserializeTrailingDelimiter_noExtraRow() throws Exception {
+ // Verify that a message ending with the delimiter does not produce a
trailing empty row.
+ // This ensures round-trip compatibility: serialize("hello") ->
"hello\n" ->
+ // deserialize -> ["hello"] (1 row, not 2).
+ RawFormatDeserializationSchema schema =
+ new RawFormatDeserializationSchema(
+ STRING_TYPE,
+ TypeInformation.of(RowData.class),
+ StandardCharsets.UTF_8.name(),
+ true,
+ "\n");
+ openDeser(schema);
+
+ // Message already ends with the delimiter (as produced by the
serializer)
+ byte[] message = "hello\n".getBytes(StandardCharsets.UTF_8);
+ List<RowData> rows = collectRows(schema, message);
+ assertThat(rows).hasSize(1);
+ assertThat(rows.get(0).getString(0)).hasToString("hello");
+ }
+
+ @Test
+ void testRoundTrip_serializeThenDeserialize() throws Exception {
+ // Verify that rows written by the serializer can be read back
correctly by the
+ // deserializer when both share the same delimiter configuration.
+ RawFormatSerializationSchema ser =
+ new RawFormatSerializationSchema(
+ STRING_TYPE, StandardCharsets.UTF_8.name(), true,
"\n");
+ openSer(ser);
+
+ RawFormatDeserializationSchema deser =
+ new RawFormatDeserializationSchema(
+ STRING_TYPE,
+ TypeInformation.of(RowData.class),
+ StandardCharsets.UTF_8.name(),
+ true,
+ "\n");
+ openDeser(deser);
+
+ // Serialize a single row -> "hello\n"
+ byte[] serialized = ser.serialize(buildStringRow("hello"));
+
+ // Deserialize "hello\n" -> should yield exactly 1 row
+ List<RowData> rows = collectRows(deser, serialized);
+ assertThat(rows).hasSize(1);
+ assertThat(rows.get(0).getString(0)).hasToString("hello");
+ }
+
+ // -----------------------------------------------------------------------
+ // Helpers
+ // -----------------------------------------------------------------------
+
+ private void openDeser(RawFormatDeserializationSchema schema) throws
Exception {
+ schema.open(
+ new DeserializationSchema.InitializationContext() {
+ @Override
+ public MetricGroup getMetricGroup() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ throw new UnsupportedOperationException();
+ }
+ });
+ }
+
+ private void openSer(RawFormatSerializationSchema schema) throws Exception
{
+ schema.open(
+ new SerializationSchema.InitializationContext() {
+ @Override
+ public MetricGroup getMetricGroup() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public UserCodeClassLoader getUserCodeClassLoader() {
+ throw new UnsupportedOperationException();
+ }
+ });
+ }
+
+ private List<RowData> collectRows(RawFormatDeserializationSchema schema,
byte[] message)
+ throws Exception {
+ List<RowData> rows = new ArrayList<>();
+ schema.deserialize(
+ message,
+ new Collector<RowData>() {
+ @Override
+ public void collect(RowData record) {
+ rows.add(record);
+ }
+
+ @Override
+ public void close() {}
+ });
+ return rows;
+ }
+
+ private RowData buildStringRow(String value) {
+ GenericRowData row = new GenericRowData(1);
+ row.setField(0, StringData.fromString(value));
+ return row;
+ }
+}