This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push: new 3e476570 [FLINK-30373] Flink-table-runtime free for flink-table-store-codegen 3e476570 is described below commit 3e47657054e75d714cd941f502a899e5059cffac Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Tue Dec 13 11:27:03 2022 +0800 [FLINK-30373] Flink-table-runtime free for flink-table-store-codegen This closes #432 --- .../table/store/codegen/CodeGeneratorImpl.java | 9 +- .../ComparatorCodeGenerator.scala | 8 +- .../ProjectionCodeGenerator.scala | 9 +- .../SortCodeGenerator.scala | 5 +- .../flink/table/store/codegen/CodeGenerator.java | 16 +- .../flink/table/store/codegen/CompileUtils.java | 140 ++++++++++++ .../flink/table/store/codegen/GeneratedClass.java | 125 ++++++++++ .../table/store/codegen/NormalizedKeyComputer.java | 47 ++++ .../flink/table/store/codegen/Projection.java | 31 +++ .../table/store/codegen/RecordComparator.java | 35 +++ .../flink/table/store/codegen/CodeGenUtils.java | 25 +- .../file/mergetree/SortBufferWriteBuffer.java | 4 +- .../store/file/sort/BinaryExternalMerger.java | 96 ++++++++ .../store/file/sort/BinaryExternalSortBuffer.java | 3 +- .../store/file/sort/BinaryInMemorySortBuffer.java | 5 +- .../store/file/sort/BinaryIndexedSortable.java | 254 +++++++++++++++++++++ .../store/file/utils/KeyComparatorSupplier.java | 6 +- .../table/store/table/sink/BucketComputer.java | 8 +- .../store/table/sink/SinkRecordConverter.java | 8 +- .../store/file/sort/IntNormalizedKeyComputer.java | 2 +- .../table/store/file/sort/IntRecordComparator.java | 2 +- 21 files changed, 774 insertions(+), 64 deletions(-) diff --git a/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/CodeGeneratorImpl.java b/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/CodeGeneratorImpl.java index 846ee308..6cacd170 100644 --- a/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/CodeGeneratorImpl.java +++ b/flink-table-store-codegen/src/main/java/org/apache/flink/table/store/codegen/CodeGeneratorImpl.java @@ -18,9 +18,6 @@ package org.apache.flink.table.store.codegen; -import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer; -import org.apache.flink.table.runtime.generated.GeneratedProjection; -import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; import org.apache.flink.table.store.utils.TypeUtils; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; @@ -31,7 +28,7 @@ import java.util.List; public class CodeGeneratorImpl implements CodeGenerator { @Override - public GeneratedProjection generateProjection( + public GeneratedClass<Projection> generateProjection( String name, RowType inputType, int[] inputMapping) { RowType outputType = TypeUtils.project(inputType, inputMapping); return ProjectionCodeGenerator.generateProjection( @@ -39,7 +36,7 @@ public class CodeGeneratorImpl implements CodeGenerator { } @Override - public GeneratedNormalizedKeyComputer generateNormalizedKeyComputer( + public GeneratedClass<NormalizedKeyComputer> generateNormalizedKeyComputer( List<LogicalType> fieldTypes, String name) { return new SortCodeGenerator( RowType.of(fieldTypes.toArray(new LogicalType[0])), @@ -48,7 +45,7 @@ public class CodeGeneratorImpl implements CodeGenerator { } @Override - public GeneratedRecordComparator generateRecordComparator( + public GeneratedClass<RecordComparator> generateRecordComparator( List<LogicalType> fieldTypes, String name) { return ComparatorCodeGenerator.gen( name, diff --git a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ComparatorCodeGenerator.scala b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ComparatorCodeGenerator.scala index ee327b34..093e5e17 100644 --- a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ComparatorCodeGenerator.scala +++ b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ComparatorCodeGenerator.scala @@ -17,7 +17,6 @@ */ package org.apache.flink.table.store.codegen -import org.apache.flink.table.runtime.generated.{GeneratedRecordComparator, RecordComparator} import org.apache.flink.table.store.codegen.GenerateUtils.{newName, ROW_DATA} import org.apache.flink.table.types.logical.RowType @@ -37,7 +36,10 @@ object ComparatorCodeGenerator { * @return * A GeneratedRecordComparator */ - def gen(name: String, inputType: RowType, sortSpec: SortSpec): GeneratedRecordComparator = { + def gen( + name: String, + inputType: RowType, + sortSpec: SortSpec): GeneratedClass[RecordComparator] = { val className = newName(name) val baseClass = classOf[RecordComparator] @@ -65,7 +67,7 @@ object ComparatorCodeGenerator { } """.stripMargin - new GeneratedRecordComparator(className, code, ctx.references.toArray) + new GeneratedClass(className, code, ctx.references.toArray) } } diff --git a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ProjectionCodeGenerator.scala b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ProjectionCodeGenerator.scala index f536b082..8a8b6548 100644 --- a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ProjectionCodeGenerator.scala +++ b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/ProjectionCodeGenerator.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.store.codegen import org.apache.flink.table.data.RowData import org.apache.flink.table.data.binary.BinaryRowData -import org.apache.flink.table.runtime.generated.{GeneratedProjection, Projection} import org.apache.flink.table.store.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE} import org.apache.flink.table.store.codegen.GenerateUtils.{generateRecordStatement, newName, DEFAULT_INPUT1_TERM, DEFAULT_OUT_RECORD_TERM, DEFAULT_OUT_RECORD_WRITER_TERM, ROW_DATA} import org.apache.flink.table.types.logical.RowType @@ -85,9 +84,9 @@ object ProjectionCodeGenerator { inputTerm: String = DEFAULT_INPUT1_TERM, outRecordTerm: String = DEFAULT_OUT_RECORD_TERM, outRecordWriterTerm: String = DEFAULT_OUT_RECORD_WRITER_TERM, - reusedOutRecord: Boolean = true): GeneratedProjection = { + reusedOutRecord: Boolean = true): GeneratedClass[Projection] = { val className = newName(name) - val baseClass = classOf[Projection[_, _]] + val baseClass = classOf[Projection] val expression = generateProjectionExpression( ctx, @@ -119,7 +118,7 @@ object ProjectionCodeGenerator { |} """.stripMargin - new GeneratedProjection(className, code, ctx.references.toArray) + new GeneratedClass(className, code, ctx.references.toArray) } /** For java invoke. */ @@ -128,7 +127,7 @@ object ProjectionCodeGenerator { name: String, inputType: RowType, outputType: RowType, - inputMapping: Array[Int]): GeneratedProjection = + inputMapping: Array[Int]): GeneratedClass[Projection] = generateProjection( ctx, name, diff --git a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala index b8b43d9c..70a12c72 100644 --- a/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala +++ b/flink-table-store-codegen/src/main/scala/org.apache.flink.table.store.codegen/SortCodeGenerator.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.store.codegen import org.apache.flink.table.data.{DecimalData, TimestampData} import org.apache.flink.table.data.binary.BinaryRowData -import org.apache.flink.table.runtime.generated.{GeneratedNormalizedKeyComputer, NormalizedKeyComputer, RecordComparator} import org.apache.flink.table.runtime.operators.sort.SortUtil import org.apache.flink.table.runtime.types.PlannerTypeUtils import org.apache.flink.table.store.codegen.GenerateUtils.{newName, ROW_DATA, SEGMENT} @@ -119,7 +118,7 @@ class SortCodeGenerator(val input: RowType, val sortSpec: SortSpec) { * @return * A GeneratedNormalizedKeyComputer */ - def generateNormalizedKeyComputer(name: String): GeneratedNormalizedKeyComputer = { + def generateNormalizedKeyComputer(name: String): GeneratedClass[NormalizedKeyComputer] = { val className = newName(name) @@ -179,7 +178,7 @@ class SortCodeGenerator(val input: RowType, val sortSpec: SortSpec) { } """.stripMargin - new GeneratedNormalizedKeyComputer(className, code) + new GeneratedClass(className, code) } def generatePutNormalizedKeys(numKeyBytes: Int): mutable.ArrayBuffer[String] = { diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/CodeGenerator.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/CodeGenerator.java index 83a2a3ea..977419f0 100644 --- a/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/CodeGenerator.java +++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/CodeGenerator.java @@ -18,10 +18,6 @@ package org.apache.flink.table.store.codegen; -import org.apache.flink.table.runtime.generated.GeneratedClass; -import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer; -import org.apache.flink.table.runtime.generated.GeneratedProjection; -import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; @@ -30,24 +26,26 @@ import java.util.List; /** {@link GeneratedClass} generator. */ public interface CodeGenerator { - GeneratedProjection generateProjection(String name, RowType inputType, int[] inputMapping); + GeneratedClass<Projection> generateProjection( + String name, RowType inputType, int[] inputMapping); /** - * Generate a {@link GeneratedNormalizedKeyComputer}. + * Generate a {@link NormalizedKeyComputer}. * * @param fieldTypes Both the input row field types and the sort key field types. Records are * compared by the first field, then the second field, then the third field and so on. All * fields are compared in ascending order. */ - GeneratedNormalizedKeyComputer generateNormalizedKeyComputer( + GeneratedClass<NormalizedKeyComputer> generateNormalizedKeyComputer( List<LogicalType> fieldTypes, String name); /** - * Generate a {@link GeneratedRecordComparator}. + * Generate a {@link RecordComparator}. * * @param fieldTypes Both the input row field types and the sort key field types. Records are * * compared by the first field, then the second field, then the third field and so on. All * * fields are compared in ascending order. */ - GeneratedRecordComparator generateRecordComparator(List<LogicalType> fieldTypes, String name); + GeneratedClass<RecordComparator> generateRecordComparator( + List<LogicalType> fieldTypes, String name); } diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/CompileUtils.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/CompileUtils.java new file mode 100644 index 00000000..8e63eab9 --- /dev/null +++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/CompileUtils.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.codegen; + +import org.apache.flink.api.common.InvalidProgramException; +import org.apache.flink.util.FlinkRuntimeException; + +import org.apache.flink.shaded.guava30.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder; + +import org.codehaus.janino.SimpleCompiler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Utilities to compile a generated code to a Class. Copied from Flink. */ +public final class CompileUtils { + + // used for logging the generated codes to a same place + private static final Logger CODE_LOG = LoggerFactory.getLogger(CompileUtils.class); + + /** + * Cache of compile, Janino generates a new Class Loader and a new Class file every compile + * (guaranteeing that the class name will not be repeated). This leads to multiple tasks of the + * same process that generate a large number of duplicate class, resulting in a large number of + * Meta zone GC (class unloading), resulting in performance bottlenecks. So we add a cache to + * avoid this problem. + */ + static final Cache<ClassKey, Class<?>> COMPILED_CLASS_CACHE = + CacheBuilder.newBuilder() + // estimated maximum planning/startup time + .expireAfterAccess(Duration.ofMinutes(5)) + // estimated cache size + .maximumSize(300) + .softValues() + .build(); + + /** + * Compiles a generated code to a Class. + * + * @param cl the ClassLoader used to load the class + * @param name the class name + * @param code the generated code + * @param <T> the class type + * @return the compiled class + */ + @SuppressWarnings("unchecked") + public static <T> Class<T> compile(ClassLoader cl, String name, String code) { + try { + // The class name is part of the "code" and makes the string unique, + // to prevent class leaks we don't cache the class loader directly + // but only its hash code + final ClassKey classKey = new ClassKey(cl.hashCode(), code); + return (Class<T>) COMPILED_CLASS_CACHE.get(classKey, () -> doCompile(cl, name, code)); + } catch (Exception e) { + throw new FlinkRuntimeException(e.getMessage(), e); + } + } + + private static <T> Class<T> doCompile(ClassLoader cl, String name, String code) { + checkNotNull(cl, "Classloader must not be null."); + CODE_LOG.debug("Compiling: {} \n\n Code:\n{}", name, code); + SimpleCompiler compiler = new SimpleCompiler(); + compiler.setParentClassLoader(cl); + try { + compiler.cook(code); + } catch (Throwable t) { + System.out.println(addLineNumber(code)); + throw new InvalidProgramException( + "Table program cannot be compiled. This is a bug. Please file an issue.", t); + } + try { + //noinspection unchecked + return (Class<T>) compiler.getClassLoader().loadClass(name); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Can not load class " + name, e); + } + } + + /** + * To output more information when an error occurs. Generally, when cook fails, it shows which + * line is wrong. This line number starts at 1. + */ + private static String addLineNumber(String code) { + String[] lines = code.split("\n"); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < lines.length; i++) { + builder.append("/* ").append(i + 1).append(" */").append(lines[i]).append("\n"); + } + return builder.toString(); + } + + /** Class to use as key for the {@link #COMPILED_CLASS_CACHE}. */ + private static class ClassKey { + private final int classLoaderId; + private final String code; + + private ClassKey(int classLoaderId, String code) { + this.classLoaderId = classLoaderId; + this.code = code; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClassKey classKey = (ClassKey) o; + return classLoaderId == classKey.classLoaderId && code.equals(classKey.code); + } + + @Override + public int hashCode() { + return Objects.hash(classLoaderId, code); + } + } +} diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/GeneratedClass.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/GeneratedClass.java new file mode 100644 index 00000000..83517f06 --- /dev/null +++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/GeneratedClass.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.codegen; + +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.codesplit.JavaCodeSplitter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A wrapper for generated class, defines a {@link #newInstance(ClassLoader)} method to get an + * instance by reference objects easily. Copied from Flink. + */ +public final class GeneratedClass<T> implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(GeneratedClass.class); + + private final String className; + private final String code; + private final String splitCode; + private final Object[] references; + + private transient Class<T> compiledClass; + + public GeneratedClass(String className, String code) { + this(className, code, new Object[0]); + } + + public GeneratedClass(String className, String code, Object[] references) { + checkNotNull(className, "name must not be null"); + checkNotNull(code, "code must not be null"); + checkNotNull(references, "references must not be null"); + this.className = className; + this.code = code; + this.splitCode = + code.isEmpty() + ? code + : JavaCodeSplitter.split( + code, + TableConfigOptions.MAX_LENGTH_GENERATED_CODE.defaultValue(), + TableConfigOptions.MAX_MEMBERS_GENERATED_CODE.defaultValue()); + this.references = references; + } + + /** Create a new instance of this generated class. */ + public T newInstance(ClassLoader classLoader) { + try { + return compile(classLoader) + .getConstructor(Object[].class) + // Because Constructor.newInstance(Object... initargs), we need to load + // references into a new Object[], otherwise it cannot be compiled. + .newInstance(new Object[] {references}); + } catch (Throwable e) { + throw new RuntimeException( + "Could not instantiate generated class '" + className + "'", e); + } + } + + @SuppressWarnings("unchecked") + public T newInstance(ClassLoader classLoader, Object... args) { + try { + return (T) compile(classLoader).getConstructors()[0].newInstance(args); + } catch (Exception e) { + throw new RuntimeException( + "Could not instantiate generated class '" + className + "'", e); + } + } + + /** + * Compiles the generated code, the compiled class will be cached in the {@link GeneratedClass}. + */ + public Class<T> compile(ClassLoader classLoader) { + if (compiledClass == null) { + // cache the compiled class + try { + // first try to compile the split code + compiledClass = CompileUtils.compile(classLoader, className, splitCode); + } catch (Throwable t) { + // compile the original code as fallback + LOG.warn("Failed to compile split code, falling back to original code", t); + compiledClass = CompileUtils.compile(classLoader, className, code); + } + } + return compiledClass; + } + + public String getClassName() { + return className; + } + + public String getCode() { + return code; + } + + public Object[] getReferences() { + return references; + } + + public Class<T> getClass(ClassLoader classLoader) { + return compile(classLoader); + } +} diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/NormalizedKeyComputer.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/NormalizedKeyComputer.java new file mode 100644 index 00000000..80bbc64c --- /dev/null +++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/NormalizedKeyComputer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.codegen; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.table.data.RowData; + +/** + * Normalized key computer for {@code SortBuffer}. For performance, subclasses are usually + * implemented through CodeGenerator. Copied from Flink. + */ +public interface NormalizedKeyComputer { + + /** Writes a normalized key for the given record into the target {@link MemorySegment}. */ + void putKey(RowData record, MemorySegment target, int offset); + + /** Compares two normalized keys in respective {@link MemorySegment}. */ + int compareKey(MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ); + + /** Swaps two normalized keys in respective {@link MemorySegment}. */ + void swapKey(MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ); + + /** Get normalized keys bytes length. */ + int getNumKeyBytes(); + + /** whether the normalized key can fully determines the comparison. */ + boolean isKeyFullyDetermines(); + + /** Flag whether normalized key comparisons should be inverted key. */ + boolean invertKey(); +} diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/Projection.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/Projection.java new file mode 100644 index 00000000..5b3abd7c --- /dev/null +++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/Projection.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.codegen; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowData; + +/** + * Interface for code generated projection, which will map a RowData to another BinaryRowData. + * Copied from Flink. + */ +public interface Projection { + + BinaryRowData apply(RowData row); +} diff --git a/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/RecordComparator.java b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/RecordComparator.java new file mode 100644 index 00000000..eba2a3c7 --- /dev/null +++ b/flink-table-store-common/src/main/java/org/apache/flink/table/store/codegen/RecordComparator.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.codegen; + +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer; + +import java.io.Serializable; +import java.util.Comparator; + +/** + * Record comparator for {@link BinaryInMemorySortBuffer}. For performance, subclasses are usually + * implemented through CodeGenerator. A new interface for helping JVM inline. Copied from Flink. + */ +public interface RecordComparator extends Comparator<RowData>, Serializable { + + @Override + int compare(RowData o1, RowData o2); +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java index da5e2e4e..c8775012 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java @@ -18,12 +18,6 @@ package org.apache.flink.table.store.codegen; -import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.binary.BinaryRowData; -import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; -import org.apache.flink.table.runtime.generated.NormalizedKeyComputer; -import org.apache.flink.table.runtime.generated.Projection; -import org.apache.flink.table.runtime.generated.RecordComparator; import org.apache.flink.table.store.utils.BinaryRowDataUtil; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; @@ -33,22 +27,17 @@ import java.util.List; /** Utils for code generations. */ public class CodeGenUtils { - public static final Projection<RowData, BinaryRowData> EMPTY_PROJECTION = - input -> BinaryRowDataUtil.EMPTY_ROW; + public static final Projection EMPTY_PROJECTION = input -> BinaryRowDataUtil.EMPTY_ROW; - public static Projection<RowData, BinaryRowData> newProjection( - RowType inputType, int[] mapping) { + public static Projection newProjection(RowType inputType, int[] mapping) { if (mapping.length == 0) { return EMPTY_PROJECTION; } - @SuppressWarnings("unchecked") - Projection<RowData, BinaryRowData> projection = - CodeGenLoader.getInstance() - .discover(CodeGenerator.class) - .generateProjection("Projection", inputType, mapping) - .newInstance(CodeGenUtils.class.getClassLoader()); - return projection; + return CodeGenLoader.getInstance() + .discover(CodeGenerator.class) + .generateProjection("Projection", inputType, mapping) + .newInstance(CodeGenUtils.class.getClassLoader()); } public static NormalizedKeyComputer newNormalizedKeyComputer( @@ -59,7 +48,7 @@ public class CodeGenUtils { .newInstance(CodeGenUtils.class.getClassLoader()); } - public static GeneratedRecordComparator generateRecordComparator( + public static GeneratedClass<RecordComparator> generateRecordComparator( List<LogicalType> fieldTypes, String name) { return CodeGenLoader.getInstance() .discover(CodeGenerator.class) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferWriteBuffer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferWriteBuffer.java index 45ac26f4..8156f06d 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferWriteBuffer.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/SortBufferWriteBuffer.java @@ -22,13 +22,13 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; -import org.apache.flink.table.runtime.generated.NormalizedKeyComputer; -import org.apache.flink.table.runtime.generated.RecordComparator; import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer; import org.apache.flink.table.runtime.typeutils.InternalSerializers; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.util.MemorySegmentPool; import org.apache.flink.table.store.codegen.CodeGenUtils; +import org.apache.flink.table.store.codegen.NormalizedKeyComputer; +import org.apache.flink.table.store.codegen.RecordComparator; import org.apache.flink.table.store.file.KeyValue; import org.apache.flink.table.store.file.KeyValueSerializer; import org.apache.flink.table.store.file.mergetree.compact.MergeFunction; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryExternalMerger.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryExternalMerger.java new file mode 100644 index 00000000..489f909d --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryExternalMerger.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.sort; + +import org.apache.flink.runtime.io.compression.BlockCompressionFactory; +import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator; +import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.memory.AbstractPagedOutputView; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.runtime.operators.sort.AbstractBinaryExternalMerger; +import org.apache.flink.table.runtime.operators.sort.SpillChannelManager; +import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer; +import org.apache.flink.table.store.codegen.RecordComparator; +import org.apache.flink.util.MutableObjectIterator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** Record merger for sort of BinaryRowData. Copied from Flink. */ +public class BinaryExternalMerger extends AbstractBinaryExternalMerger<BinaryRowData> { + + private final BinaryRowDataSerializer serializer; + private final RecordComparator comparator; + + public BinaryExternalMerger( + IOManager ioManager, + int pageSize, + int maxFanIn, + SpillChannelManager channelManager, + BinaryRowDataSerializer serializer, + RecordComparator comparator, + boolean compressionEnable, + BlockCompressionFactory compressionCodecFactory, + int compressionBlockSize) { + super( + ioManager, + pageSize, + maxFanIn, + channelManager, + compressionEnable, + compressionCodecFactory, + compressionBlockSize); + this.serializer = serializer; + this.comparator = comparator; + } + + @Override + protected MutableObjectIterator<BinaryRowData> channelReaderInputViewIterator( + AbstractChannelReaderInputView inView) { + return new ChannelReaderInputViewIterator<>(inView, null, serializer.duplicate()); + } + + @Override + protected Comparator<BinaryRowData> mergeComparator() { + return comparator::compare; + } + + @Override + protected List<BinaryRowData> mergeReusedEntries(int size) { + ArrayList<BinaryRowData> reused = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + reused.add(serializer.createInstance()); + } + return reused; + } + + @Override + protected void writeMergingOutput( + MutableObjectIterator<BinaryRowData> mergeIterator, AbstractPagedOutputView output) + throws IOException { + // read the merged stream and write the data back + BinaryRowData rec = serializer.createInstance(); + while ((rec = mergeIterator.next(rec)) != null) { + serializer.serialize(rec, output); + } + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryExternalSortBuffer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryExternalSortBuffer.java index cd46d54b..9e49c447 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryExternalSortBuffer.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryExternalSortBuffer.java @@ -28,13 +28,12 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.operators.sort.QuickSort; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; -import org.apache.flink.table.runtime.generated.RecordComparator; import org.apache.flink.table.runtime.io.ChannelWithMeta; -import org.apache.flink.table.runtime.operators.sort.BinaryExternalMerger; import org.apache.flink.table.runtime.operators.sort.BinaryMergeIterator; import org.apache.flink.table.runtime.operators.sort.SpillChannelManager; import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer; import org.apache.flink.table.runtime.util.FileChannelUtil; +import org.apache.flink.table.store.codegen.RecordComparator; import org.apache.flink.util.MutableObjectIterator; import java.io.IOException; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryInMemorySortBuffer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryInMemorySortBuffer.java index fe88e472..b1bb1aff 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryInMemorySortBuffer.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryInMemorySortBuffer.java @@ -23,12 +23,11 @@ import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView; import org.apache.flink.runtime.operators.sort.QuickSort; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; -import org.apache.flink.table.runtime.generated.NormalizedKeyComputer; -import org.apache.flink.table.runtime.generated.RecordComparator; -import org.apache.flink.table.runtime.operators.sort.BinaryIndexedSortable; import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer; import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer; import org.apache.flink.table.runtime.util.MemorySegmentPool; +import org.apache.flink.table.store.codegen.NormalizedKeyComputer; +import org.apache.flink.table.store.codegen.RecordComparator; import org.apache.flink.util.MutableObjectIterator; import java.io.EOFException; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryIndexedSortable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryIndexedSortable.java new file mode 100644 index 00000000..32ecf5b7 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/sort/BinaryIndexedSortable.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.sort; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.RandomAccessInputView; +import org.apache.flink.runtime.memory.AbstractPagedOutputView; +import org.apache.flink.runtime.operators.sort.IndexedSortable; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer; +import org.apache.flink.table.runtime.util.MemorySegmentPool; +import org.apache.flink.table.store.codegen.NormalizedKeyComputer; +import org.apache.flink.table.store.codegen.RecordComparator; + +import java.io.IOException; +import java.util.ArrayList; + +/** + * An abstract sortable, provide basic compare and swap. Support writing of index and normalizedKey. + * Copied from Flink. + */ +public abstract class BinaryIndexedSortable implements IndexedSortable { + + public static final int OFFSET_LEN = 8; + + // put/compare/swap normalized key + private final NormalizedKeyComputer normalizedKeyComputer; + protected final BinaryRowDataSerializer serializer; + + // if normalized key not fully determines, need compare record. + private final RecordComparator comparator; + + protected final RandomAccessInputView recordBuffer; + private final RandomAccessInputView recordBufferForComparison; + + // segments + protected MemorySegment currentSortIndexSegment; + protected final MemorySegmentPool memorySegmentPool; + protected final ArrayList<MemorySegment> sortIndex; + + // normalized key attributes + private final int numKeyBytes; + protected final int indexEntrySize; + private final int indexEntriesPerSegment; + protected final int lastIndexEntryOffset; + private final boolean normalizedKeyFullyDetermines; + private final boolean useNormKeyUninverted; + + // for serialized comparison + protected final BinaryRowDataSerializer serializer1; + private final BinaryRowDataSerializer serializer2; + protected final BinaryRowData row1; + private final BinaryRowData row2; + + // runtime variables + protected int currentSortIndexOffset; + protected int numRecords; + + public BinaryIndexedSortable( + NormalizedKeyComputer normalizedKeyComputer, + BinaryRowDataSerializer serializer, + RecordComparator comparator, + ArrayList<MemorySegment> recordBufferSegments, + MemorySegmentPool memorySegmentPool) { + if (normalizedKeyComputer == null || serializer == null) { + throw new NullPointerException(); + } + this.normalizedKeyComputer = normalizedKeyComputer; + this.serializer = serializer; + this.comparator = comparator; + this.memorySegmentPool = memorySegmentPool; + this.useNormKeyUninverted = !normalizedKeyComputer.invertKey(); + + this.numKeyBytes = normalizedKeyComputer.getNumKeyBytes(); + + int segmentSize = memorySegmentPool.pageSize(); + this.recordBuffer = new RandomAccessInputView(recordBufferSegments, segmentSize); + this.recordBufferForComparison = + new RandomAccessInputView(recordBufferSegments, segmentSize); + + this.normalizedKeyFullyDetermines = normalizedKeyComputer.isKeyFullyDetermines(); + + // compute the index entry size and limits + this.indexEntrySize = numKeyBytes + OFFSET_LEN; + this.indexEntriesPerSegment = segmentSize / this.indexEntrySize; + this.lastIndexEntryOffset = (this.indexEntriesPerSegment - 1) * this.indexEntrySize; + + this.serializer1 = (BinaryRowDataSerializer) serializer.duplicate(); + this.serializer2 = (BinaryRowDataSerializer) serializer.duplicate(); + this.row1 = this.serializer1.createInstance(); + this.row2 = this.serializer2.createInstance(); + + // set to initial state + this.sortIndex = new ArrayList<>(16); + this.currentSortIndexSegment = nextMemorySegment(); + sortIndex.add(currentSortIndexSegment); + } + + protected MemorySegment nextMemorySegment() { + return this.memorySegmentPool.nextSegment(); + } + + /** check if we need request next index memory. */ + protected boolean checkNextIndexOffset() { + if (this.currentSortIndexOffset > this.lastIndexEntryOffset) { + MemorySegment returnSegment = nextMemorySegment(); + if (returnSegment != null) { + this.currentSortIndexSegment = returnSegment; + this.sortIndex.add(this.currentSortIndexSegment); + this.currentSortIndexOffset = 0; + } else { + return false; + } + } + return true; + } + + /** Write of index and normalizedKey. */ + protected void writeIndexAndNormalizedKey(RowData record, long currOffset) { + // add the pointer and the normalized key + this.currentSortIndexSegment.putLong(this.currentSortIndexOffset, currOffset); + + if (this.numKeyBytes != 0) { + normalizedKeyComputer.putKey( + record, this.currentSortIndexSegment, this.currentSortIndexOffset + OFFSET_LEN); + } + + this.currentSortIndexOffset += this.indexEntrySize; + this.numRecords++; + } + + @Override + public int compare(int i, int j) { + final int segmentNumberI = i / this.indexEntriesPerSegment; + final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize; + + final int segmentNumberJ = j / this.indexEntriesPerSegment; + final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize; + + return compare(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ); + } + + @Override + public int compare( + int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) { + final MemorySegment segI = this.sortIndex.get(segmentNumberI); + final MemorySegment segJ = this.sortIndex.get(segmentNumberJ); + + int val = + normalizedKeyComputer.compareKey( + segI, segmentOffsetI + OFFSET_LEN, segJ, segmentOffsetJ + OFFSET_LEN); + + if (val != 0 || this.normalizedKeyFullyDetermines) { + return this.useNormKeyUninverted ? val : -val; + } + + final long pointerI = segI.getLong(segmentOffsetI); + final long pointerJ = segJ.getLong(segmentOffsetJ); + + return compareRecords(pointerI, pointerJ); + } + + private int compareRecords(long pointer1, long pointer2) { + this.recordBuffer.setReadPosition(pointer1); + this.recordBufferForComparison.setReadPosition(pointer2); + + try { + return this.comparator.compare( + serializer1.mapFromPages(row1, recordBuffer), + serializer2.mapFromPages(row2, recordBufferForComparison)); + } catch (IOException ioex) { + throw new RuntimeException("Error comparing two records.", ioex); + } + } + + @Override + public void swap(int i, int j) { + final int segmentNumberI = i / this.indexEntriesPerSegment; + final int segmentOffsetI = (i % this.indexEntriesPerSegment) * this.indexEntrySize; + + final int segmentNumberJ = j / this.indexEntriesPerSegment; + final int segmentOffsetJ = (j % this.indexEntriesPerSegment) * this.indexEntrySize; + + swap(segmentNumberI, segmentOffsetI, segmentNumberJ, segmentOffsetJ); + } + + @Override + public void swap( + int segmentNumberI, int segmentOffsetI, int segmentNumberJ, int segmentOffsetJ) { + final MemorySegment segI = this.sortIndex.get(segmentNumberI); + final MemorySegment segJ = this.sortIndex.get(segmentNumberJ); + + // swap offset + long index = segI.getLong(segmentOffsetI); + segI.putLong(segmentOffsetI, segJ.getLong(segmentOffsetJ)); + segJ.putLong(segmentOffsetJ, index); + + // swap key + normalizedKeyComputer.swapKey( + segI, segmentOffsetI + OFFSET_LEN, segJ, segmentOffsetJ + OFFSET_LEN); + } + + @Override + public int size() { + return this.numRecords; + } + + @Override + public int recordSize() { + return indexEntrySize; + } + + @Override + public int recordsPerSegment() { + return indexEntriesPerSegment; + } + + /** Spill: Write all records to a {@link AbstractPagedOutputView}. */ + public void writeToOutput(AbstractPagedOutputView output) throws IOException { + final int numRecords = this.numRecords; + int currentMemSeg = 0; + int currentRecord = 0; + + while (currentRecord < numRecords) { + final MemorySegment currentIndexSegment = this.sortIndex.get(currentMemSeg++); + + // go through all records in the memory segment + for (int offset = 0; + currentRecord < numRecords && offset <= this.lastIndexEntryOffset; + currentRecord++, offset += this.indexEntrySize) { + final long pointer = currentIndexSegment.getLong(offset); + this.recordBuffer.setReadPosition(pointer); + this.serializer.copyFromPagesToView(this.recordBuffer, output); + } + } + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/KeyComparatorSupplier.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/KeyComparatorSupplier.java index a9910f6b..11b3631d 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/KeyComparatorSupplier.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/KeyComparatorSupplier.java @@ -19,9 +19,9 @@ package org.apache.flink.table.store.file.utils; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; -import org.apache.flink.table.runtime.generated.RecordComparator; import org.apache.flink.table.store.codegen.CodeGenUtils; +import org.apache.flink.table.store.codegen.GeneratedClass; +import org.apache.flink.table.store.codegen.RecordComparator; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.function.SerializableSupplier; @@ -33,7 +33,7 @@ public class KeyComparatorSupplier implements SerializableSupplier<Comparator<Ro private static final long serialVersionUID = 1L; - private final GeneratedRecordComparator genRecordComparator; + private final GeneratedClass<RecordComparator> genRecordComparator; public KeyComparatorSupplier(RowType keyType) { genRecordComparator = diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/BucketComputer.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/BucketComputer.java index 533e9e3a..f2261a7e 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/BucketComputer.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/BucketComputer.java @@ -20,9 +20,9 @@ package org.apache.flink.table.store.table.sink; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; -import org.apache.flink.table.runtime.generated.Projection; import org.apache.flink.table.store.CoreOptions; import org.apache.flink.table.store.codegen.CodeGenUtils; +import org.apache.flink.table.store.codegen.Projection; import org.apache.flink.table.store.file.schema.TableSchema; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; @@ -34,9 +34,9 @@ public class BucketComputer { private final int numBucket; - private final Projection<RowData, BinaryRowData> rowProjection; - private final Projection<RowData, BinaryRowData> bucketProjection; - private final Projection<RowData, BinaryRowData> pkProjection; + private final Projection rowProjection; + private final Projection bucketProjection; + private final Projection pkProjection; public BucketComputer(TableSchema tableSchema) { this( diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java index 3b68d77f..f6131021 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java @@ -20,8 +20,8 @@ package org.apache.flink.table.store.table.sink; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; -import org.apache.flink.table.runtime.generated.Projection; import org.apache.flink.table.store.codegen.CodeGenUtils; +import org.apache.flink.table.store.codegen.Projection; import org.apache.flink.table.store.file.schema.TableSchema; import org.apache.flink.table.types.logical.RowType; @@ -34,11 +34,11 @@ public class SinkRecordConverter { private final BucketComputer bucketComputer; - private final Projection<RowData, BinaryRowData> partProjection; + private final Projection partProjection; - private final Projection<RowData, BinaryRowData> pkProjection; + private final Projection pkProjection; - @Nullable private final Projection<RowData, BinaryRowData> logPkProjection; + @Nullable private final Projection logPkProjection; public SinkRecordConverter(TableSchema tableSchema) { this( diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntNormalizedKeyComputer.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntNormalizedKeyComputer.java index 80f410fd..af920330 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntNormalizedKeyComputer.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntNormalizedKeyComputer.java @@ -20,8 +20,8 @@ package org.apache.flink.table.store.file.sort; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.runtime.generated.NormalizedKeyComputer; import org.apache.flink.table.runtime.operators.sort.SortUtil; +import org.apache.flink.table.store.codegen.NormalizedKeyComputer; /** Example for int {@link NormalizedKeyComputer}. */ public class IntNormalizedKeyComputer implements NormalizedKeyComputer { diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntRecordComparator.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntRecordComparator.java index 64b09c48..ba0511fc 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntRecordComparator.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/sort/IntRecordComparator.java @@ -19,7 +19,7 @@ package org.apache.flink.table.store.file.sort; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.runtime.generated.RecordComparator; +import org.apache.flink.table.store.codegen.RecordComparator; /** Example Int {@link RecordComparator}. */ public class IntRecordComparator implements RecordComparator {