This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
     new e7816f714ef [FLINK-35097][table] Fix 'raw' format deserialization
e7816f714ef is described below

commit e7816f714ef5298e1ca978aeddf62732794bb93f
Author: Kumar Mallikarjuna <kumarmallikarjuna.w...@gmail.com>
AuthorDate: Sun Apr 14 19:00:58 2024 +0530

    [FLINK-35097][table] Fix 'raw' format deserialization
---
 .../raw/RawFormatDeserializationSchema.java        |   9 +-
 .../formats/raw/RawFormatSerDeSchemaTest.java      | 128 +++++++++++++--------
 2 files changed, 81 insertions(+), 56 deletions(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java
index daa3d508857..94d1cb30a8e 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/formats/raw/RawFormatDeserializationSchema.java
@@ -59,8 +59,6 @@ public class RawFormatDeserializationSchema implements DeserializationSchema<Row
 
     private final DataLengthValidator validator;
 
-    private transient GenericRowData reuse;
-
     public RawFormatDeserializationSchema(
             LogicalType deserializedType,
             TypeInformation<RowData> producedTypeInfo,
@@ -76,7 +74,6 @@ public class RawFormatDeserializationSchema implements DeserializationSchema<Row
 
     @Override
     public void open(InitializationContext context) throws Exception {
-        reuse = new GenericRowData(1);
         converter.open();
     }
 
@@ -89,8 +86,10 @@ public class RawFormatDeserializationSchema implements DeserializationSchema<Row
             validator.validate(message);
             field = converter.convert(message);
         }
-        reuse.setField(0, field);
-        return reuse;
+
+        GenericRowData rowData = new GenericRowData(1);
+        rowData.setField(0, field);
+        return rowData;
     }
 
     @Override
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatSerDeSchemaTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatSerDeSchemaTest.java
index 10c97497cd3..fd78c8c6638 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatSerDeSchemaTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/formats/raw/RawFormatSerDeSchemaTest.java
@@ -39,6 +39,7 @@ import org.junit.runners.Parameterized;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -67,76 +68,79 @@ public class RawFormatSerDeSchemaTest {
     @Parameterized.Parameters(name = "{index}: {0}")
     public static List<TestSpec> testData() {
         return Arrays.asList(
-                TestSpec.type(TINYINT()).value(Byte.MAX_VALUE).binary(new byte[] {Byte.MAX_VALUE}),
-                TestSpec.type(SMALLINT()).value(Short.MAX_VALUE).binary(hexStringToByte("7fff")),
+                TestSpec.type(TINYINT()).values(Byte.MAX_VALUE).binary(new byte[] {Byte.MAX_VALUE}),
+                TestSpec.type(SMALLINT()).values(Short.MAX_VALUE).binary(hexStringToByte("7fff")),
                 TestSpec.type(SMALLINT())
-                        .value(Short.MAX_VALUE)
+                        .values(Short.MAX_VALUE)
                         .withLittleEndian()
                         .binary(hexStringToByte("ff7f")),
-                TestSpec.type(INT()).value(Integer.MAX_VALUE).binary(hexStringToByte("7fffffff")),
+                TestSpec.type(INT()).values(Integer.MAX_VALUE).binary(hexStringToByte("7fffffff")),
                 TestSpec.type(INT())
-                        .value(Integer.MAX_VALUE)
+                        .values(Integer.MAX_VALUE)
                         .withLittleEndian()
                         .binary(hexStringToByte("ffffff7f")),
                 TestSpec.type(BIGINT())
-                        .value(Long.MAX_VALUE)
+                        .values(Long.MAX_VALUE)
                         .binary(hexStringToByte("7fffffffffffffff")),
                 TestSpec.type(BIGINT())
-                        .value(Long.MAX_VALUE)
+                        .values(Long.MAX_VALUE)
                         .withLittleEndian()
                         .binary(hexStringToByte("ffffffffffffff7f")),
-                TestSpec.type(FLOAT()).value(Float.MAX_VALUE).binary(hexStringToByte("7f7fffff")),
+                TestSpec.type(FLOAT()).values(Float.MAX_VALUE).binary(hexStringToByte("7f7fffff")),
                 TestSpec.type(FLOAT())
-                        .value(Float.MAX_VALUE)
+                        .values(Float.MAX_VALUE)
                         .withLittleEndian()
                         .binary(hexStringToByte("ffff7f7f")),
                 TestSpec.type(DOUBLE())
-                        .value(Double.MAX_VALUE)
+                        .values(Double.MAX_VALUE)
                         .binary(hexStringToByte("7fefffffffffffff")),
                 TestSpec.type(DOUBLE())
-                        .value(Double.MAX_VALUE)
+                        .values(Double.MAX_VALUE)
                         .withLittleEndian()
                         .binary(hexStringToByte("ffffffffffffef7f")),
-                TestSpec.type(BOOLEAN()).value(true).binary(new byte[] {1}),
-                TestSpec.type(BOOLEAN()).value(false).binary(new byte[] {0}),
-                TestSpec.type(STRING()).value("Hello World").binary("Hello World".getBytes()),
+                TestSpec.type(BOOLEAN()).values(true).binary(new byte[] {1}),
+                TestSpec.type(BOOLEAN()).values(false).binary(new byte[] {0}),
+                TestSpec.type(STRING()).values("Hello World").binary("Hello World".getBytes()),
                 TestSpec.type(STRING())
-                        .value("你好世界,Hello World")
+                        .values("你好世界,Hello World")
                         .binary("你好世界,Hello World".getBytes()),
                 TestSpec.type(STRING())
-                        .value("Flink Awesome!")
+                        .values("Flink Awesome!")
                         .withCharset("UTF-16")
                         .binary("Flink Awesome!".getBytes(StandardCharsets.UTF_16)),
                 TestSpec.type(STRING())
-                        .value("Flink 帅哭!")
+                        .values("Flink 帅哭!")
                         .withCharset("UTF-16")
                         .binary("Flink 帅哭!".getBytes(StandardCharsets.UTF_16)),
-                TestSpec.type(STRING()).value("").binary("".getBytes()),
-                TestSpec.type(VARCHAR(5)).value("HELLO").binary("HELLO".getBytes()),
+                TestSpec.type(STRING()).values("").binary("".getBytes()),
+                TestSpec.type(VARCHAR(5)).values("HELLO").binary("HELLO".getBytes()),
+                TestSpec.type(STRING())
+                        .values("line 1", "line 2", "line 3")
+                        .binary("line 1".getBytes(), "line 2".getBytes(), "line 3".getBytes()),
                 TestSpec.type(BYTES())
-                        .value(new byte[] {1, 3, 5, 7, 9})
+                        .values(new byte[] {1, 3, 5, 7, 9})
                         .binary(new byte[] {1, 3, 5, 7, 9}),
-                TestSpec.type(BYTES()).value(new byte[] {}).binary(new byte[] {}),
-                TestSpec.type(BINARY(3)).value(new byte[] {1, 3, 5}).binary(new byte[] {1, 3, 5}),
+                TestSpec.type(BYTES()).values(new byte[] {}).binary(new byte[] {}),
+                TestSpec.type(BINARY(3)).values(new byte[] {1, 3, 5}).binary(new byte[] {1, 3, 5}),
                 TestSpec.type(RAW(LocalDateTime.class, new LocalDateTimeSerializer()))
-                        .value(LocalDateTime.parse("2020-11-11T18:08:01.123"))
+                        .values(LocalDateTime.parse("2020-11-11T18:08:01.123"))
                         .binary(
                                 serializeLocalDateTime(
                                         LocalDateTime.parse("2020-11-11T18:08:01.123"))),
 
                 // test nulls
-                TestSpec.type(TINYINT()).value(null).binary(null),
-                TestSpec.type(SMALLINT()).value(null).binary(null),
-                TestSpec.type(INT()).value(null).binary(null),
-                TestSpec.type(BIGINT()).value(null).binary(null),
-                TestSpec.type(FLOAT()).value(null).binary(null),
-                TestSpec.type(DOUBLE()).value(null).binary(null),
-                TestSpec.type(BOOLEAN()).value(null).binary(null),
-                TestSpec.type(STRING()).value(null).binary(null),
-                TestSpec.type(BYTES()).value(null).binary(null),
+                TestSpec.type(TINYINT()).values((Object) null).binary((byte[]) null),
+                TestSpec.type(SMALLINT()).values((Object) null).binary((byte[]) null),
+                TestSpec.type(INT()).values((Object) null).binary((byte[]) null),
+                TestSpec.type(BIGINT()).values((Object) null).binary((byte[]) null),
+                TestSpec.type(FLOAT()).values((Object) null).binary((byte[]) null),
+                TestSpec.type(DOUBLE()).values((Object) null).binary((byte[]) null),
+                TestSpec.type(BOOLEAN()).values((Object) null).binary((byte[]) null),
+                TestSpec.type(STRING()).values((Object) null).binary((byte[]) null),
+                TestSpec.type(BYTES()).values((Object) null).binary((byte[]) null),
                 TestSpec.type(RAW(LocalDateTime.class, new LocalDateTimeSerializer()))
-                        .value(null)
-                        .binary(null));
+                        .values((Object) null)
+                        .binary((byte[]) null));
     }
 
     @Parameterized.Parameter public TestSpec testSpec;
