This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e476570 [FLINK-30373] Flink-table-runtime free for flink-table-store-codegen
3e476570 is described below

commit 3e47657054e75d714cd941f502a899e5059cffac
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Tue Dec 13 11:27:03 2022 +0800

    [FLINK-30373] Flink-table-runtime free for flink-table-store-codegen
    
    This closes #432
---
 .../table/store/codegen/CodeGeneratorImpl.java     |   9 +-
 .../ComparatorCodeGenerator.scala                  |   8 +-
 .../ProjectionCodeGenerator.scala                  |   9 +-
 .../SortCodeGenerator.scala                        |   5 +-
 .../flink/table/store/codegen/CodeGenerator.java   |  16 +-
 .../flink/table/store/codegen/CompileUtils.java    | 140 ++++++++++++
 .../flink/table/store/codegen/GeneratedClass.java  | 125 ++++++++++
 .../table/store/codegen/NormalizedKeyComputer.java |  47 ++++
 .../flink/table/store/codegen/Projection.java      |  31 +++
 .../table/store/codegen/RecordComparator.java      |  35 +++
 .../flink/table/store/codegen/CodeGenUtils.java    |  25 +-
 .../file/mergetree/SortBufferWriteBuffer.java      |   4 +-
 .../store/file/sort/BinaryExternalMerger.java      |  96 ++++++++
 .../store/file/sort/BinaryExternalSortBuffer.java  |   3 +-
 .../store/file/sort/BinaryInMemorySortBuffer.java  |   5 +-
 .../store/file/sort/BinaryIndexedSortable.java     | 254 +++++++++++++++++++++
 .../store/file/utils/KeyComparatorSupplier.java    |   6 +-
 .../table/store/table/sink/BucketComputer.java     |   8 +-
 .../store/table/sink/SinkRecordConverter.java      |   8 +-
 .../store/file/sort/IntNormalizedKeyComputer.java  |   2 +-
 .../table/store/file/sort/IntRecordComparator.java |   2 +-
 21 files changed, 774 insertions(+), 64 deletions(-)

diff --git a/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/CodeGeneratorImpl.java b/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/CodeGeneratorImpl.java
index 846ee308..6cacd170 100644
--- a/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/CodeGeneratorImpl.java
+++ b/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/CodeGeneratorImpl.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.table.store.codegen;
 
-import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
-import org.apache.flink.table.runtime.generated.GeneratedProjection;
-import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
 import org.apache.flink.table.store.utils.TypeUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -31,7 +28,7 @@ import java.util.List;
 public class CodeGeneratorImpl implements CodeGenerator {
 
     @Override
-    public GeneratedProjection generateProjection(
+    public GeneratedClass<Projection> generateProjection(
             String name, RowType inputType, int[] inputMapping) {
         RowType outputType = TypeUtils.project(inputType, inputMapping);
         return ProjectionCodeGenerator.generateProjection(
@@ -39,7 +36,7 @@ public class CodeGeneratorImpl implements CodeGenerator {
     }
 
     @Override
-    public GeneratedNormalizedKeyComputer generateNormalizedKeyComputer(
+    public GeneratedClass<NormalizedKeyComputer> generateNormalizedKeyComputer(
             List<LogicalType> fieldTypes, String name) {
         return new SortCodeGenerator(
                         RowType.of(fieldTypes.toArray(new LogicalType[0])),
@@ -48,7 +45,7 @@ public class CodeGeneratorImpl implements CodeGenerator {
     }
 
     @Override
-    public GeneratedRecordComparator generateRecordComparator(
+    public GeneratedClass<RecordComparator> generateRecordComparator(
             List<LogicalType> fieldTypes, String name) {
         return ComparatorCodeGenerator.gen(
                 name,
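
The hunks above are the heart of this commit: each generator now returns the store's own
GeneratedClass<T> wrapper instead of Flink's GeneratedProjection,
GeneratedNormalizedKeyComputer, or GeneratedRecordComparator. A minimal sketch of how a
caller might consume the new return type; the `inputType` variable is an assumption for
illustration, not part of this diff:

    // Java sketch, assuming some input RowType named `inputType` exists.
    CodeGenerator generator = new CodeGeneratorImpl();
    GeneratedClass<Projection> generated =
            generator.generateProjection("MyProjection", inputType, new int[] {0});
    // The same newInstance(ClassLoader) call works for all three generated artifacts.
    Projection projection = generated.newInstance(CodeGeneratorImpl.class.getClassLoader());
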
diff --git a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ComparatorCodeGenerator.scala b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ComparatorCodeGenerator.scala
index ee327b34..093e5e17 100644
--- a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ComparatorCodeGenerator.scala
+++ b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ComparatorCodeGenerator.scala
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.table.store.codegen
 
-import org.apache.flink.table.runtime.generated.{GeneratedRecordComparator, RecordComparator}
 import org.apache.flink.table.store.codegen.GenerateUtils.{newName, ROW_DATA}
 import org.apache.flink.table.types.logical.RowType
 
@@ -37,7 +36,10 @@ object ComparatorCodeGenerator {
    * @return
    *   A GeneratedRecordComparator
    */
-  def gen(name: String, inputType: RowType, sortSpec: SortSpec): GeneratedRecordComparator = {
+  def gen(
+      name: String,
+      inputType: RowType,
+      sortSpec: SortSpec): GeneratedClass[RecordComparator] = {
     val className = newName(name)
     val baseClass = classOf[RecordComparator]
 
@@ -65,7 +67,7 @@ object ComparatorCodeGenerator {
       }
       """.stripMargin
 
-    new GeneratedRecordComparator(className, code, ctx.references.toArray)
+    new GeneratedClass(className, code, ctx.references.toArray)
   }
 
 }
diff --git a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ProjectionCodeGenerator.scala b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ProjectionCodeGenerator.scala
index f536b082..8a8b6548 100644
--- a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ProjectionCodeGenerator.scala
+++ b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ProjectionCodeGenerator.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.store.codegen
 
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.data.binary.BinaryRowData
-import org.apache.flink.table.runtime.generated.{GeneratedProjection, Projection}
 import org.apache.flink.table.store.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
 import org.apache.flink.table.store.codegen.GenerateUtils.{generateRecordStatement, newName, DEFAULT_INPUT1_TERM, DEFAULT_OUT_RECORD_TERM, DEFAULT_OUT_RECORD_WRITER_TERM, ROW_DATA}
 import org.apache.flink.table.types.logical.RowType
@@ -85,9 +84,9 @@ object ProjectionCodeGenerator {
       inputTerm: String = DEFAULT_INPUT1_TERM,
       outRecordTerm: String = DEFAULT_OUT_RECORD_TERM,
       outRecordWriterTerm: String = DEFAULT_OUT_RECORD_WRITER_TERM,
-      reusedOutRecord: Boolean = true): GeneratedProjection = {
+      reusedOutRecord: Boolean = true): GeneratedClass[Projection] = {
     val className = newName(name)
-    val baseClass = classOf[Projection[_, _]]
+    val baseClass = classOf[Projection]
 
     val expression = generateProjectionExpression(
       ctx,
@@ -119,7 +118,7 @@ object ProjectionCodeGenerator {
          |}
         """.stripMargin
 
-    new GeneratedProjection(className, code, ctx.references.toArray)
+    new GeneratedClass(className, code, ctx.references.toArray)
   }
 
   /** For java invoke. */
@@ -128,7 +127,7 @@ object ProjectionCodeGenerator {
       name: String,
       inputType: RowType,
       outputType: RowType,
-      inputMapping: Array[Int]): GeneratedProjection =
+      inputMapping: Array[Int]): GeneratedClass[Projection] =
     generateProjection(
       ctx,
       name,
diff --git a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala
index b8b43d9c..70a12c72 100644
--- a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala
+++ b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.store.codegen
 
 import org.apache.flink.table.data.{DecimalData, TimestampData}
 import org.apache.flink.table.data.binary.BinaryRowData
-import org.apache.flink.table.runtime.generated.{GeneratedNormalizedKeyComputer, NormalizedKeyComputer, RecordComparator}
 import org.apache.flink.table.runtime.operators.sort.SortUtil
 import org.apache.flink.table.runtime.types.PlannerTypeUtils
 import org.apache.flink.table.store.codegen.GenerateUtils.{newName, ROW_DATA, SEGMENT}
@@ -119,7 +118,7 @@ class SortCodeGenerator(val input: RowType, val sortSpec: SortSpec) {
    * @return
    *   A GeneratedNormalizedKeyComputer
    */
-  def generateNormalizedKeyComputer(name: String): GeneratedNormalizedKeyComputer = {
+  def generateNormalizedKeyComputer(name: String): GeneratedClass[NormalizedKeyComputer] = {
 
     val className = newName(name)
 
@@ -179,7 +178,7 @@ class SortCodeGenerator(val input: RowType, val sortSpec: SortSpec) {
       }
     """.stripMargin
 
-    new GeneratedNormalizedKeyComputer(className, code)
+    new GeneratedClass(className, code)
   }
 
 def generatePutNormalizedKeys(numKeyBytes: Int): mutable.ArrayBuffer[String] = {
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/CodeGenerator.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/CodeGenerator.java
index 83a2a3ea..977419f0 100644
--- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/CodeGenerator.java
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/CodeGenerator.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.table.store.codegen;
 
-import org.apache.flink.table.runtime.generated.GeneratedClass;
-import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer;
-import org.apache.flink.table.runtime.generated.GeneratedProjection;
-import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -30,24 +26,26 @@ import java.util.List;
 /** {@link GeneratedClass} generator. */
 public interface CodeGenerator {
 
-    GeneratedProjection generateProjection(String name, RowType inputType, int[] inputMapping);
+    GeneratedClass<Projection> generateProjection(
+            String name, RowType inputType, int[] inputMapping);
 
     /**
-     * Generate a {@link GeneratedNormalizedKeyComputer}.
+     * Generate a {@link NormalizedKeyComputer}.
      *
      * @param fieldTypes Both the input row field types and the sort key field types. Records are
      *     compared by the first field, then the second field, then the third field and so on. All
      *     fields are compared in ascending order.
      */
-    GeneratedNormalizedKeyComputer generateNormalizedKeyComputer(
+    GeneratedClass<NormalizedKeyComputer> generateNormalizedKeyComputer(
             List<LogicalType> fieldTypes, String name);
 
     /**
-     * Generate a {@link GeneratedRecordComparator}.
+     * Generate a {@link RecordComparator}.
      *
      * @param fieldTypes Both the input row field types and the sort key field types. Records are
      *     compared by the first field, then the second field, then the third field and so on. All
      *     fields are compared in ascending order.
      */
-    GeneratedRecordComparator generateRecordComparator(List<LogicalType> fieldTypes, String name);
+    GeneratedClass<RecordComparator> generateRecordComparator(
+            List<LogicalType> fieldTypes, String name);
 }
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/CompileUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/CompileUtils.java
new file mode 100644
index 00000000..8e63eab9
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/CompileUtils.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.codegen;
+
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+
+import org.codehaus.janino.SimpleCompiler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Utilities to compile generated code to a Class. Copied from Flink. */
+public final class CompileUtils {
+
+    // used for logging the generated code to the same place
+    private static final Logger CODE_LOG = LoggerFactory.getLogger(CompileUtils.class);
+
+    /**
+     * Compilation cache. Janino creates a new ClassLoader and a new class file on every
+     * compilation (guaranteeing that class names never repeat), so multiple tasks in the same
+     * process would otherwise generate a large number of duplicate classes, causing heavy
+     * Metaspace GC (class unloading) and performance bottlenecks. The cache avoids this.
+     */
+    static final Cache<ClassKey, Class<?>> COMPILED_CLASS_CACHE =
+            CacheBuilder.newBuilder()
+                    // estimated maximum planning/startup time
+                    .expireAfterAccess(Duration.ofMinutes(5))
+                    // estimated cache size
+                    .maximumSize(300)
+                    .softValues()
+                    .build();
+
+    /**
+     * Compiles generated code to a Class.
+     *
+     * @param cl the ClassLoader used to load the class
+     * @param name the class name
+     * @param code the generated code
+     * @param <T> the class type
+     * @return the compiled class
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> Class<T> compile(ClassLoader cl, String name, String code) {
+        try {
+            // The class name is part of the "code" and makes the string unique,
+            // to prevent class leaks we don't cache the class loader directly
+            // but only its hash code
+            final ClassKey classKey = new ClassKey(cl.hashCode(), code);
+            return (Class<T>) COMPILED_CLASS_CACHE.get(classKey, () -> doCompile(cl, name, code));
+        } catch (Exception e) {
+            throw new FlinkRuntimeException(e.getMessage(), e);
+        }
+    }
+
+    private static <T> Class<T> doCompile(ClassLoader cl, String name, String code) {
+        checkNotNull(cl, "Classloader must not be null.");
+        CODE_LOG.debug("Compiling: {} \n\n Code:\n{}", name, code);
+        SimpleCompiler compiler = new SimpleCompiler();
+        compiler.setParentClassLoader(cl);
+        try {
+            compiler.cook(code);
+        } catch (Throwable t) {
+            System.out.println(addLineNumber(code));
+            throw new InvalidProgramException(
+                    "Table program cannot be compiled. This is a bug. Please 
file an issue.", t);
+        }
+        try {
+            //noinspection unchecked
+            return (Class<T>) compiler.getClassLoader().loadClass(name);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Can not load class " + name, e);
+        }
+    }
+
+    /**
+     * Outputs more information when an error occurs. Generally, when cook fails, it shows which
+     * line is wrong. Line numbers start at 1.
+     */
+    private static String addLineNumber(String code) {
+        String[] lines = code.split("\n");
+        StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < lines.length; i++) {
+            builder.append("/* ").append(i + 1).append(" */").append(lines[i]).append("\n");
+        }
+        return builder.toString();
+    }
+
+    /** Class to use as key for the {@link #COMPILED_CLASS_CACHE}. */
+    private static class ClassKey {
+        private final int classLoaderId;
+        private final String code;
+
+        private ClassKey(int classLoaderId, String code) {
+            this.classLoaderId = classLoaderId;
+            this.code = code;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            ClassKey classKey = (ClassKey) o;
+            return classLoaderId == classKey.classLoaderId && code.equals(classKey.code);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(classLoaderId, code);
+        }
+    }
+}
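
A hedged sketch of the CompileUtils API introduced above; the class name and source are
invented for illustration. Compiling the same source twice through the cache invokes
Janino only once:

    String name = "org.example.Hello"; // hypothetical class name
    String code = "package org.example;\n"
            + "public class Hello {\n"
            + "    public String greet() { return \"hello\"; }\n"
            + "}\n";
    Class<Object> clazz =
            CompileUtils.compile(CompileUtils.class.getClassLoader(), name, code);
    // A second call with the same ClassLoader and source is served from
    // COMPILED_CLASS_CACHE instead of running the compiler again.
    Class<Object> cached =
            CompileUtils.compile(CompileUtils.class.getClassLoader(), name, code);
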
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/GeneratedClass.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/GeneratedClass.java
new file mode 100644
index 00000000..83517f06
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/GeneratedClass.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.codegen;
+
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.codesplit.JavaCodeSplitter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A wrapper for a generated class. It defines a {@link #newInstance(ClassLoader)} method to
+ * easily obtain an instance with its reference objects. Copied from Flink.
+ */
+public final class GeneratedClass<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(GeneratedClass.class);
+
+    private final String className;
+    private final String code;
+    private final String splitCode;
+    private final Object[] references;
+
+    private transient Class<T> compiledClass;
+
+    public GeneratedClass(String className, String code) {
+        this(className, code, new Object[0]);
+    }
+
+    public GeneratedClass(String className, String code, Object[] references) {
+        checkNotNull(className, "name must not be null");
+        checkNotNull(code, "code must not be null");
+        checkNotNull(references, "references must not be null");
+        this.className = className;
+        this.code = code;
+        this.splitCode =
+                code.isEmpty()
+                        ? code
+                        : JavaCodeSplitter.split(
+                                code,
+                                TableConfigOptions.MAX_LENGTH_GENERATED_CODE.defaultValue(),
+                                TableConfigOptions.MAX_MEMBERS_GENERATED_CODE.defaultValue());
+        this.references = references;
+    }
+
+    /** Create a new instance of this generated class. */
+    public T newInstance(ClassLoader classLoader) {
+        try {
+            return compile(classLoader)
+                    .getConstructor(Object[].class)
+                    // Constructor.newInstance takes varargs (Object... initargs), so the
+                    // references must be wrapped in a new Object[] to be passed as one argument.
+                    .newInstance(new Object[] {references});
+        } catch (Throwable e) {
+            throw new RuntimeException(
+                    "Could not instantiate generated class '" + className + "'", e);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public T newInstance(ClassLoader classLoader, Object... args) {
+        try {
+            return (T) compile(classLoader).getConstructors()[0].newInstance(args);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Could not instantiate generated class '" + className + "'", e);
+        }
+    }
+
+    /**
+     * Compiles the generated code; the compiled class is cached in this {@link GeneratedClass}.
+     */
+    public Class<T> compile(ClassLoader classLoader) {
+        if (compiledClass == null) {
+            // cache the compiled class
+            try {
+                // first try to compile the split code
+                compiledClass = CompileUtils.compile(classLoader, className, splitCode);
+            } catch (Throwable t) {
+                // compile the original code as fallback
+                LOG.warn("Failed to compile split code, falling back to original code", t);
+                compiledClass = CompileUtils.compile(classLoader, className, code);
+            }
+        }
+        return compiledClass;
+    }
+
+    public String getClassName() {
+        return className;
+    }
+
+    public String getCode() {
+        return code;
+    }
+
+    public Object[] getReferences() {
+        return references;
+    }
+
+    public Class<T> getClass(ClassLoader classLoader) {
+        return compile(classLoader);
+    }
+}
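
A hedged sketch of the constructor contract that newInstance(ClassLoader) relies on: the
generated source must expose a constructor taking Object[] (the references). The
hand-written source below only mimics what the code generators emit; all names are
invented:

    // Hypothetical generated-style class implementing the new Projection interface.
    String className = "org.example.ConstantProjection";
    String source =
            "package org.example;\n"
                    + "public class ConstantProjection implements "
                    + "org.apache.flink.table.store.codegen.Projection {\n"
                    + "    private final Object[] references;\n"
                    + "    public ConstantProjection(Object[] references) {\n"
                    + "        this.references = references;\n"
                    + "    }\n"
                    + "    public org.apache.flink.table.data.binary.BinaryRowData apply(\n"
                    + "            org.apache.flink.table.data.RowData row) {\n"
                    + "        return (org.apache.flink.table.data.binary.BinaryRowData) references[0];\n"
                    + "    }\n"
                    + "}\n";
    // BinaryRowDataUtil.EMPTY_ROW serves as the single reference object here.
    GeneratedClass<Projection> generated =
            new GeneratedClass<>(className, source, new Object[] {BinaryRowDataUtil.EMPTY_ROW});
    Projection projection = generated.newInstance(GeneratedClass.class.getClassLoader());
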
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/NormalizedKeyComputer.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/NormalizedKeyComputer.java
new file mode 100644
index 00000000..80bbc64c
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/NormalizedKeyComputer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.codegen;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Normalized key computer for {@code SortBuffer}. For performance, subclasses are usually
+ * implemented through CodeGenerator. Copied from Flink.
+ */
+public interface NormalizedKeyComputer {
+
+    /** Writes a normalized key for the given record into the target {@link MemorySegment}. */
+    void putKey(RowData record, MemorySegment target, int offset);
+
+    /** Compares two normalized keys in their respective {@link MemorySegment}s. */
+    int compareKey(MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ);
+
+    /** Swaps two normalized keys in their respective {@link MemorySegment}s. */
+    void swapKey(MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ);
+
+    /** Gets the length of the normalized key in bytes. */
+    int getNumKeyBytes();
+
+    /** Whether the normalized key fully determines the comparison. */
+    boolean isKeyFullyDetermines();
+
+    /** Whether normalized key comparisons should be inverted. */
+    boolean invertKey();
+}
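
A hedged sketch of the contract: putKey writes a fixed-width, byte-comparable prefix, and
compareKey works purely on those serialized prefixes. The newNormalizedKeyComputer call
below assumes the CodeGenUtils signature shown later in this diff and a discoverable
codegen module:

    NormalizedKeyComputer computer =
            CodeGenUtils.newNormalizedKeyComputer(
                    Collections.singletonList(new IntType()), "ExampleComputer");
    int keyBytes = computer.getNumKeyBytes();
    MemorySegment seg = MemorySegmentFactory.wrap(new byte[keyBytes * 2]);
    computer.putKey(GenericRowData.of(7), seg, 0);
    computer.putKey(GenericRowData.of(42), seg, keyBytes);
    // Negative for an ascending, non-inverted key (7 < 42); when
    // isKeyFullyDetermines() is false, ties must fall back to a RecordComparator.
    int cmp = computer.compareKey(seg, 0, seg, keyBytes);
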
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/Projection.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/Projection.java
new file mode 100644
index 00000000..5b3abd7c
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/Projection.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.codegen;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+
+/**
+ * Interface for code-generated projection, which maps a {@link RowData} to another {@link
+ * BinaryRowData}. Copied from Flink.
+ */
+public interface Projection {
+
+    BinaryRowData apply(RowData row);
+}
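
Since Projection is no longer generic over its input and output types, a hand-written
instance is just a lambda (CodeGenUtils.EMPTY_PROJECTION later in this diff does exactly
that). A hedged sketch projecting field 0, assuming an INT NOT NULL column:

    Projection firstField =
            row -> {
                BinaryRowData out = new BinaryRowData(1);
                BinaryRowWriter writer = new BinaryRowWriter(out);
                writer.writeInt(0, row.getInt(0)); // copy field 0
                writer.complete();
                return out;
            };
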
diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/RecordComparator.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/RecordComparator.java
new file mode 100644
index 00000000..eba2a3c7
--- /dev/null
+++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/RecordComparator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.codegen;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Record comparator for {@link BinaryInMemorySortBuffer}. For performance, subclasses are usually
+ * implemented through CodeGenerator. A dedicated interface that helps the JVM inline calls.
+ * Copied from Flink.
+ */
+public interface RecordComparator extends Comparator<RowData>, Serializable {
+
+    @Override
+    int compare(RowData o1, RowData o2);
+}
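
RecordComparator keeps a single abstract method, so hand-written instances remain simple
lambdas; generated implementations follow the same contract. A hedged sketch for an INT
first column:

    RecordComparator cmp = (o1, o2) -> Integer.compare(o1.getInt(0), o2.getInt(0));
    int r = cmp.compare(GenericRowData.of(1), GenericRowData.of(2)); // negative
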
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java
index da5e2e4e..c8775012 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.table.store.codegen;
 
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
-import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
-import org.apache.flink.table.runtime.generated.Projection;
-import org.apache.flink.table.runtime.generated.RecordComparator;
 import org.apache.flink.table.store.utils.BinaryRowDataUtil;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
@@ -33,22 +27,17 @@ import java.util.List;
 /** Utils for code generations. */
 public class CodeGenUtils {
 
-    public static final Projection<RowData, BinaryRowData> EMPTY_PROJECTION =
-            input -> BinaryRowDataUtil.EMPTY_ROW;
+    public static final Projection EMPTY_PROJECTION = input -> BinaryRowDataUtil.EMPTY_ROW;
 
-    public static Projection<RowData, BinaryRowData> newProjection(
-            RowType inputType, int[] mapping) {
+    public static Projection newProjection(RowType inputType, int[] mapping) {
         if (mapping.length == 0) {
             return EMPTY_PROJECTION;
         }
 
-        @SuppressWarnings("unchecked")
-        Projection<RowData, BinaryRowData> projection =
-                CodeGenLoader.getInstance()
-                        .discover(CodeGenerator.class)
-                        .generateProjection("Projection", inputType, mapping)
-                        .newInstance(CodeGenUtils.class.getClassLoader());
-        return projection;
+        return CodeGenLoader.getInstance()
+                .discover(CodeGenerator.class)
+                .generateProjection("Projection", inputType, mapping)
+                .newInstance(CodeGenUtils.class.getClassLoader());
     }
 
     public static NormalizedKeyComputer newNormalizedKeyComputer(
@@ -59,7 +48,7 @@ public class CodeGenUtils {
                 .newInstance(CodeGenUtils.class.getClassLoader());
     }
 
-    public static GeneratedRecordComparator generateRecordComparator(
+    public static GeneratedClass<RecordComparator> generateRecordComparator(
             List<LogicalType> fieldTypes, String name) {
         return CodeGenLoader.getInstance()
                 .discover(CodeGenerator.class)
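
A hedged usage sketch of the simplified CodeGenUtils API after this change; the RowType and
field names are invented, and discovery of the codegen module via CodeGenLoader is assumed
to succeed:

    RowType inputType =
            RowType.of(
                    new LogicalType[] {new IntType(), new VarCharType(VarCharType.MAX_LENGTH)},
                    new String[] {"id", "name"});
    Projection keyProjection = CodeGenUtils.newProjection(inputType, new int[] {0});
    BinaryRowData key = keyProjection.apply(GenericRowData.of(1, StringData.fromString("a")));
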
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferWriteBuffer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferWriteBuffer.java
index 45ac26f4..8156f06d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferWriteBuffer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferWriteBuffer.java
@@ -22,13 +22,13 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
-import org.apache.flink.table.runtime.generated.RecordComparator;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
 import org.apache.flink.table.runtime.typeutils.InternalSerializers;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.runtime.util.MemorySegmentPool;
 import org.apache.flink.table.store.codegen.CodeGenUtils;
+import org.apache.flink.table.store.codegen.NormalizedKeyComputer;
+import org.apache.flink.table.store.codegen.RecordComparator;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.KeyValueSerializer;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryExternalMerger.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryExternalMerger.java
new file mode 100644
index 00000000..489f909d
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryExternalMerger.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.sort;
+
+import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
+import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
+import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.operators.sort.AbstractBinaryExternalMerger;
+import org.apache.flink.table.runtime.operators.sort.SpillChannelManager;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.store.codegen.RecordComparator;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+/** Record merger for sorting {@code BinaryRowData}. Copied from Flink. */
+public class BinaryExternalMerger extends AbstractBinaryExternalMerger<BinaryRowData> {
+
+    private final BinaryRowDataSerializer serializer;
+    private final RecordComparator comparator;
+
+    public BinaryExternalMerger(
+            IOManager ioManager,
+            int pageSize,
+            int maxFanIn,
+            SpillChannelManager channelManager,
+            BinaryRowDataSerializer serializer,
+            RecordComparator comparator,
+            boolean compressionEnable,
+            BlockCompressionFactory compressionCodecFactory,
+            int compressionBlockSize) {
+        super(
+                ioManager,
+                pageSize,
+                maxFanIn,
+                channelManager,
+                compressionEnable,
+                compressionCodecFactory,
+                compressionBlockSize);
+        this.serializer = serializer;
+        this.comparator = comparator;
+    }
+
+    @Override
+    protected MutableObjectIterator<BinaryRowData> channelReaderInputViewIterator(
+            AbstractChannelReaderInputView inView) {
+        return new ChannelReaderInputViewIterator<>(inView, null, serializer.duplicate());
+    }
+
+    @Override
+    protected Comparator<BinaryRowData> mergeComparator() {
+        return comparator::compare;
+    }
+
+    @Override
+    protected List<BinaryRowData> mergeReusedEntries(int size) {
+        ArrayList<BinaryRowData> reused = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            reused.add(serializer.createInstance());
+        }
+        return reused;
+    }
+
+    @Override
+    protected void writeMergingOutput(
+            MutableObjectIterator<BinaryRowData> mergeIterator, AbstractPagedOutputView output)
+            throws IOException {
+        // read the merged stream and write the data back
+        BinaryRowData rec = serializer.createInstance();
+        while ((rec = mergeIterator.next(rec)) != null) {
+            serializer.serialize(rec, output);
+        }
+    }
+}
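
One detail worth noting: mergeComparator() adapts the RecordComparator (a
Comparator<RowData>) to the Comparator<BinaryRowData> the merger needs via a method
reference, which type-checks because BinaryRowData implements RowData. The same adaptation
in isolation, as a hedged sketch:

    RecordComparator rc = (o1, o2) -> Integer.compare(o1.getInt(0), o2.getInt(0));
    Comparator<BinaryRowData> narrowed = rc::compare; // a RowData comparator accepts BinaryRowData
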
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryExternalSortBuffer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryExternalSortBuffer.java
index cd46d54b..9e49c447 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryExternalSortBuffer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryExternalSortBuffer.java
@@ -28,13 +28,12 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.operators.sort.QuickSort;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.generated.RecordComparator;
 import org.apache.flink.table.runtime.io.ChannelWithMeta;
-import org.apache.flink.table.runtime.operators.sort.BinaryExternalMerger;
 import org.apache.flink.table.runtime.operators.sort.BinaryMergeIterator;
 import org.apache.flink.table.runtime.operators.sort.SpillChannelManager;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
 import org.apache.flink.table.runtime.util.FileChannelUtil;
+import org.apache.flink.table.store.codegen.RecordComparator;
 import org.apache.flink.util.MutableObjectIterator;
 
 import java.io.IOException;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryInMemorySortBuffer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryInMemorySortBuffer.java
index fe88e472..b1bb1aff 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryInMemorySortBuffer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryInMemorySortBuffer.java
@@ -23,12 +23,11 @@ import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
 import org.apache.flink.runtime.operators.sort.QuickSort;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
-import org.apache.flink.table.runtime.generated.RecordComparator;
-import org.apache.flink.table.runtime.operators.sort.BinaryIndexedSortable;
 import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
 import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
 import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.table.store.codegen.NormalizedKeyComputer;
+import org.apache.flink.table.store.codegen.RecordComparator;
 import org.apache.flink.util.MutableObjectIterator;
 
 import java.io.EOFException;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryIndexedSortable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryIndexedSortable.java
new file mode 100644
index 00000000..32ecf5b7
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryIndexedSortable.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.sort;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.runtime.operators.sort.IndexedSortable;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.apache.flink.table.store.codegen.NormalizedKeyComputer;
+import org.apache.flink.table.store.codegen.RecordComparator;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * An abstract sortable that provides basic compare and swap, and supports writing the index and
+ * normalized key. Copied from Flink.
+ */
+public abstract class BinaryIndexedSortable implements IndexedSortable {
+
+    public static final int OFFSET_LEN = 8;
+
+    // put/compare/swap normalized key
+    private final NormalizedKeyComputer normalizedKeyComputer;
+    protected final BinaryRowDataSerializer serializer;
+
+    // if the normalized key does not fully determine the order, records must also be compared.
+    private final RecordComparator comparator;
+
+    protected final RandomAccessInputView recordBuffer;
+    private final RandomAccessInputView recordBufferForComparison;
+
+    // segments
+    protected MemorySegment currentSortIndexSegment;
+    protected final MemorySegmentPool memorySegmentPool;
+    protected final ArrayList<MemorySegment> sortIndex;
+
+    // normalized key attributes
+    private final int numKeyBytes;
+    protected final int indexEntrySize;
+    private final int indexEntriesPerSegment;
+    protected final int lastIndexEntryOffset;
+    private final boolean normalizedKeyFullyDetermines;
+    private final boolean useNormKeyUninverted;
+
+    // for serialized comparison
+    protected final BinaryRowDataSerializer serializer1;
+    private final BinaryRowDataSerializer serializer2;
+    protected final BinaryRowData row1;
+    private final BinaryRowData row2;
+
+    // runtime variables
+    protected int currentSortIndexOffset;
+    protected int numRecords;
+
+    public BinaryIndexedSortable(
+            NormalizedKeyComputer normalizedKeyComputer,
+            BinaryRowDataSerializer serializer,
+            RecordComparator comparator,
+            ArrayList<MemorySegment> recordBufferSegments,
+            MemorySegmentPool memorySegmentPool) {
+        if (normalizedKeyComputer == null || serializer == null) {
+            throw new NullPointerException();
+        }
+        this.normalizedKeyComputer = normalizedKeyComputer;
+        this.serializer = serializer;
+        this.comparator = comparator;
+        this.memorySegmentPool = memorySegmentPool;
+        this.useNormKeyUninverted = !normalizedKeyComputer.invertKey();
+
+        this.numKeyBytes = normalizedKeyComputer.getNumKeyBytes();
+
+        int segmentSize = memorySegmentPool.pageSize();
+        this.recordBuffer = new RandomAccessInputView(recordBufferSegments, segmentSize);
+        this.recordBufferForComparison =
+                new RandomAccessInputView(recordBufferSegments, segmentSize);
+
+        this.normalizedKeyFullyDetermines = normalizedKeyComputer.isKeyFullyDetermines();
+
+        // compute the index entry size and limits
+        this.indexEntrySize = numKeyBytes + OFFSET_LEN;
+        this.indexEntriesPerSegment = segmentSize / this.indexEntrySize;
+        this.lastIndexEntryOffset = (this.indexEntriesPerSegment - 1) * this.indexEntrySize;
+
+        this.serializer1 = (BinaryRowDataSerializer) serializer.duplicate();
+        this.serializer2 = (BinaryRowDataSerializer) serializer.duplicate();
+        this.row1 = this.serializer1.createInstance();
+        this.row2 = this.serializer2.createInstance();
+
+        // set to initial state
+        this.sortIndex = new ArrayList<>(16);
+        this.currentSortIndexSegment = nextMemorySegment();
+        sortIndex.add(currentSortIndexSegment);
+    }
+
+    protected MemorySegment nextMemorySegment() {
+        return this.memorySegmentPool.nextSegment();
+    }
+
+    /** Checks whether we need to request the next index memory segment. */
+    protected boolean checkNextIndexOffset() {
+        if (this.currentSortIndexOffset > this.lastIndexEntryOffset) {
+            MemorySegment returnSegment = nextMemorySegment();
+            if (returnSegment != null) {
+                this.currentSortIndexSegment = returnSegment;
+                this.sortIndex.add(this.currentSortIndexSegment);
+                this.currentSortIndexOffset = 0;
+            } else {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /** Writes the index entry and normalized key. */
+    protected void writeIndexAndNormalizedKey(RowData record, long currOffset) {
+        // add the pointer and the normalized key
+        this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, currOffset);
+
+        if (this.numKeyBytes != 0) {
+            normalizedKeyComputer.putKey(
+                    record, this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN);
+        }
+
+        this.currentSortIndexOffset += this.indexEntrySize;
+        this.numRecords++;
+    }
+
+    @Override
+    public int compare(int i, int j) {
+        final int segmentNumberI = i / this.indexEntriesPerSegment;
+        final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize;
+
+        final int segmentNumberJ = j / this.indexEntriesPerSegment;
+        final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize;
+
+        return compare(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ);
+    }
+
+    @Override
+    public int compare(
+            int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) {
+        final MemorySegment segI = this.sortIndex.get(segmentNumberI);
+        final MemorySegment segJ = this.sortIndex.get(segmentNumberJ);
+
+        int val =
+                normalizedKeyComputer.compareKey(
+                        segI, segmentOffsetI + OFFSET_LEN, segJ, segmentOffsetJ + OFFSET_LEN);
+
+        if (val != 0 || this.normalizedKeyFullyDetermines) {
+            return this.useNormKeyUninverted ? val : -val;
+        }
+
+        final long pointerI = segI.getLong(segmentOffsetI);
+        final long pointerJ = segJ.getLong(segmentOffsetJ);
+
+        return compareRecords(pointerI, pointerJ);
+    }
+
+    private int compareRecords(long pointer1, long pointer2) {
+        this.recordBuffer.setReadPosition(pointer1);
+        this.recordBufferForComparison.setReadPosition(pointer2);
+
+        try {
+            return this.comparator.compare(
+                    serializer1.mapFromPages(row1, recordBuffer),
+                    serializer2.mapFromPages(row2, recordBufferForComparison));
+        } catch (IOException ioex) {
+            throw new RuntimeException("Error comparing two records.", ioex);
+        }
+    }
+
+    @Override
+    public void swap(int i, int j) {
+        final int segmentNumberI = i / this.indexEntriesPerSegment;
+        final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize;
+
+        final int segmentNumberJ = j / this.indexEntriesPerSegment;
+        final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize;
+
+        swap(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ);
+    }
+
+    @Override
+    public void swap(
+            int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) {
+        final MemorySegment segI = this.sortIndex.get(segmentNumberI);
+        final MemorySegment segJ = this.sortIndex.get(segmentNumberJ);
+
+        // swap offset
+        long index = segI.getLong(segmentOffsetI);
+        segI.putLong(segmentOffsetI, segJ.getLong(segmentOffsetJ));
+        segJ.putLong(segmentOffsetJ, index);
+
+        // swap key
+        normalizedKeyComputer.swapKey(
+                segI, segmentOffsetI + OFFSET_LEN, segJ, segmentOffsetJ + OFFSET_LEN);
+    }
+
+    @Override
+    public int size() {
+        return this.numRecords;
+    }
+
+    @Override
+    public int recordSize() {
+        return indexEntrySize;
+    }
+
+    @Override
+    public int recordsPerSegment() {
+        return indexEntriesPerSegment;
+    }
+
+    /** Spill: Write all records to a {@link AbstractPagedOutputView}. */
+    public void writeToOutput(AbstractPagedOutputView output) throws IOException {
+        final int numRecords = this.numRecords;
+        int currentMemSeg = 0;
+        int currentRecord = 0;
+
+        while (currentRecord < numRecords) {
+            final MemorySegment currentIndexSegment = this.sortIndex.get(currentMemSeg++);
+
+            // go through all records in the memory segment
+            for (int offset = 0;
+                    currentRecord < numRecords && offset <= this.lastIndexEntryOffset;
+                    currentRecord++, offset += this.indexEntrySize) {
+                final long pointer = currentIndexSegment.getLong(offset);
+                this.recordBuffer.setReadPosition(pointer);
+                this.serializer.copyFromPagesToView(this.recordBuffer, output);
+            }
+        }
+    }
+}
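
A hedged sketch of the index-entry arithmetic used by compare(int, int) and swap(int, int)
above; the page size and key width below are illustrative assumptions only:

    int pageSize = 32 * 1024;                          // assumed MemorySegmentPool page size
    int numKeyBytes = 8;                               // assumed normalized key width
    int indexEntrySize = numKeyBytes + BinaryIndexedSortable.OFFSET_LEN; // 8-byte pointer + key
    int entriesPerSegment = pageSize / indexEntrySize; // 32768 / 16 = 2048
    int record = 5000;
    int segmentNumber = record / entriesPerSegment;                    // sort-index page: 2
    int segmentOffset = (record % entriesPerSegment) * indexEntrySize; // byte offset inside it
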
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/KeyComparatorSupplier.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/KeyComparatorSupplier.java
index a9910f6b..11b3631d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/KeyComparatorSupplier.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/KeyComparatorSupplier.java
@@ -19,9 +19,9 @@
 package org.apache.flink.table.store.file.utils;
 
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
-import org.apache.flink.table.runtime.generated.RecordComparator;
 import org.apache.flink.table.store.codegen.CodeGenUtils;
+import org.apache.flink.table.store.codegen.GeneratedClass;
+import org.apache.flink.table.store.codegen.RecordComparator;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.function.SerializableSupplier;
 
@@ -33,7 +33,7 @@ public class KeyComparatorSupplier implements SerializableSupplier<Comparator<Ro
 
     private static final long serialVersionUID = 1L;
 
-    private final GeneratedRecordComparator genRecordComparator;
+    private final GeneratedClass<RecordComparator> genRecordComparator;
 
     public KeyComparatorSupplier(RowType keyType) {
         genRecordComparator =
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/BucketComputer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/BucketComputer.java
index 533e9e3a..f2261a7e 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/BucketComputer.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/BucketComputer.java
@@ -20,9 +20,9 @@ package org.apache.flink.table.store.table.sink;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.generated.Projection;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.codegen.CodeGenUtils;
+import org.apache.flink.table.store.codegen.Projection;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
@@ -34,9 +34,9 @@ public class BucketComputer {
 
     private final int numBucket;
 
-    private final Projection<RowData, BinaryRowData> rowProjection;
-    private final Projection<RowData, BinaryRowData> bucketProjection;
-    private final Projection<RowData, BinaryRowData> pkProjection;
+    private final Projection rowProjection;
+    private final Projection bucketProjection;
+    private final Projection pkProjection;
 
     public BucketComputer(TableSchema tableSchema) {
         this(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
index 3b68d77f..f6131021 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.store.table.sink;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.runtime.generated.Projection;
 import org.apache.flink.table.store.codegen.CodeGenUtils;
+import org.apache.flink.table.store.codegen.Projection;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -34,11 +34,11 @@ public class SinkRecordConverter {
 
     private final BucketComputer bucketComputer;
 
-    private final Projection<RowData, BinaryRowData> partProjection;
+    private final Projection partProjection;
 
-    private final Projection<RowData, BinaryRowData> pkProjection;
+    private final Projection pkProjection;
 
-    @Nullable private final Projection<RowData, BinaryRowData> logPkProjection;
+    @Nullable private final Projection logPkProjection;
 
     public SinkRecordConverter(TableSchema tableSchema) {
         this(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntNormalizedKeyComputer.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntNormalizedKeyComputer.java
index 80f410fd..af920330 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntNormalizedKeyComputer.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntNormalizedKeyComputer.java
@@ -20,8 +20,8 @@ package org.apache.flink.table.store.file.sort;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
 import org.apache.flink.table.runtime.operators.sort.SortUtil;
+import org.apache.flink.table.store.codegen.NormalizedKeyComputer;
 
 /** Example for int {@link NormalizedKeyComputer}. */
 public class IntNormalizedKeyComputer implements NormalizedKeyComputer {
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntRecordComparator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntRecordComparator.java
index 64b09c48..ba0511fc 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntRecordComparator.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntRecordComparator.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.store.file.sort;
 
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.store.codegen.RecordComparator;
 
 /** Example Int {@link RecordComparator}. */
 public class IntRecordComparator implements RecordComparator {
