This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new aa684608 [FLINK-29700] Serializer to BinaryInMemorySortBuffer is wrong
aa684608 is described below

commit aa6846089afc61e6715a7d50ae40e6bb9d8efc0f
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Fri Oct 21 14:52:06 2022 +0800

    [FLINK-29700] Serializer to BinaryInMemorySortBuffer is wrong
    
    This closes #325
---
 .../table/store/file/memory/sort/BinaryInMemorySortBuffer.java | 10 ++++++----
 .../flink/table/store/file/mergetree/SortBufferMemTable.java   |  2 --
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/sort/BinaryInMemorySortBuffer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/sort/BinaryInMemorySortBuffer.java
index 951e46bd..8bf12f3c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/sort/BinaryInMemorySortBuffer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/memory/sort/BinaryInMemorySortBuffer.java
@@ -65,7 +65,6 @@ public class BinaryInMemorySortBuffer extends BinaryIndexedSortable {
     public static BinaryInMemorySortBuffer createBuffer(
             NormalizedKeyComputer normalizedKeyComputer,
             AbstractRowDataSerializer<RowData> inputSerializer,
-            BinaryRowDataSerializer serializer,
             RecordComparator comparator,
             MemorySegmentPool memoryPool) {
         checkArgument(memoryPool.freePages() >= MIN_REQUIRED_BUFFERS);
@@ -73,7 +72,6 @@ public class BinaryInMemorySortBuffer extends BinaryIndexedSortable {
         return new BinaryInMemorySortBuffer(
                 normalizedKeyComputer,
                 inputSerializer,
-                serializer,
                 comparator,
                 recordBufferSegments,
                 new SimpleCollectingOutputView(
@@ -84,12 +82,16 @@ public class BinaryInMemorySortBuffer extends BinaryIndexedSortable {
     private BinaryInMemorySortBuffer(
             NormalizedKeyComputer normalizedKeyComputer,
             AbstractRowDataSerializer<RowData> inputSerializer,
-            BinaryRowDataSerializer serializer,
             RecordComparator comparator,
             ArrayList<MemorySegment> recordBufferSegments,
             SimpleCollectingOutputView recordCollector,
             MemorySegmentPool pool) {
-        super(normalizedKeyComputer, serializer, comparator, recordBufferSegments, pool);
+        super(
+                normalizedKeyComputer,
+                new BinaryRowDataSerializer(inputSerializer.getArity()),
+                comparator,
+                recordBufferSegments,
+                pool);
         this.inputSerializer = inputSerializer;
         this.recordBufferSegments = recordBufferSegments;
         this.recordCollector = recordCollector;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
index 53406885..1ee5f95a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferMemTable.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
 import org.apache.flink.table.runtime.generated.RecordComparator;
-import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.runtime.util.MemorySegmentPool;
 import org.apache.flink.table.store.codegen.CodeGenUtils;
@@ -78,7 +77,6 @@ public class SortBufferMemTable implements MemTable {
                 BinaryInMemorySortBuffer.createBuffer(
                         normalizedKeyComputer,
                         InternalSerializers.create(KeyValue.schema(keyType, valueType)),
-                        new BinaryRowDataSerializer(sortKeyTypes.size()),
                         keyComparator,
                         memoryPool);
     }

Reply via email to