@@ -155,17 +159,35 @@ public class RawFormatSerDeSchemaTest {
         deserializationSchema.open(mock(DeserializationSchema.InitializationContext.class));
         serializationSchema.open(mock(SerializationSchema.InitializationContext.class));
 
-        Row row = Row.of(testSpec.value);
         DataStructureConverter<Object, Object> converter =
                 DataStructureConverters.getConverter(ROW(FIELD("single", testSpec.type)));
-        RowData originalRowData = (RowData) converter.toInternal(row);
 
-        byte[] serializedBytes = serializationSchema.serialize(originalRowData);
-        assertThat(serializedBytes).isEqualTo(testSpec.binary);
+        byte[][] serializedBytesArr = new byte[testSpec.values.length][];
+        RowData[] deserializedRowDataArr = new RowData[testSpec.values.length];
+
+        // The following loops are partitioned to ensure the serialized/deserialized
+        // values are not copied by reference. (see FLINK-35097)
+
+        // Process serialization
+        for (int i = 0; i < testSpec.values.length; i++) {
+            Row row = Row.of(testSpec.values[i]);
+            RowData originalRowData = (RowData) converter.toInternal(row);
+            serializedBytesArr[i] = serializationSchema.serialize(originalRowData);
+        }
+
+        // Test serialization and process deserialization
+        for (int i = 0; i < testSpec.values.length; i++) {
+            assertThat(serializedBytesArr[i]).isEqualTo(testSpec.binary[i]);
 
-        RowData deserializeRowData = deserializationSchema.deserialize(serializedBytes);
-        Row actual = (Row) converter.toExternal(deserializeRowData);
-        assertThat(actual).isEqualTo(row);
+            deserializedRowDataArr[i] = deserializationSchema.deserialize(serializedBytesArr[i]);
+        }
+
+        // Test deserialization
+        for (int i = 0; i < testSpec.values.length; i++) {
+            Row row = Row.of(testSpec.values[i]);
+            Row actual = (Row) converter.toExternal(deserializedRowDataArr[i]);
+            assertThat(actual).isEqualTo(row);
+        }
     }
 
     private static byte[] serializeLocalDateTime(LocalDateTime localDateTime) {
@@ -183,9 +205,9 @@ public class RawFormatSerDeSchemaTest {
 
     private static class TestSpec {
 
-        private Object value;
-        private byte[] binary;
-        private DataType type;
+        private Object[] values;
+        private byte[][] binary;
+        private final DataType type;
         private String charsetName = "UTF-8";
         private boolean isBigEndian = true;
 
@@ -197,12 +219,12 @@ public class RawFormatSerDeSchemaTest {
             return new TestSpec(fieldType);
         }
 
-        public TestSpec value(Object value) {
-            this.value = value;
+        public TestSpec values(Object... values) {
+            this.values = values;
             return this;
         }
 
-        public TestSpec binary(byte[] bytes) {
+        public TestSpec binary(byte[]... bytes) {
             this.binary = bytes;
             return this;
         }
@@ -219,12 +241,16 @@ public class RawFormatSerDeSchemaTest {
 
         @Override
         public String toString() {
-            String hex = binary == null ? "null" : "0x" + StringUtils.byteToHexString(binary);
+            ArrayList<String> hexes = new ArrayList<>();
+            for (byte[] b : binary) {
+                hexes.add(b == null ? "" : "0x" + StringUtils.byteToHexString(b));
+            }
+
             return "TestSpec{"
-                    + "value="
-                    + value
+                    + "values="
+                    + Arrays.toString(values)
                     + ", binary="
-                    + hex
+                    + hexes
                     + ", type="
                     + type
                     + ", charsetName='"

Reply via email to