This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2379582867b [FLINK-39401][table-runtime] Extend raw format to support 
line-delimiter option (#27897)
2379582867b is described below

commit 2379582867b37e499c8b6680d8efed42c60f665b
Author: Feat Zhang <[email protected]>
AuthorDate: Fri Apr 17 09:43:42 2026 +0800

    [FLINK-39401][table-runtime] Extend raw format to support line-delimiter 
option (#27897)
    
     Co-authored-by: Yuepeng Pan <[email protected]>
---
 .../docs/connectors/table/formats/raw.md           |  11 +
 docs/content/docs/connectors/table/formats/raw.md  |  13 +
 .../raw/RawFormatDeserializationSchema.java        |  65 ++++-
 .../apache/flink/formats/raw/RawFormatFactory.java |  15 +-
 .../apache/flink/formats/raw/RawFormatOptions.java |   9 +
 .../formats/raw/RawFormatSerializationSchema.java  |  32 ++-
 .../table/formats/raw/RawFormatFactoryTest.java    |  23 ++
 .../formats/raw/RawFormatLineDelimiterTest.java    | 298 +++++++++++++++++++++
 8 files changed, 459 insertions(+), 7 deletions(-)

diff --git a/docs/content.zh/docs/connectors/table/formats/raw.md 
b/docs/content.zh/docs/connectors/table/formats/raw.md
index ebe756c56e8..69b4acfa1a0 100644
--- a/docs/content.zh/docs/connectors/table/formats/raw.md
+++ b/docs/content.zh/docs/connectors/table/formats/raw.md
@@ -105,6 +105,17 @@ Format 参数
       <td>指定字节序来编码数字值的字节。有效值为'big-endian'和'little-endian'。
       更多细节可查阅 <a href="https://zh.wikipedia.org/wiki/字节序">字节序</a>。</td>
     </tr>
+    <tr>
+      <td><h5>raw.line-delimiter</h5></td>
+      <td>可选</td>
+      <td style="word-wrap: break-word;">(无)</td>
+      <td>String</td>
+      <td>指定行分隔符,用于在反序列化时将一条消息拆分为多行。设置后,每条消息将使用
+      'raw.charset' 解码,并按此分隔符切分,每段输出一条数据行。在序列化时,分隔符字节会被
+      追加到每条序列化值的末尾。常用值为 '\n'(换行符)或 '||'。
+      <br><strong>注意:</strong>当序列化与反序列化使用相同的分隔符配置时,两者具有
+      round-trip 兼容性:序列化器追加的末尾分隔符会在反序列化时自动被去除。</td>
+    </tr>
     </tbody>
 </table>
 
diff --git a/docs/content/docs/connectors/table/formats/raw.md 
b/docs/content/docs/connectors/table/formats/raw.md
index 9596eb07fcd..1efdd2b40ee 100644
--- a/docs/content/docs/connectors/table/formats/raw.md
+++ b/docs/content/docs/connectors/table/formats/raw.md
@@ -105,6 +105,19 @@ Format Options
       <td>Specify the endianness to encode the bytes of numeric value. Valid 
values are 'big-endian' and 'little-endian'.
       See more details of <a 
href="https://en.wikipedia.org/wiki/Endianness">endianness</a>.</td>
     </tr>
+    <tr>
+      <td><h5>raw.line-delimiter</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify the line delimiter for splitting incoming messages into 
multiple rows during
+      deserialization. When set, each incoming message is decoded using 
'raw.charset' and then split
+      by this delimiter; one row is emitted per segment. During serialization, 
the delimiter bytes
+      are appended to each serialized value. Common values are '\n' (newline) 
or '||'.
+      <br><strong>Note:</strong> When the same delimiter is configured for 
both serialization and
+      deserialization, the two are round-trip compatible: a trailing delimiter 
appended by the
+      serializer is automatically stripped during deserialization.</td>
+    </tr>
     </tbody>
 </table>
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java
index 94d1cb30a8e..4ac4f82f464 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java
@@ -29,12 +29,16 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.DeserializationException;
+import org.apache.flink.util.Collector;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.Objects;
+import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -55,6 +59,14 @@ public class RawFormatDeserializationSchema implements 
DeserializationSchema<Row
 
     private final boolean isBigEndian;
 
+    @Nullable private final String lineDelimiter;
+
+    /**
+     * Pre-compiled pattern for splitting by {@link #lineDelimiter}, or {@code 
null} if no
+     * delimiter. Note: this field and {@link #lineDelimiter} are either both 
null or both non-null.
+     */
+    @Nullable private final Pattern lineDelimiterPattern;
+
     private final DeserializationRuntimeConverter converter;
 
     private final DataLengthValidator validator;
@@ -64,12 +76,24 @@ public class RawFormatDeserializationSchema implements 
DeserializationSchema<Row
             TypeInformation<RowData> producedTypeInfo,
             String charsetName,
             boolean isBigEndian) {
+        this(deserializedType, producedTypeInfo, charsetName, isBigEndian, 
null);
+    }
+
+    public RawFormatDeserializationSchema(
+            LogicalType deserializedType,
+            TypeInformation<RowData> producedTypeInfo,
+            String charsetName,
+            boolean isBigEndian,
+            @Nullable String lineDelimiter) {
         this.deserializedType = checkNotNull(deserializedType);
         this.producedTypeInfo = checkNotNull(producedTypeInfo);
         this.converter = createConverter(deserializedType, charsetName, 
isBigEndian);
         this.validator = createDataLengthValidator(deserializedType);
         this.charsetName = charsetName;
         this.isBigEndian = isBigEndian;
+        this.lineDelimiter = lineDelimiter;
+        this.lineDelimiterPattern =
+                lineDelimiter != null ? 
Pattern.compile(Pattern.quote(lineDelimiter)) : null;
     }
 
     @Override
@@ -92,6 +116,41 @@ public class RawFormatDeserializationSchema implements 
DeserializationSchema<Row
         return rowData;
     }
 
+    @Override
+    public void deserialize(byte[] message, Collector<RowData> out) throws 
IOException {
+        if (lineDelimiter == null) {
+            // no delimiter: default single-record behavior
+            RowData row = deserialize(message);
+            if (row != null) {
+                out.collect(row);
+            }
+            return;
+        }
+
+        if (message == null) {
+            return;
+        }
+
+        Charset charset = Charset.forName(charsetName);
+        String decoded = new String(message, charset);
+        // Use pre-compiled pattern. Split with -1 to keep intentional empty 
middle segments,
+        // but strip the single trailing empty string produced when the 
message ends with the
+        // delimiter (e.g. a serializer that appends one delimiter per row).
+        String[] parts = lineDelimiterPattern.split(decoded, -1);
+        int count = parts.length;
+        if (count > 0 && parts[count - 1].isEmpty()) {
+            count--;
+        }
+        for (int i = 0; i < count; i++) {
+            byte[] partBytes = parts[i].getBytes(charset);
+            validator.validate(partBytes);
+            Object field = converter.convert(partBytes);
+            GenericRowData rowData = new GenericRowData(1);
+            rowData.setField(0, field);
+            out.collect(rowData);
+        }
+    }
+
     @Override
     public boolean isEndOfStream(RowData nextElement) {
         return false;
@@ -114,12 +173,14 @@ public class RawFormatDeserializationSchema implements 
DeserializationSchema<Row
         return producedTypeInfo.equals(that.producedTypeInfo)
                 && deserializedType.equals(that.deserializedType)
                 && charsetName.equals(that.charsetName)
-                && isBigEndian == that.isBigEndian;
+                && isBigEndian == that.isBigEndian
+                && Objects.equals(lineDelimiter, that.lineDelimiter);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(producedTypeInfo, deserializedType, charsetName, 
isBigEndian);
+        return Objects.hash(
+                producedTypeInfo, deserializedType, charsetName, isBigEndian, 
lineDelimiter);
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java
index 5c9dfe3cb19..05b52cf2c94 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatFactory.java
@@ -45,6 +45,7 @@ import 
org.apache.flink.shaded.guava33.com.google.common.collect.Sets;
 import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -72,6 +73,7 @@ public class RawFormatFactory implements 
DeserializationFormatFactory, Serializa
         Set<ConfigOption<?>> options = new HashSet<>();
         options.add(RawFormatOptions.ENDIANNESS);
         options.add(RawFormatOptions.CHARSET);
+        options.add(RawFormatOptions.LINE_DELIMITER);
         return options;
     }
 
@@ -81,6 +83,8 @@ public class RawFormatFactory implements 
DeserializationFormatFactory, Serializa
         FactoryUtil.validateFactoryOptions(this, formatOptions);
         final String charsetName = validateAndGetCharsetName(formatOptions);
         final boolean isBigEndian = isBigEndian(formatOptions);
+        final Optional<String> lineDelimiter =
+                formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER);
 
         return new DecodingFormat<DeserializationSchema<RowData>>() {
             @Override
@@ -91,7 +95,11 @@ public class RawFormatFactory implements 
DeserializationFormatFactory, Serializa
                 final TypeInformation<RowData> producedTypeInfo =
                         context.createTypeInformation(producedDataType);
                 return new RawFormatDeserializationSchema(
-                        fieldType, producedTypeInfo, charsetName, isBigEndian);
+                        fieldType,
+                        producedTypeInfo,
+                        charsetName,
+                        isBigEndian,
+                        lineDelimiter.orElse(null));
             }
 
             @Override
@@ -107,6 +115,8 @@ public class RawFormatFactory implements 
DeserializationFormatFactory, Serializa
         FactoryUtil.validateFactoryOptions(this, formatOptions);
         final String charsetName = validateAndGetCharsetName(formatOptions);
         final boolean isBigEndian = isBigEndian(formatOptions);
+        final Optional<String> lineDelimiter =
+                formatOptions.getOptional(RawFormatOptions.LINE_DELIMITER);
 
         return new EncodingFormat<SerializationSchema<RowData>>() {
             @Override
@@ -114,7 +124,8 @@ public class RawFormatFactory implements 
DeserializationFormatFactory, Serializa
                     DynamicTableSink.Context context, DataType 
consumedDataType) {
                 final RowType physicalRowType = (RowType) 
consumedDataType.getLogicalType();
                 final LogicalType fieldType = 
validateAndExtractSingleField(physicalRowType);
-                return new RawFormatSerializationSchema(fieldType, 
charsetName, isBigEndian);
+                return new RawFormatSerializationSchema(
+                        fieldType, charsetName, isBigEndian, 
lineDelimiter.orElse(null));
             }
 
             @Override
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java
index aa88d6765ea..7fa674b7f98 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatOptions.java
@@ -43,5 +43,14 @@ public class RawFormatOptions {
                     .defaultValue(StandardCharsets.UTF_8.displayName())
                     .withDescription("Defines the string charset.");
 
+    public static final ConfigOption<String> LINE_DELIMITER =
+            ConfigOptions.key("line-delimiter")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Optional line delimiter. Supports Java escape 
sequences (e.g. '\\n', '\\r\\n'). "
+                                    + "When set, deserialization splits each 
message by this delimiter and emits "
+                                    + "one RowData per part. Serialization 
appends the delimiter after each row's value.");
+
     private RawFormatOptions() {}
 }
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java
index 839cc66abf1..2f16dee5caa 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatSerializationSchema.java
@@ -27,10 +27,13 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RawType;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Objects;
 
 /** Serialization schema that serializes an {@link RowData} object into raw 
(byte based) value. */
@@ -47,12 +50,28 @@ public class RawFormatSerializationSchema implements 
SerializationSchema<RowData
 
     private final boolean isBigEndian;
 
+    @Nullable private final String lineDelimiter;
+
+    /** Pre-computed delimiter bytes, or {@code null} if no delimiter is set. 
*/
+    @Nullable private final byte[] delimiterBytes;
+
     public RawFormatSerializationSchema(
             LogicalType serializedType, String charsetName, boolean 
isBigEndian) {
+        this(serializedType, charsetName, isBigEndian, null);
+    }
+
+    public RawFormatSerializationSchema(
+            LogicalType serializedType,
+            String charsetName,
+            boolean isBigEndian,
+            @Nullable String lineDelimiter) {
         this.serializedType = serializedType;
         this.converter = createConverter(serializedType, charsetName, 
isBigEndian);
         this.charsetName = charsetName;
         this.isBigEndian = isBigEndian;
+        this.lineDelimiter = lineDelimiter;
+        this.delimiterBytes =
+                lineDelimiter != null ? 
lineDelimiter.getBytes(Charset.forName(charsetName)) : null;
     }
 
     @Override
@@ -63,7 +82,13 @@ public class RawFormatSerializationSchema implements 
SerializationSchema<RowData
     @Override
     public byte[] serialize(RowData row) {
         try {
-            return converter.convert(row);
+            byte[] valueBytes = converter.convert(row);
+            if (delimiterBytes == null || valueBytes == null) {
+                return valueBytes;
+            }
+            byte[] result = Arrays.copyOf(valueBytes, valueBytes.length + 
delimiterBytes.length);
+            System.arraycopy(delimiterBytes, 0, result, valueBytes.length, 
delimiterBytes.length);
+            return result;
         } catch (IOException e) {
             throw new RuntimeException("Could not serialize row '" + row + "'. 
", e);
         }
@@ -80,12 +105,13 @@ public class RawFormatSerializationSchema implements 
SerializationSchema<RowData
         RawFormatSerializationSchema that = (RawFormatSerializationSchema) o;
         return serializedType.equals(that.serializedType)
                 && charsetName.equals(that.charsetName)
-                && isBigEndian == that.isBigEndian;
+                && isBigEndian == that.isBigEndian
+                && Objects.equals(lineDelimiter, that.lineDelimiter);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(serializedType, charsetName, isBigEndian);
+        return Objects.hash(serializedType, charsetName, isBigEndian, 
lineDelimiter);
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatFactoryTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatFactoryTest.java
index 805d88117f7..17ad29f9225 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatFactoryTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatFactoryTest.java
@@ -175,6 +175,29 @@ class RawFormatFactoryTest {
                 .hasMessage("The 'raw' format doesn't supports 'MAP<INT, 
STRING>' as column type.");
     }
 
+    @Test
+    void testLineDelimiterOption() {
+        final Map<String, String> tableOptions =
+                getModifiedOptions(
+                        options -> {
+                            options.put("raw.line-delimiter", "\n");
+                        });
+
+        // test deserialization schema contains line delimiter
+        final RawFormatDeserializationSchema expectedDeser =
+                new RawFormatDeserializationSchema(
+                        ROW_TYPE.getTypeAt(0), InternalTypeInfo.of(ROW_TYPE), 
"UTF-8", true, "\n");
+        DeserializationSchema<RowData> actualDeser =
+                createDeserializationSchema(SCHEMA, tableOptions);
+        assertThat(actualDeser).isEqualTo(expectedDeser);
+
+        // test serialization schema contains line delimiter
+        final RawFormatSerializationSchema expectedSer =
+                new RawFormatSerializationSchema(ROW_TYPE.getTypeAt(0), 
"UTF-8", true, "\n");
+        SerializationSchema<RowData> actualSer = 
createSerializationSchema(SCHEMA, tableOptions);
+        assertThat(actualSer).isEqualTo(expectedSer);
+    }
+
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatLineDelimiterTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatLineDelimiterTest.java
new file mode 100644
index 00000000000..52652121698
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatLineDelimiterTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.raw;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.raw.RawFormatDeserializationSchema;
+import org.apache.flink.formats.raw.RawFormatSerializationSchema;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link RawFormatDeserializationSchema} and {@link 
RawFormatSerializationSchema} with
+ * the {@code raw.line-delimiter} option.
+ */
+class RawFormatLineDelimiterTest {
+
+    private static final VarCharType STRING_TYPE = VarCharType.STRING_TYPE;
+
+    // -----------------------------------------------------------------------
+    // Deserialization tests
+    // -----------------------------------------------------------------------
+
+    @Test
+    void testDeserializeWithoutDelimiter_singleRow() throws Exception {
+        RawFormatDeserializationSchema schema =
+                new RawFormatDeserializationSchema(
+                        STRING_TYPE,
+                        TypeInformation.of(RowData.class),
+                        StandardCharsets.UTF_8.name(),
+                        true,
+                        null);
+        openDeser(schema);
+
+        List<RowData> rows = collectRows(schema, 
"hello".getBytes(StandardCharsets.UTF_8));
+        assertThat(rows).hasSize(1);
+        assertThat(rows.get(0).getString(0)).hasToString("hello");
+    }
+
+    @Test
+    void testDeserializeWithNewlineDelimiter_multipleRows() throws Exception {
+        RawFormatDeserializationSchema schema =
+                new RawFormatDeserializationSchema(
+                        STRING_TYPE,
+                        TypeInformation.of(RowData.class),
+                        StandardCharsets.UTF_8.name(),
+                        true,
+                        "\n");
+        openDeser(schema);
+
+        byte[] message = 
"line1\nline2\nline3".getBytes(StandardCharsets.UTF_8);
+        List<RowData> rows = collectRows(schema, message);
+        assertThat(rows).hasSize(3);
+        assertThat(rows.get(0).getString(0)).hasToString("line1");
+        assertThat(rows.get(1).getString(0)).hasToString("line2");
+        assertThat(rows.get(2).getString(0)).hasToString("line3");
+    }
+
+    @Test
+    void testDeserializeWithCustomMultiCharDelimiter() throws Exception {
+        RawFormatDeserializationSchema schema =
+                new RawFormatDeserializationSchema(
+                        STRING_TYPE,
+                        TypeInformation.of(RowData.class),
+                        StandardCharsets.UTF_8.name(),
+                        true,
+                        "||");
+        openDeser(schema);
+
+        byte[] message = 
"record1||record2||record3".getBytes(StandardCharsets.UTF_8);
+        List<RowData> rows = collectRows(schema, message);
+        assertThat(rows).hasSize(3);
+        assertThat(rows.get(0).getString(0)).hasToString("record1");
+        assertThat(rows.get(1).getString(0)).hasToString("record2");
+        assertThat(rows.get(2).getString(0)).hasToString("record3");
+    }
+
+    @Test
+    void testDeserializeWithNullMessage_noOutput() throws Exception {
+        RawFormatDeserializationSchema schema =
+                new RawFormatDeserializationSchema(
+                        STRING_TYPE,
+                        TypeInformation.of(RowData.class),
+                        StandardCharsets.UTF_8.name(),
+                        true,
+                        "\n");
+        openDeser(schema);
+
+        List<RowData> rows = collectRows(schema, null);
+        assertThat(rows).isEmpty();
+    }
+
+    @Test
+    void testDeserializeWithGbkCharset() throws Exception {
+        Charset gbk = Charset.forName("GBK");
+        String original = "你好\n世界";
+        byte[] message = original.getBytes(gbk);
+
+        RawFormatDeserializationSchema schema =
+                new RawFormatDeserializationSchema(
+                        STRING_TYPE, TypeInformation.of(RowData.class), "GBK", 
true, "\n");
+        openDeser(schema);
+
+        List<RowData> rows = collectRows(schema, message);
+        assertThat(rows).hasSize(2);
+        assertThat(rows.get(0).getString(0)).hasToString("你好");
+        assertThat(rows.get(1).getString(0)).hasToString("世界");
+    }
+
+    // -----------------------------------------------------------------------
+    // Serialization tests
+    // -----------------------------------------------------------------------
+
+    @Test
+    void testSerializeWithoutDelimiter_noAppend() throws Exception {
+        RawFormatSerializationSchema schema =
+                new RawFormatSerializationSchema(
+                        STRING_TYPE, StandardCharsets.UTF_8.name(), true, 
null);
+        openSer(schema);
+
+        RowData row = buildStringRow("hello");
+        byte[] result = schema.serialize(row);
+        assertThat(result).isEqualTo("hello".getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    void testSerializeWithNewlineDelimiter_appendsDelimiter() throws Exception 
{
+        RawFormatSerializationSchema schema =
+                new RawFormatSerializationSchema(
+                        STRING_TYPE, StandardCharsets.UTF_8.name(), true, 
"\n");
+        openSer(schema);
+
+        RowData row = buildStringRow("hello");
+        byte[] result = schema.serialize(row);
+        
assertThat(result).isEqualTo("hello\n".getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    void testSerializeWithCustomDelimiter_appendsDelimiter() throws Exception {
+        RawFormatSerializationSchema schema =
+                new RawFormatSerializationSchema(
+                        STRING_TYPE, StandardCharsets.UTF_8.name(), true, 
"||");
+        openSer(schema);
+
+        RowData row = buildStringRow("record1");
+        byte[] result = schema.serialize(row);
+        
assertThat(result).isEqualTo("record1||".getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Test
+    void testSerializeNullRow_returnsNull() throws Exception {
+        RawFormatSerializationSchema schema =
+                new RawFormatSerializationSchema(
+                        STRING_TYPE, StandardCharsets.UTF_8.name(), true, 
"\n");
+        openSer(schema);
+
+        GenericRowData nullRow = new GenericRowData(1);
+        nullRow.setField(0, null);
+        byte[] result = schema.serialize(nullRow);
+        assertThat(result).isNull();
+    }
+
+    @Test
+    void testDeserializeTrailingDelimiter_noExtraRow() throws Exception {
+        // Verify that a message ending with the delimiter does not produce a 
trailing empty row.
+        // This ensures round-trip compatibility: serialize("hello") -> 
"hello\n" ->
+        // deserialize -> ["hello"] (1 row, not 2).
+        RawFormatDeserializationSchema schema =
+                new RawFormatDeserializationSchema(
+                        STRING_TYPE,
+                        TypeInformation.of(RowData.class),
+                        StandardCharsets.UTF_8.name(),
+                        true,
+                        "\n");
+        openDeser(schema);
+
+        // Message already ends with the delimiter (as produced by the 
serializer)
+        byte[] message = "hello\n".getBytes(StandardCharsets.UTF_8);
+        List<RowData> rows = collectRows(schema, message);
+        assertThat(rows).hasSize(1);
+        assertThat(rows.get(0).getString(0)).hasToString("hello");
+    }
+
+    @Test
+    void testRoundTrip_serializeThenDeserialize() throws Exception {
+        // Verify that rows written by the serializer can be read back 
correctly by the
+        // deserializer when both share the same delimiter configuration.
+        RawFormatSerializationSchema ser =
+                new RawFormatSerializationSchema(
+                        STRING_TYPE, StandardCharsets.UTF_8.name(), true, 
"\n");
+        openSer(ser);
+
+        RawFormatDeserializationSchema deser =
+                new RawFormatDeserializationSchema(
+                        STRING_TYPE,
+                        TypeInformation.of(RowData.class),
+                        StandardCharsets.UTF_8.name(),
+                        true,
+                        "\n");
+        openDeser(deser);
+
+        // Serialize a single row -> "hello\n"
+        byte[] serialized = ser.serialize(buildStringRow("hello"));
+
+        // Deserialize "hello\n" -> should yield exactly 1 row
+        List<RowData> rows = collectRows(deser, serialized);
+        assertThat(rows).hasSize(1);
+        assertThat(rows.get(0).getString(0)).hasToString("hello");
+    }
+
+    // -----------------------------------------------------------------------
+    // Helpers
+    // -----------------------------------------------------------------------
+
+    private void openDeser(RawFormatDeserializationSchema schema) throws 
Exception {
+        schema.open(
+                new DeserializationSchema.InitializationContext() {
+                    @Override
+                    public MetricGroup getMetricGroup() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public UserCodeClassLoader getUserCodeClassLoader() {
+                        throw new UnsupportedOperationException();
+                    }
+                });
+    }
+
+    private void openSer(RawFormatSerializationSchema schema) throws Exception 
{
+        schema.open(
+                new SerializationSchema.InitializationContext() {
+                    @Override
+                    public MetricGroup getMetricGroup() {
+                        throw new UnsupportedOperationException();
+                    }
+
+                    @Override
+                    public UserCodeClassLoader getUserCodeClassLoader() {
+                        throw new UnsupportedOperationException();
+                    }
+                });
+    }
+
+    private List<RowData> collectRows(RawFormatDeserializationSchema schema, 
byte[] message)
+            throws Exception {
+        List<RowData> rows = new ArrayList<>();
+        schema.deserialize(
+                message,
+                new Collector<RowData>() {
+                    @Override
+                    public void collect(RowData record) {
+                        rows.add(record);
+                    }
+
+                    @Override
+                    public void close() {}
+                });
+        return rows;
+    }
+
+    private RowData buildStringRow(String value) {
+        GenericRowData row = new GenericRowData(1);
+        row.setField(0, StringData.fromString(value));
+        return row;
+    }
+}

Reply via email to