This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new e20a9f8 [FLINK-11881][table-planner-blink] Introduce code generated typed sorter to blink table (#7958) e20a9f8 is described below commit e20a9f8947244315f7732c719ebf8f77e7699a57 Author: Jingsong Lee <lzljs3620...@aliyun.com> AuthorDate: Thu Mar 14 21:36:23 2019 +0800 [FLINK-11881][table-planner-blink] Introduce code generated typed sorter to blink table (#7958) --- .../flink/table/calcite/FlinkPlannerImpl.scala | 24 +- .../apache/flink/table/codegen/CodeGenUtils.scala | 203 ++++++++ .../flink/table/codegen/SortCodeGenerator.scala | 485 ++++++++++++++++++ .../flink/table/codegen/SortCodeGeneratorTest.java | 550 +++++++++++++++++++++ .../table/dataformat/AbstractBinaryWriter.java | 14 +- .../apache/flink/table/dataformat/BinaryArray.java | 9 + .../flink/table/dataformat/BinaryFormat.java | 81 +++ .../flink/table/dataformat/BinaryGeneric.java | 13 + .../apache/flink/table/dataformat/BinaryRow.java | 28 ++ .../flink/table/dataformat/BinaryString.java | 38 -- .../flink/table/dataformat/BinaryWriter.java | 6 +- .../table/dataformat/DataFormatConverters.java | 55 +-- .../apache/flink/table/dataformat/JoinedRow.java | 9 + .../apache/flink/table/dataformat/NestedRow.java | 8 + .../flink/table/dataformat/ObjectArrayRow.java | 5 + .../flink/table/dataformat/TypeGetterSetters.java | 7 + .../apache/flink/table/generated/CompileUtils.java | 14 + .../flink/table/generated/GeneratedClass.java | 5 +- .../GeneratedNormalizedKeyComputer.java} | 28 +- .../GeneratedRecordComparator.java} | 28 +- .../sort => generated}/NormalizedKeyComputer.java | 27 +- .../sort => generated}/RecordComparator.java | 20 +- .../table/runtime/sort/BinaryExternalMerger.java | 1 + .../table/runtime/sort/BinaryExternalSorter.java | 3 +- .../runtime/sort/BinaryInMemorySortBuffer.java | 2 + .../table/runtime/sort/BinaryIndexedSortable.java | 2 + .../table/runtime/sort/BinaryKVExternalMerger.java | 1 + 
.../runtime/sort/BinaryKVInMemorySortBuffer.java | 2 + .../table/runtime/sort/BinaryMergeIterator.java | 1 + .../runtime/sort/BufferedKVExternalSorter.java | 2 + .../apache/flink/table/runtime/sort/SortUtil.java | 101 +++- .../org/apache/flink/table/type/ArrayType.java | 2 + .../table/type/{TimeType.java => BinaryType.java} | 18 +- .../org/apache/flink/table/type/BooleanType.java | 2 + .../java/org/apache/flink/table/type/ByteType.java | 2 + .../java/org/apache/flink/table/type/CharType.java | 2 + .../java/org/apache/flink/table/type/DateType.java | 2 + .../org/apache/flink/table/type/DecimalType.java | 2 + .../org/apache/flink/table/type/DoubleType.java | 2 + .../org/apache/flink/table/type/FloatType.java | 2 + .../org/apache/flink/table/type/GenericType.java | 2 + .../java/org/apache/flink/table/type/IntType.java | 2 + .../org/apache/flink/table/type/InternalTypes.java | 2 +- .../java/org/apache/flink/table/type/LongType.java | 2 + .../java/org/apache/flink/table/type/MapType.java | 2 + .../org/apache/flink/table/type/PrimitiveType.java | 2 + .../java/org/apache/flink/table/type/RowType.java | 2 + .../org/apache/flink/table/type/ShortType.java | 2 + .../org/apache/flink/table/type/StringType.java | 2 + .../java/org/apache/flink/table/type/TimeType.java | 2 + .../org/apache/flink/table/type/TimestampType.java | 2 + .../apache/flink/table/type/TypeConverters.java | 1 + .../apache/flink/table/dataformat/BaseRowTest.java | 20 +- .../flink/table/dataformat/BinaryArrayTest.java | 15 + .../flink/table/dataformat/BinaryRowTest.java | 14 + .../flink/table/dataformat/BinaryStringTest.java | 6 +- .../runtime/sort/BinaryMergeIteratorTest.java | 1 + .../runtime/sort/BufferedKVExternalSorterTest.java | 2 + .../runtime/sort/IntNormalizedKeyComputer.java | 3 +- .../table/runtime/sort/IntRecordComparator.java | 3 +- .../flink/table/runtime/sort/SortUtilTest.java | 4 +- 61 files changed, 1730 insertions(+), 167 deletions(-) diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala index 951bc05..8560a1c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala @@ -20,9 +20,8 @@ package org.apache.flink.table.calcite import java.util import java.util.Properties - import com.google.common.collect.ImmutableList -import org.apache.calcite.config.{CalciteConnectionConfigImpl, CalciteConnectionProperty} +import org.apache.calcite.config.{CalciteConnectionConfigImpl, CalciteConnectionProperty, NullCollation} import org.apache.calcite.jdbc.CalciteSchema import org.apache.calcite.plan.RelOptTable.ViewExpander import org.apache.calcite.plan._ @@ -221,4 +220,25 @@ object FlinkPlannerImpl { rootSchema(schema.getParentSchema) } } + + /** + * the null default direction if not specified. Consistent with HIVE/SPARK/MYSQL/BLINK-RUNTIME. + * So the default value only is set [[NullCollation.LOW]] for keeping consistent with + * BLINK-RUNTIME. + * [[NullCollation.LOW]] means null values appear first when the order is ASC (ascending), and + * ordered last when the order is DESC (descending). + */ + val defaultNullCollation: NullCollation = NullCollation.LOW + + /** Returns the default null direction if not specified. */ + def getNullDefaultOrders(ascendings: Array[Boolean]): Array[Boolean] = { + ascendings.map { asc => + FlinkPlannerImpl.defaultNullCollation.last(!asc) + } + } + + /** Returns the default null direction if not specified. 
*/ + def getNullDefaultOrder(ascending: Boolean): Boolean = { + FlinkPlannerImpl.defaultNullCollation.last(!ascending) + } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala index 94236ed..d07815f 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala @@ -18,8 +18,12 @@ package org.apache.flink.table.codegen +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.{AtomicType => AtomicTypeInfo} import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.core.memory.MemorySegment import org.apache.flink.table.`type`._ +import org.apache.flink.table.calcite.FlinkPlannerImpl import org.apache.flink.table.dataformat._ import org.apache.flink.table.typeutils.TypeCheckUtils @@ -27,6 +31,8 @@ import java.lang.reflect.Method import java.lang.{Boolean => JBoolean, Byte => JByte, Character => JChar, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => JShort} import java.util.concurrent.atomic.AtomicInteger +import scala.collection.mutable + object CodeGenUtils { // ------------------------------- DEFAULT TERMS ------------------------------------------ @@ -36,9 +42,12 @@ object CodeGenUtils { // -------------------------- CANONICAL CLASS NAMES --------------------------------------- val BINARY_ROW: String = className[BinaryRow] + val BINARY_ARRAY: String = className[BinaryArray] + val BINARY_GENERIC: String = className[BinaryGeneric[_]] val BINARY_STRING: String = className[BinaryString] val BASE_ROW: String = className[BaseRow] val GENERIC_ROW: String = className[GenericRow] + val SEGMENT: String = className[MemorySegment] // 
---------------------------------------------------------------------------------------- @@ -108,6 +117,7 @@ object CodeGenUtils { case InternalTypes.TIMESTAMP => boxedTypeTermForType(InternalTypes.LONG) case InternalTypes.STRING => BINARY_STRING + case InternalTypes.BINARY => "byte[]" case _: DecimalType => className[Decimal] // BINARY is also an ArrayType and uses BinaryArray internally too @@ -230,4 +240,197 @@ object CodeGenUtils { } } + def baseRowFieldReadAccess( + ctx: CodeGeneratorContext, pos: Int, rowTerm: String, fieldType: InternalType) : String = + baseRowFieldReadAccess(ctx, pos.toString, rowTerm, fieldType) + + def baseRowFieldReadAccess( + ctx: CodeGeneratorContext, pos: String, rowTerm: String, fieldType: InternalType) : String = + fieldType match { + case InternalTypes.INT => s"$rowTerm.getInt($pos)" + case InternalTypes.LONG => s"$rowTerm.getLong($pos)" + case InternalTypes.SHORT => s"$rowTerm.getShort($pos)" + case InternalTypes.BYTE => s"$rowTerm.getByte($pos)" + case InternalTypes.FLOAT => s"$rowTerm.getFloat($pos)" + case InternalTypes.DOUBLE => s"$rowTerm.getDouble($pos)" + case InternalTypes.BOOLEAN => s"$rowTerm.getBoolean($pos)" + case InternalTypes.STRING => s"$rowTerm.getString($pos)" + case InternalTypes.BINARY => s"$rowTerm.getBinary($pos)" + case dt: DecimalType => s"$rowTerm.getDecimal($pos, ${dt.precision()}, ${dt.scale()})" + case InternalTypes.CHAR => s"$rowTerm.getChar($pos)" + case _: TimestampType => s"$rowTerm.getLong($pos)" + case _: DateType => s"$rowTerm.getInt($pos)" + case InternalTypes.TIME => s"$rowTerm.getInt($pos)" + case _: ArrayType => s"$rowTerm.getArray($pos)" + case _: MapType => s"$rowTerm.getMap($pos)" + case rt: RowType => s"$rowTerm.getRow($pos, ${rt.getArity})" + case _: GenericType[_] => s"$rowTerm.getGeneric($pos)" + } + + /** + * Generates code for comparing two field. 
+ */ + def genCompare( + ctx: CodeGeneratorContext, + t: InternalType, + nullsIsLast: Boolean, + c1: String, + c2: String): String = t match { + case InternalTypes.BOOLEAN => s"($c1 == $c2 ? 0 : ($c1 ? 1 : -1))" + case _: PrimitiveType | _: DateType | _: TimeType | _: TimestampType => + s"($c1 > $c2 ? 1 : $c1 < $c2 ? -1 : 0)" + case InternalTypes.BINARY => + val sortUtil = classOf[org.apache.flink.table.runtime.sort.SortUtil].getCanonicalName + s"$sortUtil.compareBinary($c1, $c2)" + case at: ArrayType => + val compareFunc = newName("compareArray") + val compareCode = genArrayCompare( + ctx, + FlinkPlannerImpl.getNullDefaultOrder(true), at, "a", "b") + val funcCode: String = + s""" + public int $compareFunc($BINARY_ARRAY a, $BINARY_ARRAY b) { + $compareCode + return 0; + } + """ + ctx.addReusableMember(funcCode) + s"$compareFunc($c1, $c2)" + case rowType: RowType => + val orders = rowType.getFieldTypes.map(_ => true) + val comparisons = genRowCompare( + ctx, + rowType.getFieldTypes.indices.toArray, + rowType.getFieldTypes, + orders, + FlinkPlannerImpl.getNullDefaultOrders(orders), + "a", + "b") + val compareFunc = newName("compareRow") + val funcCode: String = + s""" + public int $compareFunc($BASE_ROW a, $BASE_ROW b) { + $comparisons + return 0; + } + """ + ctx.addReusableMember(funcCode) + s"$compareFunc($c1, $c2)" + case gt: GenericType[_] => + val ser = ctx.addReusableObject(gt.getSerializer, "serializer") + val comp = ctx.addReusableObject( + gt.getTypeInfo.asInstanceOf[AtomicTypeInfo[_]].createComparator(true, new ExecutionConfig), + "comparator") + s""" + |$comp.compare( + | $BINARY_GENERIC.getJavaObjectFromBinaryGeneric($c1, $ser), + | $BINARY_GENERIC.getJavaObjectFromBinaryGeneric($c2, $ser) + |) + """.stripMargin + case other if other.isInstanceOf[AtomicType] => s"$c1.compareTo($c2)" + } + + /** + * Generates code for comparing array. 
+ */ + def genArrayCompare( + ctx: CodeGeneratorContext, nullsIsLast: Boolean, t: ArrayType, a: String, b: String) + : String = { + val nullIsLastRet = if (nullsIsLast) 1 else -1 + val elementType = t.getElementType + val fieldA = newName("fieldA") + val isNullA = newName("isNullA") + val lengthA = newName("lengthA") + val fieldB = newName("fieldB") + val isNullB = newName("isNullB") + val lengthB = newName("lengthB") + val minLength = newName("minLength") + val i = newName("i") + val comp = newName("comp") + val typeTerm = primitiveTypeTermForType(elementType) + s""" + int $lengthA = a.numElements(); + int $lengthB = b.numElements(); + int $minLength = ($lengthA > $lengthB) ? $lengthB : $lengthA; + for (int $i = 0; $i < $minLength; $i++) { + boolean $isNullA = a.isNullAt($i); + boolean $isNullB = b.isNullAt($i); + if ($isNullA && $isNullB) { + // Continue to compare the next element + } else if ($isNullA) { + return $nullIsLastRet; + } else if ($isNullB) { + return ${-nullIsLastRet}; + } else { + $typeTerm $fieldA = ${baseRowFieldReadAccess(ctx, i, a, elementType)}; + $typeTerm $fieldB = ${baseRowFieldReadAccess(ctx, i, b, elementType)}; + int $comp = ${genCompare(ctx, elementType, nullsIsLast, fieldA, fieldB)}; + if ($comp != 0) { + return $comp; + } + } + } + + if ($lengthA < $lengthB) { + return -1; + } else if ($lengthA > $lengthB) { + return 1; + } + """ + } + + /** + * Generates code for comparing row keys. 
+ */ + def genRowCompare( + ctx: CodeGeneratorContext, + keys: Array[Int], + keyTypes: Array[InternalType], + orders: Array[Boolean], + nullsIsLast: Array[Boolean], + row1: String, + row2: String): String = { + + val compares = new mutable.ArrayBuffer[String] + + for (i <- keys.indices) { + val index = keys(i) + + val symbol = if (orders(i)) "" else "-" + + val nullIsLastRet = if (nullsIsLast(i)) 1 else -1 + + val t = keyTypes(i) + + val typeTerm = primitiveTypeTermForType(t) + val fieldA = newName("fieldA") + val isNullA = newName("isNullA") + val fieldB = newName("fieldB") + val isNullB = newName("isNullB") + val comp = newName("comp") + + val code = + s""" + |boolean $isNullA = $row1.isNullAt($index); + |boolean $isNullB = $row2.isNullAt($index); + |if ($isNullA && $isNullB) { + | // Continue to compare the next element + |} else if ($isNullA) { + | return $nullIsLastRet; + |} else if ($isNullB) { + | return ${-nullIsLastRet}; + |} else { + | $typeTerm $fieldA = ${baseRowFieldReadAccess(ctx, index, row1, t)}; + | $typeTerm $fieldB = ${baseRowFieldReadAccess(ctx, index, row2, t)}; + | int $comp = ${genCompare(ctx, t, nullsIsLast(i), fieldA, fieldB)}; + | if ($comp != 0) { + | return $symbol$comp; + | } + |} + """.stripMargin + compares += code + } + compares.mkString + } + } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SortCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SortCodeGenerator.scala new file mode 100644 index 0000000..2e3df95 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SortCodeGenerator.scala @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.codegen + +import org.apache.flink.table.`type`.{DateType, DecimalType, InternalType, InternalTypes, PrimitiveType, TimeType, TimestampType} +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.codegen.CodeGenUtils.{BASE_ROW, SEGMENT, newName} +import org.apache.flink.table.codegen.Indenter.toISC +import org.apache.flink.table.dataformat.{BinaryRow, Decimal} +import org.apache.flink.table.generated.{GeneratedNormalizedKeyComputer, GeneratedRecordComparator, NormalizedKeyComputer, RecordComparator} +import org.apache.flink.table.runtime.sort.SortUtil + +import scala.collection.mutable + +/** + * A code generator for generating [[NormalizedKeyComputer]] and [[RecordComparator]]. + * + * @param keys key positions describe which fields are keys in what order. + * @param keyTypes types for the key fields, in the same order as the key fields. + * @param orders sorting orders for the key fields. + * @param nullsIsLast Ordering of nulls. 
+ */ +class SortCodeGenerator( + conf: TableConfig, + val keys: Array[Int], + val keyTypes: Array[InternalType], + val orders: Array[Boolean], + val nullsIsLast: Array[Boolean]) { + + private val MAX_NORMALIZED_KEY_LEN = 16 + + private val SORT_UTIL = classOf[SortUtil].getCanonicalName + + /** Chunks for long, int, short, byte */ + private val POSSIBLE_CHUNK_SIZES = Array(8, 4, 2, 1) + + /** For get${operator} set${operator} of [[org.apache.flink.core.memory.MemorySegment]] */ + private val BYTE_OPERATOR_MAPPING = Map(8 -> "Long", 4 -> "Int", 2 -> "Short", 1 -> "") + + /** For primitive define */ + private val BYTE_DEFINE_MAPPING = Map(8 -> "long", 4 -> "int", 2 -> "short", 1 -> "byte") + + /** For Class of primitive type */ + private val BYTE_CLASS_MAPPING = Map(8 -> "Long", 4 -> "Integer", 2 -> "Short", 1 -> "Byte") + + /** Normalized meta */ + val (nullAwareNormalizedKeyLen, normalizedKeyNum, invertNormalizedKey, normalizedKeyLengths) = { + var keyLen = 0 + var keyNum = 0 + var inverted = false + val keyLengths = new mutable.ArrayBuffer[Int] + var break = false + var i = 0 + while (i < keys.length && !break) { + val t = keyTypes(i) + if (supportNormalizedKey(t)) { + val invert = !orders(i) + + if (i == 0) { + // the first comparator decides whether we need to invert the key direction + inverted = invert + } + + if (invert != inverted) { + // if a successor does not agree on the inversion direction, + // it cannot be part of the normalized key + break = true + } else { + keyNum += 1 + // Need add null aware 1 byte + val len = safeAddLength(getNormalizeKeyLen(t), 1) + if (len < 0) { + throw new RuntimeException( + s"$t specifies an invalid length for the normalized key: " + len) + } + keyLengths += len + keyLen = safeAddLength(keyLen, len) + if (keyLen == Integer.MAX_VALUE) { + break = true + } + } + } else { + break = true + } + i += 1 + } + + (keyLen, keyNum, inverted, keyLengths) + } + + def getKeyFullyDeterminesAndBytes: (Boolean, Int) = { + if 
(nullAwareNormalizedKeyLen > 18) { + // The maximum setting is 18 because want to put two null aware long as much as possible. + // Anyway, we can't fit it, so align the most efficient 8 bytes. + (false, Math.min(MAX_NORMALIZED_KEY_LEN, 8 * normalizedKeyNum)) + } else { + (normalizedKeyNum == keys.length, nullAwareNormalizedKeyLen) + } + } + + /** + * Generates a [[NormalizedKeyComputer]] that can be passed to a Java compiler. + * + * @param name Class name of the function. + * Does not need to be unique but has to be a valid Java class identifier. + * @return A GeneratedNormalizedKeyComputer + */ + def generateNormalizedKeyComputer(name: String): GeneratedNormalizedKeyComputer = { + + val className = newName(name) + + val (keyFullyDetermines, numKeyBytes) = getKeyFullyDeterminesAndBytes + + val putKeys = generatePutNormalizedKeys(numKeyBytes) + + val chunks = calculateChunks(numKeyBytes) + + val reverseKeys = generateReverseNormalizedKeys(chunks) + + val compareKeys = generateCompareNormalizedKeys(chunks) + + val swapKeys = generateSwapNormalizedKeys(chunks) + + val baseClass = classOf[NormalizedKeyComputer] + + val code = + j""" + public class $className implements ${baseClass.getCanonicalName} { + + public $className(Object[] references) { + // useless + } + + @Override + public void putKey($BASE_ROW record, $SEGMENT target, int offset) { + ${putKeys.mkString} + ${reverseKeys.mkString} + } + + @Override + public int compareKey($SEGMENT segI, int offsetI, $SEGMENT segJ, int offsetJ) { + ${compareKeys.mkString} + } + + @Override + public void swapKey($SEGMENT segI, int offsetI, $SEGMENT segJ, int offsetJ) { + ${swapKeys.mkString} + } + + @Override + public int getNumKeyBytes() { + return $numKeyBytes; + } + + @Override + public boolean isKeyFullyDetermines() { + return $keyFullyDetermines; + } + + @Override + public boolean invertKey() { + return $invertNormalizedKey; + } + + } + """.stripMargin + + new GeneratedNormalizedKeyComputer(className, code) + } + + def 
generatePutNormalizedKeys(numKeyBytes: Int): mutable.ArrayBuffer[String] = { + /* Example generated code, for int: + if (record.isNullAt(0)) { + org.apache.flink.table.dataformat.util.BinaryRowUtil.minNormalizedKey(target, offset+0, 5); + } else { + target.put(offset+0, (byte) 1); + org.apache.flink.table.dataformat.util.BinaryRowUtil.putIntNormalizedKey( + record.getInt(0), target, offset+1, 4); + } + */ + val putKeys = new mutable.ArrayBuffer[String] + var bytesLeft = numKeyBytes + var currentOffset = 0 + var keyIndex = 0 + while (bytesLeft > 0 && keyIndex < normalizedKeyNum) { + var len = normalizedKeyLengths(keyIndex) + val index = keys(keyIndex) + val nullIsMaxValue = orders(keyIndex) == nullsIsLast(keyIndex) + len = if (bytesLeft >= len) len else bytesLeft + val t = keyTypes(keyIndex) + val prefix = prefixGetFromBinaryRow(t) + val putCode = t match { + case _ if getNormalizeKeyLen(t) != Int.MaxValue => + val get = getter(t, index) + s""" + |target.put(offset+$currentOffset, (byte) 1); + |$SORT_UTIL.put${prefixPutNormalizedKey(t)}NormalizedKey( + | record.$get, target, offset+${currentOffset + 1}, ${len - 1}); + | + """.stripMargin + case _ => + // It is BinaryString/byte[].., we can omit the null aware byte(zero is the smallest), + // because there is no other field behind, and is not keyFullyDetermines. 
+ s""" + |$SORT_UTIL.put${prefixPutNormalizedKey(t)}NormalizedKey( + | record.get$prefix($index), target, offset+$currentOffset, $len); + |""".stripMargin + } + val nullCode = if (nullIsMaxValue) { + s"$SORT_UTIL.maxNormalizedKey(target, offset+$currentOffset, $len);" + } else { + s"$SORT_UTIL.minNormalizedKey(target, offset+$currentOffset, $len);" + } + + val code = + s""" + |if (record.isNullAt($index)) { + | $nullCode + |} else { + | $putCode + |} + |""".stripMargin + + putKeys += code + bytesLeft -= len + currentOffset += len + keyIndex += 1 + } + putKeys + } + + /** + * In order to better performance and not use MemorySegment's compare() and swap(), + * we CodeGen more efficient chunk method. + */ + def calculateChunks(numKeyBytes: Int): Array[Int] = { + /* Example chunks, for int: + calculateChunks(5) = Array(4, 1) + */ + val chunks = new mutable.ArrayBuffer[Int] + var i = 0 + var remainBytes = numKeyBytes + while (remainBytes > 0) { + val bytes = POSSIBLE_CHUNK_SIZES(i) + if (bytes <= remainBytes) { + chunks += bytes + remainBytes -= bytes + } else { + i += 1 + } + } + chunks.toArray + } + + /** + * Because we put normalizedKeys in big endian way, if we are the little endian, + * we need to reverse these data with chunks for comparation. + */ + def generateReverseNormalizedKeys(chunks: Array[Int]): mutable.ArrayBuffer[String] = { + /* Example generated code, for int: + target.putInt(offset+0, Integer.reverseBytes(target.getInt(offset+0))); + //byte don't need reverse. + */ + val reverseKeys = new mutable.ArrayBuffer[String] + // If it is big endian, it would be better, no reverse. 
+ if (BinaryRow.LITTLE_ENDIAN) { + var reverseOffset = 0 + for (chunk <- chunks) { + val operator = BYTE_OPERATOR_MAPPING(chunk) + val className = BYTE_CLASS_MAPPING(chunk) + if (chunk != 1) { + val reverseKey = + s""" + |target.put$operator(offset+$reverseOffset, + | $className.reverseBytes(target.get$operator(offset+$reverseOffset))); + """.stripMargin + reverseKeys += reverseKey + } + reverseOffset += chunk + } + } + reverseKeys + } + + /** + * Compare bytes with chunks and nsigned. + */ + def generateCompareNormalizedKeys(chunks: Array[Int]): mutable.ArrayBuffer[String] = { + /* Example generated code, for int: + int l_0_1 = segI.getInt(offsetI+0); + int l_0_2 = segJ.getInt(offsetJ+0); + if (l_0_1 != l_0_2) { + return ((l_0_1 < l_0_2) ^ (l_0_1 < 0) ^ + (l_0_2 < 0) ? -1 : 1); + } + + byte l_1_1 = segI.get(offsetI+4); + byte l_1_2 = segJ.get(offsetJ+4); + if (l_1_1 != l_1_2) { + return ((l_1_1 < l_1_2) ^ (l_1_1 < 0) ^ + (l_1_2 < 0) ? -1 : 1); + } + return 0; + */ + val compareKeys = new mutable.ArrayBuffer[String] + var compareOffset = 0 + for (i <- chunks.indices) { + val chunk = chunks(i) + val operator = BYTE_OPERATOR_MAPPING(chunk) + val define = BYTE_DEFINE_MAPPING(chunk) + val compareKey = + s""" + |$define l_${i}_1 = segI.get$operator(offsetI+$compareOffset); + |$define l_${i}_2 = segJ.get$operator(offsetJ+$compareOffset); + |if (l_${i}_1 != l_${i}_2) { + | return ((l_${i}_1 < l_${i}_2) ^ (l_${i}_1 < 0) ^ + | (l_${i}_2 < 0) ? -1 : 1); + |} + """.stripMargin + compareKeys += compareKey + compareOffset += chunk + } + compareKeys += "return 0;" + compareKeys + } + + /** + * Swap bytes with chunks. 
+ */ + def generateSwapNormalizedKeys(chunks: Array[Int]): mutable.ArrayBuffer[String] = { + /* Example generated code, for int: + int temp0 = segI.getInt(offsetI+0); + segI.putInt(offsetI+0, segJ.getInt(offsetJ+0)); + segJ.putInt(offsetJ+0, temp0); + + byte temp1 = segI.get(offsetI+4); + segI.put(offsetI+4, segJ.get(offsetJ+4)); + segJ.put(offsetJ+4, temp1); + */ + val swapKeys = new mutable.ArrayBuffer[String] + var swapOffset = 0 + for (i <- chunks.indices) { + val chunk = chunks(i) + val operator = BYTE_OPERATOR_MAPPING(chunk) + val define = BYTE_DEFINE_MAPPING(chunk) + val swapKey = + s""" + |$define temp$i = segI.get$operator(offsetI+$swapOffset); + |segI.put$operator(offsetI+$swapOffset, segJ.get$operator(offsetJ+$swapOffset)); + |segJ.put$operator(offsetJ+$swapOffset, temp$i); + """.stripMargin + swapKeys += swapKey + swapOffset += chunk + } + swapKeys + } + + /** + * Generates a [[RecordComparator]] that can be passed to a Java compiler. + * + * @param name Class name of the function. + * Does not need to be unique but has to be a valid Java class identifier. 
+ * @return A GeneratedRecordComparator + */ + def generateRecordComparator(name: String): GeneratedRecordComparator = { + val className = newName(name) + val baseClass = classOf[RecordComparator] + + val ctx = new CodeGeneratorContext(conf) + val compareCode = CodeGenUtils.genRowCompare( + ctx, keys, keyTypes, orders, nullsIsLast, "o1", "o2") + + val code = + j""" + public class $className implements ${baseClass.getCanonicalName} { + + private final Object[] references; + ${ctx.reuseMemberCode()} + + public $className(Object[] references) { + this.references = references; + ${ctx.reuseInitCode()} + ${ctx.reuseOpenCode()} + } + + @Override + public int compare($BASE_ROW o1, $BASE_ROW o2) { + $compareCode + return 0; + } + + } + """.stripMargin + + new GeneratedRecordComparator(className, code, ctx.references.toArray) + } + + def getter(t: InternalType, index: Int): String = { + val prefix = prefixGetFromBinaryRow(t) + t match { + case dt: DecimalType => + s"get$prefix($index, ${dt.precision()}, ${dt.scale()})" + case _ => + s"get$prefix($index)" + } + } + + /** + * For put${prefix}NormalizedKey() and compare$prefix() of [[SortUtil]]. + */ + def prefixPutNormalizedKey(t: InternalType): String = prefixGetFromBinaryRow(t) + + /** + * For get$prefix() of [[org.apache.flink.table.dataformat.TypeGetterSetters]]. + */ + def prefixGetFromBinaryRow(t: InternalType): String = t match { + case InternalTypes.INT => "Int" + case InternalTypes.LONG => "Long" + case InternalTypes.SHORT => "Short" + case InternalTypes.BYTE => "Byte" + case InternalTypes.FLOAT => "Float" + case InternalTypes.DOUBLE => "Double" + case InternalTypes.BOOLEAN => "Boolean" + case InternalTypes.CHAR => "Char" + case InternalTypes.STRING => "String" + case InternalTypes.BINARY => "Binary" + case _: DecimalType => "Decimal" + case _: DateType => "Int" + case InternalTypes.TIME => "Int" + case _: TimestampType => "Long" + case _ => null + } + + /** + * Preventing overflow. 
+ */ + def safeAddLength(i: Int, j: Int): Int = { + val sum = i + j + if (sum < i || sum < j) { + Integer.MAX_VALUE + } else { + sum + } + } + + def supportNormalizedKey(t: InternalType): Boolean = { + t match { + case _: PrimitiveType | InternalTypes.STRING | InternalTypes.BINARY | + _: DateType | _: TimeType | _: TimestampType => true + case dt: DecimalType => Decimal.isCompact(dt.precision()) + case _ => false + } + } + + def getNormalizeKeyLen(t: InternalType): Int = { + t match { + case InternalTypes.BOOLEAN => 1 + case InternalTypes.BYTE => 1 + case InternalTypes.SHORT => 2 + case InternalTypes.CHAR => 2 + case InternalTypes.INT => 4 + case InternalTypes.FLOAT => 4 + case InternalTypes.DOUBLE => 8 + case InternalTypes.LONG => 8 + case dt: DecimalType if Decimal.isCompact(dt.precision()) => 8 + case InternalTypes.STRING | InternalTypes.BINARY => Int.MaxValue + } + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java new file mode 100644 index 0000000..0023831 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/codegen/SortCodeGeneratorTest.java @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/*
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.codegen;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.calcite.FlinkPlannerImpl;
import org.apache.flink.table.dataformat.BinaryArray;
import org.apache.flink.table.dataformat.BinaryGeneric;
import org.apache.flink.table.dataformat.BinaryRow;
import org.apache.flink.table.dataformat.BinaryRowWriter;
import org.apache.flink.table.dataformat.BinaryString;
import org.apache.flink.table.dataformat.BinaryWriter;
import org.apache.flink.table.dataformat.DataFormatConverters;
import org.apache.flink.table.dataformat.Decimal;
import org.apache.flink.table.dataformat.GenericRow;
import org.apache.flink.table.dataformat.TypeGetterSetters;
import org.apache.flink.table.generated.GeneratedNormalizedKeyComputer;
import org.apache.flink.table.generated.GeneratedRecordComparator;
import org.apache.flink.table.generated.NormalizedKeyComputer;
import org.apache.flink.table.generated.RecordComparator;
import org.apache.flink.table.runtime.sort.BinaryInMemorySortBuffer;
import org.apache.flink.table.type.ArrayType;
import org.apache.flink.table.type.DecimalType;
import org.apache.flink.table.type.GenericType;
import org.apache.flink.table.type.InternalType;
import org.apache.flink.table.type.InternalTypes;
import org.apache.flink.table.type.RowType;
import org.apache.flink.table.typeutils.AbstractRowSerializer;
import org.apache.flink.table.typeutils.BinaryRowSerializer;
import org.apache.flink.util.MutableObjectIterator;

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Assert;
import org.junit.Test;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Random test for sort code generator.
 *
 * <p>Each run picks a random row schema, random sort keys and orders, generates
 * random records (including nulls and extreme values), sorts them with the
 * generated {@link NormalizedKeyComputer}/{@link RecordComparator}, and checks
 * the result against a straightforward Java comparator over the same keys.
 */
public class SortCodeGeneratorTest {

	// Number of records sorted per inner test run.
	private static final int RECORD_NUM = 3000;

	// Pool of candidate field types; random schemas are drawn from this array.
	private final InternalType[] types = new InternalType[]{
			InternalTypes.BOOLEAN,
			InternalTypes.BYTE,
			InternalTypes.SHORT,
			InternalTypes.INT,
			InternalTypes.LONG,
			InternalTypes.FLOAT,
			InternalTypes.DOUBLE,
			InternalTypes.CHAR,
			InternalTypes.STRING,
			new DecimalType(18, 2),
			new DecimalType(38, 18),
			InternalTypes.BINARY,
			new ArrayType(InternalTypes.BYTE),
			new RowType(InternalTypes.INT),
			new RowType(new RowType(InternalTypes.INT)),
			new GenericType<>(Types.INT)
	};

	// Per-run randomized state: fields[i] indexes into `types`; keys index into
	// `fields`; orders[i] is ascending(true)/descending(false); nullsIsLast is
	// derived from the orders via FlinkPlannerImpl.getNullDefaultOrders.
	private int[] fields;
	private int[] keys;
	private boolean[] orders;
	private boolean[] nullsIsLast;

	// Reference converters/comparators used by the expected-order comparator for
	// the nested-row typed fields (which are not Comparable themselves).
	private static final DataFormatConverters.DataFormatConverter INT_ROW_CONV =
			DataFormatConverters.getConverterForTypeInfo(new RowTypeInfo(Types.INT));
	private static final TypeComparator INT_ROW_COMP = new RowTypeInfo(Types.INT).createComparator(
			new int[] {0}, new boolean[] {true}, 0, new ExecutionConfig());
	private static final DataFormatConverters.DataFormatConverter NEST_ROW_CONV =
			DataFormatConverters.getConverterForTypeInfo(new RowTypeInfo(new RowTypeInfo(Types.INT)));
	private static final TypeComparator NEST_ROW_COMP = new RowTypeInfo(new RowTypeInfo(Types.INT)).createComparator(
			new int[] {0}, new boolean[] {true}, 0, new ExecutionConfig());

	/** 100 rounds with a random schema and multiple random sort keys each. */
	@Test
	public void testMultiKeys() throws Exception {
		for (int i = 0; i < 100; i++) {
			randomKeysAndOrders();
			testInner();
		}
	}

	/** 100 rounds with a random schema and a single sort key (field 0). */
	@Test
	public void testOneKey() throws Exception {
		for (int time = 0; time < 100; time++) {
			Random rnd = new Random();
			fields = new int[rnd.nextInt(9) + 1];
			for (int i = 0; i < fields.length; i++) {
				fields[i] = rnd.nextInt(types.length);
			}

			keys = new int[] {0};
			orders = new boolean[] {rnd.nextBoolean()};
			nullsIsLast = FlinkPlannerImpl.getNullDefaultOrders(orders);
			testInner();
		}
	}

	/**
	 * Randomizes the schema ({@code fields}), picks a random non-empty subset of
	 * distinct field positions as sort keys, and random asc/desc orders.
	 */
	private void randomKeysAndOrders() {
		Random rnd = new Random();
		fields = new int[rnd.nextInt(9) + 1];
		for (int i = 0; i < fields.length; i++) {
			fields[i] = rnd.nextInt(types.length);
		}

		keys = new int[rnd.nextInt(fields.length) + 1];
		// Shuffle the field indices so keys are distinct positions.
		LinkedList<Integer> indexQueue = new LinkedList<>();
		for (int i = 0; i < fields.length; i++) {
			indexQueue.add(i);
		}
		Collections.shuffle(indexQueue);
		orders = new boolean[keys.length];
		for (int i = 0; i < keys.length; i++) {
			keys[i] = indexQueue.poll();
			orders[i] = rnd.nextBoolean();
		}
		nullsIsLast = FlinkPlannerImpl.getNullDefaultOrders(orders);
	}

	/** Shuffles the array in place (via its List view) and returns it. */
	private Object[] shuffle(Object[] objects) {
		Collections.shuffle(Arrays.asList(objects));
		return objects;
	}

	/**
	 * Builds the i-th {@link BinaryRow} from the per-field value columns;
	 * a null value becomes a null field.
	 */
	private BinaryRow row(int i, Object[][] values) {
		BinaryRow row = new BinaryRow(fields.length);
		BinaryRowWriter writer = new BinaryRowWriter(row);
		for (int j = 0; j < fields.length; j++) {
			Object value = values[j][i];
			if (value == null) {
				writer.setNullAt(j);
			} else {
				BinaryWriter.write(writer, j, value, types[fields[j]]);
			}
		}

		writer.complete();
		return row;
	}

	/** Generates RECORD_NUM rows of shuffled random values for the current schema. */
	private BinaryRow[] getTestData() {
		BinaryRow[] result = new BinaryRow[RECORD_NUM];
		Object[][] values = new Object[fields.length][];
		for (int i = 0; i < fields.length; i++) {
			values[i] = shuffle(generateValues(types[fields[i]]));
		}
		for (int i = 0; i < RECORD_NUM; i++) {
			result[i] = row(i, values);
		}
		return result;
	}

	/**
	 * Generates RECORD_NUM values of the given type, drawn from a small seed pool
	 * (so duplicates occur, exercising comparator ties). The pool always contains
	 * null plus three hand-picked boundary values (value1/value2/value3); the rest
	 * is random per-type data.
	 */
	private Object[] generateValues(InternalType type) {

		Random rnd = new Random();

		int seedNum = RECORD_NUM / 5;
		Object[] seeds = new Object[seedNum];
		seeds[0] = null;
		seeds[1] = value1(type, rnd);
		seeds[2] = value2(type, rnd);
		seeds[3] = value3(type, rnd);
		for (int i = 4; i < seeds.length; i++) {
			if (type.equals(InternalTypes.BOOLEAN)) {
				seeds[i] = rnd.nextBoolean();
			} else if (type.equals(InternalTypes.BYTE)) {
				seeds[i] = (byte) rnd.nextLong();
			} else if (type.equals(InternalTypes.SHORT)) {
				seeds[i] = (short) rnd.nextLong();
			} else if (type.equals(InternalTypes.INT)) {
				seeds[i] = rnd.nextInt();
			} else if (type.equals(InternalTypes.LONG)) {
				seeds[i] = rnd.nextLong();
			} else if (type.equals(InternalTypes.FLOAT)) {
				seeds[i] = rnd.nextFloat() * rnd.nextLong();
			} else if (type.equals(InternalTypes.DOUBLE)) {
				seeds[i] = rnd.nextDouble() * rnd.nextLong();
			} else if (type.equals(InternalTypes.CHAR)) {
				seeds[i] = (char) rnd.nextInt();
			} else if (type.equals(InternalTypes.STRING)) {
				seeds[i] = BinaryString.fromString(RandomStringUtils.random(rnd.nextInt(20)));
			} else if (type instanceof DecimalType) {
				DecimalType decimalType = (DecimalType) type;
				BigDecimal decimal = new BigDecimal(
						rnd.nextInt()).divide(
						new BigDecimal(ThreadLocalRandom.current().nextInt(1, 256)),
						ThreadLocalRandom.current().nextInt(1, 30), BigDecimal.ROUND_HALF_EVEN);
				seeds[i] = Decimal.fromBigDecimal(decimal, decimalType.precision(), decimalType.scale());
			} else if (type instanceof ArrayType || type.equals(InternalTypes.BINARY)) {
				byte[] bytes = new byte[rnd.nextInt(16) + 1];
				rnd.nextBytes(bytes);
				seeds[i] = type.equals(InternalTypes.BINARY) ? bytes : BinaryArray.fromPrimitiveArray(bytes);
			} else if (type instanceof RowType) {
				RowType rowType = (RowType) type;
				if (rowType.getTypeAt(0).equals(InternalTypes.INT)) {
					seeds[i] = GenericRow.of(rnd.nextInt());
				} else {
					seeds[i] = GenericRow.of(GenericRow.of(rnd.nextInt()));
				}
			} else if (type instanceof GenericType) {
				seeds[i] = new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
			} else {
				throw new RuntimeException("Not support!");
			}
		}

		// result values
		Object[] results = new Object[RECORD_NUM];
		for (int i = 0; i < RECORD_NUM; i++) {
			results[i] = seeds[rnd.nextInt(seedNum)];
		}
		return results;
	}

	/**
	 * First boundary seed per type — mostly type minima / empty-ish values.
	 * NOTE(review): for FLOAT/DOUBLE this is {@code Float.MIN_VALUE}/
	 * {@code Double.MIN_VALUE}, i.e. the smallest positive value, not the most
	 * negative; intentional or not, it is still a valid random seed.
	 */
	private Object value1(InternalType type, Random rnd) {
		if (type.equals(InternalTypes.BOOLEAN)) {
			return false;
		} else if (type.equals(InternalTypes.BYTE)) {
			return Byte.MIN_VALUE;
		} else if (type.equals(InternalTypes.SHORT)) {
			return Short.MIN_VALUE;
		} else if (type.equals(InternalTypes.INT)) {
			return Integer.MIN_VALUE;
		} else if (type.equals(InternalTypes.LONG)) {
			return Long.MIN_VALUE;
		} else if (type.equals(InternalTypes.FLOAT)) {
			return Float.MIN_VALUE;
		} else if (type.equals(InternalTypes.DOUBLE)) {
			return Double.MIN_VALUE;
		} else if (type.equals(InternalTypes.CHAR)) {
			return '1';
		} else if (type.equals(InternalTypes.STRING)) {
			return BinaryString.fromString("");
		} else if (type instanceof DecimalType) {
			DecimalType decimalType = (DecimalType) type;
			return Decimal.fromBigDecimal(new BigDecimal(Integer.MIN_VALUE),
					decimalType.precision(), decimalType.scale());
		} else if (type instanceof ArrayType) {
			// An array whose elements are all null.
			byte[] bytes = new byte[rnd.nextInt(7) + 1];
			rnd.nextBytes(bytes);
			BinaryArray array = BinaryArray.fromPrimitiveArray(bytes);
			for (int i = 0; i < bytes.length; i++) {
				array.setNullByte(i);
			}
			return array;
		} else if (type.equals(InternalTypes.BINARY)) {
			byte[] bytes = new byte[rnd.nextInt(7) + 1];
			rnd.nextBytes(bytes);
			return bytes;
		} else if (type instanceof RowType) {
			// A row with a null field.
			return GenericRow.of(new Object[]{null});
		} else if (type instanceof GenericType) {
			return new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
		} else {
			throw new RuntimeException("Not support!");
		}
	}

	/** Second boundary seed per type — zero-like / mid-size values. */
	private Object value2(InternalType type, Random rnd) {
		if (type.equals(InternalTypes.BOOLEAN)) {
			return false;
		} else if (type.equals(InternalTypes.BYTE)) {
			return (byte) 0;
		} else if (type.equals(InternalTypes.SHORT)) {
			return (short) 0;
		} else if (type.equals(InternalTypes.INT)) {
			return 0;
		} else if (type.equals(InternalTypes.LONG)) {
			return 0L;
		} else if (type.equals(InternalTypes.FLOAT)) {
			return 0f;
		} else if (type.equals(InternalTypes.DOUBLE)) {
			return 0d;
		} else if (type.equals(InternalTypes.CHAR)) {
			return '0';
		} else if (type.equals(InternalTypes.STRING)) {
			return BinaryString.fromString("0");
		} else if (type instanceof DecimalType) {
			DecimalType decimalType = (DecimalType) type;
			return Decimal.fromBigDecimal(new BigDecimal(0),
					decimalType.precision(), decimalType.scale());
		} else if (type instanceof ArrayType || type.equals(InternalTypes.BINARY)) {
			byte[] bytes = new byte[rnd.nextInt(7) + 10];
			rnd.nextBytes(bytes);
			return type.equals(InternalTypes.BINARY) ? bytes : BinaryArray.fromPrimitiveArray(bytes);
		} else if (type instanceof RowType) {
			RowType rowType = (RowType) type;
			if (rowType.getTypeAt(0).equals(InternalTypes.INT)) {
				return GenericRow.of(rnd.nextInt());
			} else {
				return GenericRow.of(GenericRow.of(new Object[]{null}));
			}
		} else if (type instanceof GenericType) {
			return new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
		} else {
			throw new RuntimeException("Not support!");
		}
	}

	/** Third boundary seed per type — maxima / long values. */
	private Object value3(InternalType type, Random rnd) {
		if (type.equals(InternalTypes.BOOLEAN)) {
			return true;
		} else if (type.equals(InternalTypes.BYTE)) {
			return Byte.MAX_VALUE;
		} else if (type.equals(InternalTypes.SHORT)) {
			return Short.MAX_VALUE;
		} else if (type.equals(InternalTypes.INT)) {
			return Integer.MAX_VALUE;
		} else if (type.equals(InternalTypes.LONG)) {
			return Long.MAX_VALUE;
		} else if (type.equals(InternalTypes.FLOAT)) {
			return Float.MAX_VALUE;
		} else if (type.equals(InternalTypes.DOUBLE)) {
			return Double.MAX_VALUE;
		} else if (type.equals(InternalTypes.CHAR)) {
			// A non-ASCII char, exercising multi-byte comparison.
			return '鼎';
		} else if (type.equals(InternalTypes.STRING)) {
			return BinaryString.fromString(RandomStringUtils.random(100));
		} else if (type instanceof DecimalType) {
			DecimalType decimalType = (DecimalType) type;
			return Decimal.fromBigDecimal(new BigDecimal(Integer.MAX_VALUE),
					decimalType.precision(), decimalType.scale());
		} else if (type instanceof ArrayType || type.equals(InternalTypes.BINARY)) {
			byte[] bytes = new byte[rnd.nextInt(100) + 100];
			rnd.nextBytes(bytes);
			return type.equals(InternalTypes.BINARY) ? bytes : BinaryArray.fromPrimitiveArray(bytes);
		} else if (type instanceof RowType) {
			RowType rowType = (RowType) type;
			if (rowType.getTypeAt(0).equals(InternalTypes.INT)) {
				return GenericRow.of(rnd.nextInt());
			} else {
				return GenericRow.of(GenericRow.of(rnd.nextInt()));
			}
		} else if (type instanceof GenericType) {
			return new BinaryGeneric<>(rnd.nextInt(), IntSerializer.INSTANCE);
		} else {
			throw new RuntimeException("Not support!");
		}
	}

	/** Resolves the current schema's field types. */
	private InternalType[] getFieldTypes() {
		InternalType[] result = new InternalType[fields.length];
		for (int i = 0; i < fields.length; i++) {
			result[i] = types[fields[i]];
		}
		return result;
	}

	/** Resolves the types of the current sort keys. */
	private InternalType[] getKeyTypes() {
		InternalType[] result = new InternalType[keys.length];
		for (int i = 0; i < keys.length; i++) {
			result[i] = types[fields[keys[i]]];
		}
		return result;
	}

	/**
	 * One full round: generate data, sort it with the generated sorter in a
	 * {@link BinaryInMemorySortBuffer} via {@link QuickSort}, sort a copy with a
	 * plain Java comparator implementing the same key semantics, and assert both
	 * orders agree on every key field (null flags and values).
	 */
	private void testInner() throws Exception {
		List<MemorySegment> segments = new ArrayList<>();
		for (int i = 0; i < 100; i++) {
			segments.add(MemorySegmentFactory.wrap(new byte[32768]));
		}

		InternalType[] fieldTypes = getFieldTypes();
		InternalType[] keyTypes = getKeyTypes();

		Tuple2<NormalizedKeyComputer, RecordComparator> tuple2 = getSortBaseWithNulls(
				this.getClass().getSimpleName(), keyTypes, keys, orders, nullsIsLast);

		BinaryRowSerializer serializer = new BinaryRowSerializer(fieldTypes.length);

		BinaryInMemorySortBuffer sortBuffer = BinaryInMemorySortBuffer.createBuffer(
				tuple2.f0, (AbstractRowSerializer) serializer, serializer,
				tuple2.f1, segments);

		BinaryRow[] dataArray = getTestData();

		List<BinaryRow> data = Arrays.asList(dataArray.clone());
		List<BinaryRow> binaryRows = Arrays.asList(dataArray.clone());
		Collections.shuffle(binaryRows);

		for (BinaryRow row : binaryRows) {
			if (!sortBuffer.write(row)) {
				throw new RuntimeException();
			}
		}

		new QuickSort().sort(sortBuffer);

		MutableObjectIterator<BinaryRow> iter = sortBuffer.getIterator();
		List<BinaryRow> result = new ArrayList<>();
		BinaryRow row = serializer.createInstance();
		while ((row = iter.next(row)) != null) {
			result.add(row.copy());
		}

		// Expected order: a straightforward comparator over the same keys.
		// Nulls compare as smallest; descending order negates the comparison.
		data.sort((o1, o2) -> {
			for (int i = 0; i < keys.length; i++) {
				InternalType t = types[fields[keys[i]]];
				boolean order = orders[i];
				Object first = null;
				Object second = null;
				if (!o1.isNullAt(keys[i])) {
					first = TypeGetterSetters.get(o1, keys[i], keyTypes[i]);
				}
				if (!o2.isNullAt(keys[i])) {
					second = TypeGetterSetters.get(o2, keys[i], keyTypes[i]);
				}

				if (first == null && second == null) {
					// Both null: tie, fall through to the next key.
				} else if (first == null) {
					return order ? -1 : 1;
				} else if (second == null) {
					return order ? 1 : -1;
				} else if (first instanceof Comparable) {
					// Covers primitives, BinaryString, Decimal, chars, etc.
					int ret = ((Comparable) first).compareTo(second);
					if (ret != 0) {
						return order ? ret : -ret;
					}
				} else if (t instanceof ArrayType) {
					// Element-wise byte comparison with null elements smallest,
					// then shorter array first.
					BinaryArray leftArray = (BinaryArray) first;
					BinaryArray rightArray = (BinaryArray) second;
					int minLength = Math.min(leftArray.numElements(), rightArray.numElements());
					for (int j = 0; j < minLength; j++) {
						boolean isNullLeft = leftArray.isNullAt(j);
						boolean isNullRight = rightArray.isNullAt(j);
						if (isNullLeft && isNullRight) {
							// Do nothing.
						} else if (isNullLeft) {
							return order ? -1 : 1;
						} else if (isNullRight) {
							return order ? 1 : -1;
						} else {
							int comp = Byte.compare(leftArray.getByte(j), rightArray.getByte(j));
							if (comp != 0) {
								return order ? comp : -comp;
							}
						}
					}
					if (leftArray.numElements() < rightArray.numElements()) {
						return order ? -1 : 1;
					} else if (leftArray.numElements() > rightArray.numElements()) {
						return order ? 1 : -1;
					}
				} else if (t.equals(InternalTypes.BINARY)) {
					int comp = org.apache.flink.table.runtime.sort.SortUtil.compareBinary(
							(byte[]) first, (byte[]) second);
					if (comp != 0) {
						return order ? comp : -comp;
					}
				} else if (t instanceof RowType) {
					// Delegate to the flink TypeComparator for (nested) rows.
					RowType rowType = (RowType) t;
					int comp;
					if (rowType.getTypeAt(0).equals(InternalTypes.INT)) {
						comp = INT_ROW_COMP.compare(INT_ROW_CONV.toExternal(first),
								INT_ROW_CONV.toExternal(second));
					} else {
						comp = NEST_ROW_COMP.compare(NEST_ROW_CONV.toExternal(first),
								NEST_ROW_CONV.toExternal(second));
					}
					if (comp != 0) {
						return order ? comp : -comp;
					}
				} else if (t instanceof GenericType) {
					// GenericType values in this test are always boxed ints.
					Integer i1 = BinaryGeneric.getJavaObjectFromBinaryGeneric((BinaryGeneric) first, IntSerializer.INSTANCE);
					Integer i2 = BinaryGeneric.getJavaObjectFromBinaryGeneric((BinaryGeneric) second, IntSerializer.INSTANCE);
					int comp = Integer.compare(i1, i2);
					if (comp != 0) {
						return order ? comp : -comp;
					}
				} else {
					throw new RuntimeException();
				}
			}
			return 0;
		});

		// Build a diagnostic message up front so failures show the full data set.
		StringBuilder builder = new StringBuilder();
		for (int i = 0; i < data.size(); i++) {
			builder.append("\n")
					.append("expect: ")
					.append(data.get(i).toOriginString(fieldTypes))
					.append("; actual: ")
					.append(result.get(i).toOriginString(fieldTypes));
		}
		builder.append("\n").append("types: ").append(Arrays.asList(fieldTypes));
		builder.append("\n").append("keys: ").append(Arrays.toString(keys));
		String msg = builder.toString();
		// Compare only key fields: non-key fields may legitimately differ in
		// position between the two stable-by-coincidence orderings.
		for (int i = 0; i < data.size(); i++) {
			for (int j = 0; j < keys.length; j++) {
				boolean isNull1 = data.get(i).isNullAt(keys[j]);
				boolean isNull2 = result.get(i).isNullAt(keys[j]);
				Assert.assertEquals(msg, isNull1, isNull2);
				if (!isNull1 || !isNull2) {
					Object o1 = TypeGetterSetters.get(data.get(i), keys[j], keyTypes[j]);
					Object o2 = TypeGetterSetters.get(result.get(i), keys[j], keyTypes[j]);
					if (keyTypes[j].equals(InternalTypes.BINARY)) {
						// byte[] needs array equality, not reference equality.
						Assert.assertArrayEquals(msg, (byte[]) o1, (byte[]) o2);
					} else {
						Assert.assertEquals(msg, o1, o2);
					}
				}
			}
		}
	}

	/**
	 * Generates and instantiates the key computer and record comparator for the
	 * given keys/orders/null-ordering via {@link SortCodeGenerator}.
	 *
	 * @param namePrefix prefix for the generated class names
	 * @param keyTypes types of the sort key fields
	 * @param keys field positions of the sort keys
	 * @param orders ascending(true)/descending(false) per key
	 * @param nullsIsLast null-ordering per key
	 */
	public static Tuple2<NormalizedKeyComputer, RecordComparator> getSortBaseWithNulls(
			String namePrefix, InternalType[] keyTypes, int[] keys, boolean[] orders, boolean[] nullsIsLast)
			throws IllegalAccessException, InstantiationException {
		SortCodeGenerator generator = new SortCodeGenerator(new TableConfig(), keys, keyTypes, orders, nullsIsLast);
		GeneratedNormalizedKeyComputer computer = generator.generateNormalizedKeyComputer(namePrefix + "Computer");
		GeneratedRecordComparator comparator = generator.generateRecordComparator(namePrefix + "Comparator");
		ClassLoader cl = Thread.currentThread().getContextClassLoader();
		return new Tuple2<>(computer.newInstance(cl), comparator.newInstance(cl));
	}
}
@@ -86,7 +88,7 @@ public abstract class AbstractBinaryWriter implements BinaryWriter { private void writeBytes(int pos, byte[] bytes) { int len = bytes.length; - if (len <= 7) { + if (len <= MAX_FIX_PART_DATA_SIZE) { writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len); } else { writeBytesToVarLenPart(pos, bytes, len); @@ -142,6 +144,16 @@ public abstract class AbstractBinaryWriter implements BinaryWriter { } @Override + public void writeBinary(int pos, byte[] bytes) { + int len = bytes.length; + if (len <= MAX_FIX_PART_DATA_SIZE) { + writeBytesToFixLenPart(segment, getFieldOffset(pos), bytes, len); + } else { + writeBytesToVarLenPart(pos, bytes, len); + } + } + + @Override public void writeDecimal(int pos, Decimal value, int precision) { assert value == null || (value.getPrecision() == precision); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java index 5055bdc..a02c68b 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryArray.java @@ -201,6 +201,15 @@ public class BinaryArray extends BinaryFormat implements TypeGetterSetters { } @Override + public byte[] getBinary(int pos) { + assertIndexIsValid(pos); + int fieldOffset = getElementOffset(pos, 8); + final long offsetAndSize = SegmentsUtil.getLong(segments, fieldOffset); + return readBinaryFieldFromSegments( + segments, offset, fieldOffset, offsetAndSize); + } + + @Override public BinaryArray getArray(int pos) { assertIndexIsValid(pos); return BinaryArray.readBinaryArrayFieldFromSegments(segments, offset, getLong(pos)); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java index 7d07c09..fbb0fc4 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryFormat.java @@ -25,6 +25,31 @@ import org.apache.flink.table.util.SegmentsUtil; */ public abstract class BinaryFormat { + /** + * It decides whether to put data in FixLenPart or VarLenPart. See more in {@link BinaryRow}. + * + * <p>If len is less than 8, its binary format is: + * 1bit mark(1) = 1, 7bits len, and 7bytes data. + * Data is stored in fix-length part. + * + * <p>If len is greater or equal to 8, its binary format is: + * 1bit mark(1) = 0, 31bits offset, and 4bytes len. + * Data is stored in variable-length part. + */ + static final int MAX_FIX_PART_DATA_SIZE = 7; + + /** + * To get the mark in highest first bit. + * Form: 1000 0000 0000 0000 ... + */ + private static final long HIGHEST_FIRST_BIT = Long.MIN_VALUE; + + /** + * To get the 7 bits length in second bit to eighth bit. + * Form: 0111 1111 0000 0000 ... + */ + private static final long HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56; + protected MemorySegment[] segments; protected int offset; protected int sizeInBytes; @@ -75,4 +100,60 @@ public abstract class BinaryFormat { public int hashCode() { return SegmentsUtil.hash(segments, offset, sizeInBytes); } + + /** + * Get binary, if len less than 8, will be include in variablePartOffsetAndLen. + * + * <p>Note: Need to consider the ByteOrder. + * + * @param baseOffset base offset of composite binary format. + * @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'. + * @param variablePartOffsetAndLen a long value, real data or offset and len. 
+ */ + static byte[] readBinaryFieldFromSegments( + MemorySegment[] segments, int baseOffset, int fieldOffset, + long variablePartOffsetAndLen) { + long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT; + if (mark == 0) { + final int subOffset = (int) (variablePartOffsetAndLen >> 32); + final int len = (int) variablePartOffsetAndLen; + return SegmentsUtil.copyToBytes(segments, baseOffset + subOffset, len); + } else { + int len = (int) ((variablePartOffsetAndLen & HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56); + if (SegmentsUtil.LITTLE_ENDIAN) { + return SegmentsUtil.copyToBytes(segments, fieldOffset, len); + } else { + // fieldOffset + 1 to skip header. + return SegmentsUtil.copyToBytes(segments, fieldOffset + 1, len); + } + } + } + + /** + * Get binary string, if len less than 8, will be include in variablePartOffsetAndLen. + * + * <p>Note: Need to consider the ByteOrder. + * + * @param baseOffset base offset of composite binary format. + * @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'. + * @param variablePartOffsetAndLen a long value, real data or offset and len. + */ + static BinaryString readBinaryStringFieldFromSegments( + MemorySegment[] segments, int baseOffset, int fieldOffset, + long variablePartOffsetAndLen) { + long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT; + if (mark == 0) { + final int subOffset = (int) (variablePartOffsetAndLen >> 32); + final int len = (int) variablePartOffsetAndLen; + return new BinaryString(segments, baseOffset + subOffset, len); + } else { + int len = (int) ((variablePartOffsetAndLen & HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56); + if (SegmentsUtil.LITTLE_ENDIAN) { + return new BinaryString(segments, fieldOffset, len); + } else { + // fieldOffset + 1 to skip header. 
+ return new BinaryString(segments, fieldOffset + 1, len); + } + } + } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java index 8519ba9..9dc1313 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryGeneric.java @@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.table.type.GenericType; import org.apache.flink.table.util.SegmentsUtil; +import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.InstantiationUtil; import java.io.IOException; @@ -79,4 +80,16 @@ public class BinaryGeneric<T> extends LazyBinaryFormat<T> { int offset = (int) (offsetAndSize >> 32); return new BinaryGeneric<>(segments, offset + baseOffset, size, null); } + + public static <T> T getJavaObjectFromBinaryGeneric(BinaryGeneric<T> value, TypeSerializer<T> ser) { + if (value.getJavaObject() == null) { + try { + value.setJavaObject(InstantiationUtil.deserializeFromByteArray(ser, + SegmentsUtil.copyToBytes(value.getSegments(), value.getOffset(), value.getSizeInBytes()))); + } catch (IOException e) { + throw new FlinkRuntimeException(e); + } + } + return value.getJavaObject(); + } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java index bc4b536..7108548 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryRow.java @@ -334,6 
+334,14 @@ public final class BinaryRow extends BinaryFormat implements BaseRow { } @Override + public byte[] getBinary(int pos) { + assertIndexIsValid(pos); + int fieldOffset = getFieldOffset(pos); + final long offsetAndLen = segments[0].getLong(fieldOffset); + return readBinaryFieldFromSegments(segments, offset, fieldOffset, offsetAndLen); + } + + @Override public BinaryArray getArray(int pos) { assertIndexIsValid(pos); return BinaryArray.readBinaryArrayFieldFromSegments(segments, offset, getLong(pos)); @@ -394,4 +402,24 @@ public final class BinaryRow extends BinaryFormat implements BaseRow { public int hashCode() { return SegmentsUtil.hashByWords(segments, offset, sizeInBytes); } + + public String toOriginString(InternalType... types) { + return toOriginString(this, types); + } + + public static String toOriginString(BaseRow row, InternalType[] types) { + checkArgument(types.length == row.getArity()); + StringBuilder build = new StringBuilder("["); + build.append(row.getHeader()); + for (int i = 0; i < row.getArity(); i++) { + build.append(','); + if (row.isNullAt(i)) { + build.append("null"); + } else { + build.append(TypeGetterSetters.get(row, i, types[i])); + } + } + build.append(']'); + return build.toString(); + } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java index 0db18b4..64edc6d 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryString.java @@ -36,9 +36,6 @@ import static org.apache.flink.util.Preconditions.checkArgument; */ public class BinaryString extends LazyBinaryFormat<String> implements Comparable<BinaryString> { - private static final long HIGHEST_FIRST_BIT = Long.MIN_VALUE; - private static final long 
HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7FL << 56; - public static final BinaryString EMPTY_UTF8 = BinaryString.fromBytes("".getBytes()); public BinaryString(MemorySegment[] segments, int offset, int sizeInBytes) { @@ -245,39 +242,4 @@ public class BinaryString extends LazyBinaryFormat<String> implements Comparable return sizeInBytes - other.sizeInBytes; } - - /** - * Get binary string, if len less than 8, will be include in variablePartOffsetAndLen. - * - * <p>If len is less than 8, its binary format is: - * 1bit mark(1) = 1, 7bits len, and 7bytes data. - * - * <p>If len is greater or equal to 8, its binary format is: - * 1bit mark(1) = 0, 31bits offset, and 4bytes len. - * Data is stored in variable-length part. - * - * <p>Note: Need to consider the ByteOrder. - * - * @param baseOffset base offset of composite binary format. - * @param fieldOffset absolute start offset of 'variablePartOffsetAndLen'. - * @param variablePartOffsetAndLen a long value, real data or offset and len. - */ - static BinaryString readBinaryStringFieldFromSegments( - MemorySegment[] segments, int baseOffset, int fieldOffset, - long variablePartOffsetAndLen) { - long mark = variablePartOffsetAndLen & HIGHEST_FIRST_BIT; - if (mark == 0) { - final int subOffset = (int) (variablePartOffsetAndLen >> 32); - final int len = (int) variablePartOffsetAndLen; - return new BinaryString(segments, baseOffset + subOffset, len); - } else { - int len = (int) ((variablePartOffsetAndLen & HIGHEST_SECOND_TO_EIGHTH_BIT) >>> 56); - if (SegmentsUtil.LITTLE_ENDIAN) { - return new BinaryString(segments, fieldOffset, len); - } else { - // fieldOffset + 1 to skip header. 
- return new BinaryString(segments, fieldOffset + 1, len); - } - } - } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java index deb5e3c..b1730ec 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java @@ -62,6 +62,8 @@ public interface BinaryWriter { void writeString(int pos, BinaryString value); + void writeBinary(int pos, byte[] bytes); + void writeDecimal(int pos, Decimal value, int precision); void writeArray(int pos, BinaryArray value); @@ -112,8 +114,10 @@ public interface BinaryWriter { } else if (type instanceof RowType) { RowType rowType = (RowType) type; writer.writeRow(pos, (BaseRow) o, rowType.getBaseRowSerializer()); - } else if (type instanceof GenericType) { + } else if (type instanceof GenericType) { writer.writeGeneric(pos, (BinaryGeneric) o); + } else if (type.equals(InternalTypes.BINARY)) { + writer.writeBinary(pos, (byte[]) o); } else { throw new RuntimeException("Not support type: " + type); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java index d568cfe..e3f371b 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java @@ -44,11 +44,8 @@ import org.apache.flink.table.typeutils.BinaryGenericTypeInfo; import org.apache.flink.table.typeutils.BinaryMapTypeInfo; import org.apache.flink.table.typeutils.BinaryStringTypeInfo; import 
org.apache.flink.table.typeutils.DecimalTypeInfo; -import org.apache.flink.table.util.SegmentsUtil; import org.apache.flink.types.Row; -import org.apache.flink.util.InstantiationUtil; -import java.io.IOException; import java.lang.reflect.Array; import java.math.BigDecimal; import java.sql.Date; @@ -488,15 +485,7 @@ public class DataFormatConverters { @Override T toExternalImpl(BinaryGeneric<T> value) { - if (value.getJavaObject() == null) { - try { - value.setJavaObject(InstantiationUtil.deserializeFromByteArray(serializer, - SegmentsUtil.copyToBytes(value.getSegments(), value.getOffset(), value.getSizeInBytes()))); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - return value.getJavaObject(); + return BinaryGeneric.getJavaObjectFromBinaryGeneric(value, serializer); } @Override @@ -633,25 +622,15 @@ public class DataFormatConverters { /** * Converter for primitive byte array. */ - public static class PrimitiveByteArrayConverter extends DataFormatConverter<BinaryArray, byte[]> { + public static class PrimitiveByteArrayConverter extends IdentityConverter<byte[]> { public static final PrimitiveByteArrayConverter INSTANCE = new PrimitiveByteArrayConverter(); private PrimitiveByteArrayConverter() {} @Override - BinaryArray toInternalImpl(byte[] value) { - return BinaryArray.fromPrimitiveArray(value); - } - - @Override - byte[] toExternalImpl(BinaryArray value) { - return value.toByteArray(); - } - - @Override byte[] toExternalImpl(BaseRow row, int column) { - return toExternalImpl(row.getArray(column)); + return row.getBinary(column); } } @@ -917,7 +896,7 @@ public class DataFormatConverters { /** * Abstract converter for internal base row. 
*/ - public abstract static class AbstractBaseRowConverter<I extends BaseRow, E> extends DataFormatConverter<I, E> { + public abstract static class AbstractBaseRowConverter<E> extends DataFormatConverter<BaseRow, E> { protected final DataFormatConverter[] converters; @@ -930,7 +909,7 @@ public class DataFormatConverters { @Override E toExternalImpl(BaseRow row, int column) { - throw new RuntimeException("Not support yet!"); + return toExternalImpl(row.getRow(column, converters.length)); } } @@ -952,7 +931,7 @@ public class DataFormatConverters { /** * Converter for pojo. */ - public static class PojoConverter<T> extends AbstractBaseRowConverter<GenericRow, T> { + public static class PojoConverter<T> extends AbstractBaseRowConverter<T> { private final PojoTypeInfo<T> t; private final PojoField[] fields; @@ -968,7 +947,7 @@ public class DataFormatConverters { } @Override - GenericRow toInternalImpl(T value) { + BaseRow toInternalImpl(T value) { GenericRow genericRow = new GenericRow(t.getArity()); for (int i = 0; i < t.getArity(); i++) { try { @@ -982,7 +961,7 @@ public class DataFormatConverters { } @Override - T toExternalImpl(GenericRow value) { + T toExternalImpl(BaseRow value) { try { T pojo = t.getTypeClass().newInstance(); for (int i = 0; i < t.getArity(); i++) { @@ -998,7 +977,7 @@ public class DataFormatConverters { /** * Converter for row. 
*/ - public static class RowConverter extends AbstractBaseRowConverter<GenericRow, Row> { + public static class RowConverter extends AbstractBaseRowConverter<Row> { private final RowTypeInfo t; @@ -1008,7 +987,7 @@ public class DataFormatConverters { } @Override - GenericRow toInternalImpl(Row value) { + BaseRow toInternalImpl(Row value) { GenericRow genericRow = new GenericRow(t.getArity()); for (int i = 0; i < t.getArity(); i++) { genericRow.setField(i, converters[i].toInternal(value.getField(i))); @@ -1017,7 +996,7 @@ public class DataFormatConverters { } @Override - Row toExternalImpl(GenericRow value) { + Row toExternalImpl(BaseRow value) { Row row = new Row(t.getArity()); for (int i = 0; i < t.getArity(); i++) { row.setField(i, converters[i].toExternal(value, i)); @@ -1029,7 +1008,7 @@ public class DataFormatConverters { /** * Converter for flink tuple. */ - public static class TupleConverter extends AbstractBaseRowConverter<GenericRow, Tuple> { + public static class TupleConverter extends AbstractBaseRowConverter<Tuple> { private final TupleTypeInfo t; @@ -1039,7 +1018,7 @@ public class DataFormatConverters { } @Override - GenericRow toInternalImpl(Tuple value) { + BaseRow toInternalImpl(Tuple value) { GenericRow genericRow = new GenericRow(t.getArity()); for (int i = 0; i < t.getArity(); i++) { genericRow.setField(i, converters[i].toInternal(value.getField(i))); @@ -1048,7 +1027,7 @@ public class DataFormatConverters { } @Override - Tuple toExternalImpl(GenericRow value) { + Tuple toExternalImpl(BaseRow value) { try { Tuple tuple = (Tuple) t.getTypeClass().newInstance(); for (int i = 0; i < t.getArity(); i++) { @@ -1065,7 +1044,7 @@ public class DataFormatConverters { /** * Converter for case class. 
*/ - public static class CaseClassConverter extends AbstractBaseRowConverter<GenericRow, Product> { + public static class CaseClassConverter extends AbstractBaseRowConverter<Product> { private final TupleTypeInfoBase t; private final TupleSerializerBase serializer; @@ -1077,7 +1056,7 @@ public class DataFormatConverters { } @Override - GenericRow toInternalImpl(Product value) { + BaseRow toInternalImpl(Product value) { GenericRow genericRow = new GenericRow(t.getArity()); for (int i = 0; i < t.getArity(); i++) { genericRow.setField(i, converters[i].toInternal(value.productElement(i))); @@ -1086,7 +1065,7 @@ public class DataFormatConverters { } @Override - Product toExternalImpl(GenericRow value) { + Product toExternalImpl(BaseRow value) { Object[] fields = new Object[t.getArity()]; for (int i = 0; i < t.getArity(); i++) { fields[i] = converters[i].toExternal(value, i); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java index 85dd2c9..0747c7b 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/JoinedRow.java @@ -154,6 +154,15 @@ public final class JoinedRow implements BaseRow { } @Override + public byte[] getBinary(int i) { + if (i < row1.getArity()) { + return row1.getBinary(i); + } else { + return row2.getBinary(i - row1.getArity()); + } + } + + @Override public BinaryString getString(int i) { if (i < row1.getArity()) { return row1.getString(i); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java index 163216b..1c8b43c 100644 --- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/NestedRow.java @@ -257,6 +257,14 @@ public final class NestedRow extends BinaryFormat implements BaseRow { } @Override + public byte[] getBinary(int pos) { + assertIndexIsValid(pos); + int fieldOffset = getFieldOffset(pos); + final long offsetAndLen = segments[0].getLong(fieldOffset); + return readBinaryFieldFromSegments(segments, offset, fieldOffset, offsetAndLen); + } + + @Override public BaseRow getRow(int pos, int numFields) { assertIndexIsValid(pos); return NestedRow.readNestedRowFieldFromSegments(segments, numFields, offset, getLong(pos)); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java index 28edebe..42fe8b6 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/ObjectArrayRow.java @@ -70,6 +70,11 @@ public abstract class ObjectArrayRow implements BaseRow { } @Override + public byte[] getBinary(int ordinal) { + return (byte[]) this.fields[ordinal]; + } + + @Override public BinaryArray getArray(int ordinal) { return (BinaryArray) this.fields[ordinal]; } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java index be572b4..f1e080c 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java @@ 
-105,6 +105,11 @@ public interface TypeGetterSetters { <T> BinaryGeneric<T> getGeneric(int ordinal); /** + * Get binary value, internal format is byte[]. + */ + byte[] getBinary(int ordinal); + + /** * Get array value, internal format is BinaryArray. */ BinaryArray getArray(int ordinal); @@ -205,6 +210,8 @@ public interface TypeGetterSetters { return row.getRow(ordinal, ((RowType) type).getArity()); } else if (type instanceof GenericType) { return row.getGeneric(ordinal); + } else if (type.equals(InternalTypes.BINARY)) { + return row.getBinary(ordinal); } else { throw new RuntimeException("Not support type: " + type); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java index c81d2cc..99f5dba 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/CompileUtils.java @@ -77,6 +77,7 @@ public final class CompileUtils { try { compiler.cook(code); } catch (Throwable t) { + System.out.println(addLineNumber(code)); throw new InvalidProgramException( "Table program cannot be compiled. This is a bug. Please file an issue.", t); } @@ -87,4 +88,17 @@ public final class CompileUtils { throw new RuntimeException("Can not load class " + name, e); } } + + /** + * To output more information when an error occurs. + * Generally, when cook fails, it shows which line is wrong. This line number starts at 1. 
+ */ + private static String addLineNumber(String code) { + String[] lines = code.split("\n"); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < lines.length; i++) { + builder.append(i + 1).append(" ").append(lines[i]).append("\n"); + } + return builder.toString(); + } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java index 8f26e14..043340b 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java @@ -49,7 +49,10 @@ public abstract class GeneratedClass<T> implements Serializable { @SuppressWarnings("unchecked") public T newInstance(ClassLoader classLoader) { try { - return (T) compile(classLoader).getConstructor(Object[].class).newInstance(references); + return (T) compile(classLoader).getConstructor(Object[].class) + // Because Constructor.newInstance(Object... initargs), we need to load + // references into a new Object[], otherwise it cannot be compiled. 
+ .newInstance(new Object[] {references}); } catch (Exception e) { throw new RuntimeException( "Could not instantiate generated class '" + className + "'", e); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedNormalizedKeyComputer.java similarity index 61% copy from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java copy to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedNormalizedKeyComputer.java index 002f4e0..e524d40 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedNormalizedKeyComputer.java @@ -16,26 +16,22 @@ * limitations under the License. */ -package org.apache.flink.table.type; +package org.apache.flink.table.generated; /** - * Primitive type. + * Describes a generated {@link NormalizedKeyComputer}. */ -public abstract class PrimitiveType implements AtomicType { +public class GeneratedNormalizedKeyComputer extends GeneratedClass<NormalizedKeyComputer> { - @Override - public boolean equals(Object o) { - return this == o || o != null && getClass() == o.getClass(); - } - - @Override - public int hashCode() { - return getClass().hashCode(); - } + private static final long serialVersionUID = 1L; - @Override - public String toString() { - return getClass().getSimpleName(); + /** + * Creates a GeneratedNormalizedKeyComputer. + * + * @param className class name of the generated class. + * @param code code of the generated class. 
+ */ + public GeneratedNormalizedKeyComputer(String className, String code) { + super(className, code, new Object[0]); } - } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedRecordComparator.java similarity index 59% copy from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java copy to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedRecordComparator.java index 002f4e0..2db63cc 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedRecordComparator.java @@ -16,26 +16,24 @@ * limitations under the License. */ -package org.apache.flink.table.type; +package org.apache.flink.table.generated; /** - * Primitive type. + * Describes a generated {@link RecordComparator}. */ -public abstract class PrimitiveType implements AtomicType { +public class GeneratedRecordComparator extends GeneratedClass<RecordComparator> { - @Override - public boolean equals(Object o) { - return this == o || o != null && getClass() == o.getClass(); - } - - @Override - public int hashCode() { - return getClass().hashCode(); - } + private static final long serialVersionUID = 1L; - @Override - public String toString() { - return getClass().getSimpleName(); + /** + * Creates a GeneratedRecordComparator. + * + * @param className class name of the generated class. + * @param code code of the generated class. + * @param references referenced objects of the generated class. 
+ */ + public GeneratedRecordComparator(String className, String code, Object[] references) { + super(className, code, references); } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/NormalizedKeyComputer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/NormalizedKeyComputer.java similarity index 64% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/NormalizedKeyComputer.java rename to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/NormalizedKeyComputer.java index 121fb50..d960a74 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/NormalizedKeyComputer.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/NormalizedKeyComputer.java @@ -16,54 +16,45 @@ * limitations under the License. */ -package org.apache.flink.table.runtime.sort; +package org.apache.flink.table.generated; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.runtime.sort.BinaryInMemorySortBuffer; /** * Normalized key computer for {@link BinaryInMemorySortBuffer}. * For performance, subclasses are usually implemented through CodeGenerator. */ -public abstract class NormalizedKeyComputer { - - protected TypeSerializer[] serializers; - protected TypeComparator[] comparators; - - public void init(TypeSerializer[] serializers, TypeComparator[] comparators) { - this.serializers = serializers; - this.comparators = comparators; - } +public interface NormalizedKeyComputer { /** * Writes a normalized key for the given record into the target {@link MemorySegment}. 
*/ - public abstract void putKey(BaseRow record, MemorySegment target, int offset); + void putKey(BaseRow record, MemorySegment target, int offset); /** * Compares two normalized keys in respective {@link MemorySegment}. */ - public abstract int compareKey(MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ); + int compareKey(MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ); /** * Swaps two normalized keys in respective {@link MemorySegment}. */ - public abstract void swapKey(MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ); + void swapKey(MemorySegment segI, int offsetI, MemorySegment segJ, int offsetJ); /** * Get normalized keys bytes length. */ - public abstract int getNumKeyBytes(); + int getNumKeyBytes(); /** * whether the normalized key can fully determines the comparison. */ - public abstract boolean isKeyFullyDetermines(); + boolean isKeyFullyDetermines(); /** * Flag whether normalized key comparisons should be inverted key. */ - public abstract boolean invertKey(); + boolean invertKey(); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/RecordComparator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/RecordComparator.java similarity index 64% rename from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/RecordComparator.java rename to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/RecordComparator.java index c5a274e..5331bcf 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/RecordComparator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/RecordComparator.java @@ -16,11 +16,10 @@ * limitations under the License. 
*/ -package org.apache.flink.table.runtime.sort; +package org.apache.flink.table.generated; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.runtime.sort.BinaryInMemorySortBuffer; import java.io.Serializable; import java.util.Comparator; @@ -28,19 +27,10 @@ import java.util.Comparator; /** * Record comparator for {@link BinaryInMemorySortBuffer}. * For performance, subclasses are usually implemented through CodeGenerator. + * A new interface for helping JVM inline. */ -public abstract class RecordComparator implements Comparator<BaseRow>, Serializable { - - private static final long serialVersionUID = 1L; - - protected TypeSerializer[] serializers; - protected TypeComparator[] comparators; - - public void init(TypeSerializer[] serializers, TypeComparator[] comparators) { - this.serializers = serializers; - this.comparators = comparators; - } +public interface RecordComparator extends Comparator<BaseRow>, Serializable { @Override - public abstract int compare(BaseRow o1, BaseRow o2); + int compare(BaseRow o1, BaseRow o2); } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalMerger.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalMerger.java index 366d822..97a9b75 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalMerger.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalMerger.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memory.AbstractPagedOutputView; import org.apache.flink.table.dataformat.BinaryRow; +import 
org.apache.flink.table.generated.RecordComparator; import org.apache.flink.table.runtime.compression.BlockCompressionFactory; import org.apache.flink.table.typeutils.BinaryRowSerializer; import org.apache.flink.util.MutableObjectIterator; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalSorter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalSorter.java index d04f0ae..cf32bb0 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalSorter.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryExternalSorter.java @@ -35,6 +35,8 @@ import org.apache.flink.runtime.util.EmptyMutableObjectIterator; import org.apache.flink.table.api.TableConfigOptions; import org.apache.flink.table.dataformat.BaseRow; import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.generated.NormalizedKeyComputer; +import org.apache.flink.table.generated.RecordComparator; import org.apache.flink.table.runtime.compression.BlockCompressionFactory; import org.apache.flink.table.runtime.io.ChannelWithMeta; import org.apache.flink.table.runtime.util.FileChannelUtil; @@ -970,7 +972,6 @@ public class BinaryExternalSorter implements Sorter<BinaryRow> { // loop as long as the thread is marked alive and we do not see the final currWriteBuffer while (isRunning()) { try { - // TODO let cache in memory instead of disk. element = cache.isEmpty() ? 
queues.spill.take() : cache.poll(); } catch (InterruptedException iex) { if (isRunning()) { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryInMemorySortBuffer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryInMemorySortBuffer.java index 166afca..f51cc1a 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryInMemorySortBuffer.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryInMemorySortBuffer.java @@ -22,6 +22,8 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView; import org.apache.flink.table.dataformat.BaseRow; import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.generated.NormalizedKeyComputer; +import org.apache.flink.table.generated.RecordComparator; import org.apache.flink.table.runtime.util.MemorySegmentPool; import org.apache.flink.table.typeutils.AbstractRowSerializer; import org.apache.flink.table.typeutils.BinaryRowSerializer; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryIndexedSortable.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryIndexedSortable.java index a6e13f0..76cf216 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryIndexedSortable.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryIndexedSortable.java @@ -24,6 +24,8 @@ import org.apache.flink.runtime.memory.AbstractPagedOutputView; import org.apache.flink.runtime.operators.sort.IndexedSortable; import org.apache.flink.table.dataformat.BaseRow; import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.generated.NormalizedKeyComputer; 
+import org.apache.flink.table.generated.RecordComparator; import org.apache.flink.table.runtime.util.MemorySegmentPool; import org.apache.flink.table.typeutils.BinaryRowSerializer; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVExternalMerger.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVExternalMerger.java index 487651c..137dcdd 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVExternalMerger.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVExternalMerger.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelReaderInputView import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memory.AbstractPagedOutputView; import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.generated.RecordComparator; import org.apache.flink.table.runtime.compression.BlockCompressionFactory; import org.apache.flink.table.typeutils.BinaryRowSerializer; import org.apache.flink.util.MutableObjectIterator; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVInMemorySortBuffer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVInMemorySortBuffer.java index d4ca5b2..ce1dd90 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVInMemorySortBuffer.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryKVInMemorySortBuffer.java @@ -21,6 +21,8 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.disk.RandomAccessInputView; import org.apache.flink.runtime.memory.AbstractPagedOutputView; import 
org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.generated.NormalizedKeyComputer; +import org.apache.flink.table.generated.RecordComparator; import org.apache.flink.table.runtime.util.MemorySegmentPool; import org.apache.flink.table.typeutils.BinaryRowSerializer; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryMergeIterator.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryMergeIterator.java index fe3e1c0..9e447aa 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryMergeIterator.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BinaryMergeIterator.java @@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.sort; import org.apache.flink.runtime.operators.sort.MergeIterator; import org.apache.flink.runtime.operators.sort.PartialOrderPriorityQueue; +import org.apache.flink.table.generated.RecordComparator; import org.apache.flink.util.MutableObjectIterator; import java.io.IOException; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorter.java index 3c23738..c601a4b 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorter.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorter.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.operators.sort.IndexedSorter; import org.apache.flink.runtime.operators.sort.QuickSort; import org.apache.flink.table.api.TableConfigOptions; import org.apache.flink.table.dataformat.BinaryRow; +import org.apache.flink.table.generated.NormalizedKeyComputer; +import 
org.apache.flink.table.generated.RecordComparator; import org.apache.flink.table.runtime.compression.BlockCompressionFactory; import org.apache.flink.table.runtime.io.ChannelWithMeta; import org.apache.flink.table.runtime.util.FileChannelUtil; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/SortUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/SortUtil.java index a8e5607..e199efe 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/SortUtil.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/sort/SortUtil.java @@ -23,11 +23,19 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.table.dataformat.BinaryString; import org.apache.flink.table.dataformat.Decimal; +import java.nio.ByteOrder; + +import static org.apache.flink.core.memory.MemoryUtils.UNSAFE; + /** - * Util for data formats. + * Util for sort. */ public class SortUtil { + private static final int BYTE_ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class); + private static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN; + private static final int LONG_BYTES = 8; + public static void minNormalizedKey(MemorySegment target, int offset, int numBytes) { //write min value. for (int i = 0; i < numBytes; i++) { @@ -63,7 +71,7 @@ public class SortUtil { /** * UTF-8 supports bytes comparison. 
*/ - public static void putBinaryStringNormalizedKey( + public static void putStringNormalizedKey( BinaryString value, MemorySegment target, int offset, int numBytes) { final int limit = offset + numBytes; final int end = value.getSizeInBytes(); @@ -117,4 +125,93 @@ public class SortUtil { public static void putCharNormalizedKey(char value, MemorySegment target, int offset, int numBytes) { NormalizedKeyUtil.putCharNormalizedKey(value, target, offset, numBytes); } + + public static void putBinaryNormalizedKey( + byte[] value, MemorySegment target, int offset, int numBytes) { + final int limit = offset + numBytes; + final int end = value.length; + for (int i = 0; i < end && offset < limit; i++) { + target.put(offset++, value[i]); + } + + for (int i = offset; i < limit; i++) { + target.put(i, (byte) 0); + } + } + + public static int compareBinary(byte[] a, byte[] b) { + return compareBinary(a, 0, a.length, b, 0, b.length); + } + + public static int compareBinary( + byte[] buffer1, int offset1, int length1, + byte[] buffer2, int offset2, int length2) { + // Short circuit equal case + if (buffer1 == buffer2 && + offset1 == offset2 && + length1 == length2) { + return 0; + } + int minLength = Math.min(length1, length2); + int minWords = minLength / LONG_BYTES; + int offset1Adj = offset1 + BYTE_ARRAY_BASE_OFFSET; + int offset2Adj = offset2 + BYTE_ARRAY_BASE_OFFSET; + + /* + * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a + * time is no slower than comparing 4 bytes at a time even on 32-bit. + * On the other hand, it is substantially faster on 64-bit. + */ + for (int i = 0; i < minWords * LONG_BYTES; i += LONG_BYTES) { + long lw = UNSAFE.getLong(buffer1, offset1Adj + (long) i); + long rw = UNSAFE.getLong(buffer2, offset2Adj + (long) i); + long diff = lw ^ rw; + + if (diff != 0) { + if (!LITTLE_ENDIAN) { + return lessThanUnsigned(lw, rw) ? 
-1 : 1; + } + + // Use binary search + int n = 0; + int y; + int x = (int) diff; + if (x == 0) { + x = (int) (diff >>> 32); + n = 32; + } + + y = x << 16; + if (y == 0) { + n += 16; + } else { + x = y; + } + + y = x << 8; + if (y == 0) { + n += 8; + } + return (int) (((lw >>> n) & 0xFFL) - ((rw >>> n) & 0xFFL)); + } + } + + // The epilogue to cover the last (minLength % 8) elements. + for (int i = minWords * LONG_BYTES; i < minLength; i++) { + int result = unsignedByteToInt(buffer1[offset1 + i]) - + unsignedByteToInt(buffer2[offset2 + i]); + if (result != 0) { + return result; + } + } + return length1 - length2; + } + + private static int unsignedByteToInt(byte value) { + return value & 0xff; + } + + private static boolean lessThanUnsigned(long x1, long x2) { + return (x1 + Long.MIN_VALUE) < (x2 + Long.MIN_VALUE); + } } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ArrayType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ArrayType.java index f825b22..a768a75 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ArrayType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ArrayType.java @@ -25,6 +25,8 @@ import org.apache.flink.util.Preconditions; */ public class ArrayType implements InternalType { + private static final long serialVersionUID = 1L; + private final InternalType elementType; public ArrayType(InternalType elementType) { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BinaryType.java similarity index 63% copy from flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java copy to flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BinaryType.java index b3cdf5b..7c79243 100644 --- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BinaryType.java @@ -19,13 +19,22 @@ package org.apache.flink.table.type; /** - * Sql time type. + * Binary type, It differs from ArrayType(Byte): + * + * <p>1. Comparisons: Unsigned comparisons, not signed byte comparisons. + * According to: https://docs.oracle.com/cd/E11882_01/timesten.112/e21642/types.htm#TTSQL148 + * The BINARY data type is a fixed-length binary value with a length of n bytes. In database, + * byte value usually expresses a range of 0-255, so the comparison is unsigned comparisons. + * + * <p>2. Its elements cannot have null values. */ -public class TimeType implements AtomicType { +public class BinaryType implements AtomicType { - public static final TimeType INSTANCE = new TimeType(); + private static final long serialVersionUID = 1L; - private TimeType() {} + public static final BinaryType INSTANCE = new BinaryType(); + + private BinaryType() {} @Override public boolean equals(Object o) { @@ -41,5 +50,4 @@ public class TimeType implements AtomicType { public String toString() { return getClass().getSimpleName(); } - } diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BooleanType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BooleanType.java index 02416ed..5bfc550 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BooleanType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/BooleanType.java @@ -23,6 +23,8 @@ package org.apache.flink.table.type; */ public class BooleanType extends PrimitiveType { + private static final long serialVersionUID = 1L; + public static final BooleanType INSTANCE = new BooleanType(); private BooleanType() {} diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ByteType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ByteType.java index 343acb1..6a15d81 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ByteType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ByteType.java @@ -23,6 +23,8 @@ package org.apache.flink.table.type; */ public class ByteType extends PrimitiveType { + private static final long serialVersionUID = 1L; + public static final ByteType INSTANCE = new ByteType(); private ByteType() {} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/CharType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/CharType.java index bdb523c..90aced6 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/CharType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/CharType.java @@ -23,6 +23,8 @@ package org.apache.flink.table.type; */ public class CharType extends PrimitiveType { + private static final long serialVersionUID = 1L; + public static final CharType INSTANCE = new CharType(); private CharType() {} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DateType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DateType.java index 2c86a85..da36035 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DateType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DateType.java @@ -23,6 +23,8 @@ package org.apache.flink.table.type; */ public class DateType implements AtomicType { + private static final long serialVersionUID = 1L; + public static final DateType DATE = new DateType(0, "DateType"); public static final 
DateType INTERVAL_MONTHS = new DateType(1, "IntervalMonths"); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DecimalType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DecimalType.java index bb07118..5c66d19 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DecimalType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DecimalType.java @@ -27,6 +27,8 @@ import static java.lang.String.format; */ public class DecimalType implements AtomicType { + private static final long serialVersionUID = 1L; + public static final int MAX_PRECISION = 38; public static final int MAX_COMPACT_PRECISION = 18; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DoubleType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DoubleType.java index fb1e46b..9262371 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DoubleType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/DoubleType.java @@ -23,6 +23,8 @@ package org.apache.flink.table.type; */ public class DoubleType extends PrimitiveType { + private static final long serialVersionUID = 1L; + public static final DoubleType INSTANCE = new DoubleType(); private DoubleType() {} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/FloatType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/FloatType.java index a250f161..26a36e4 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/FloatType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/FloatType.java @@ -23,6 +23,8 @@ package org.apache.flink.table.type; */ public class FloatType extends PrimitiveType { + 
private static final long serialVersionUID = 1L; + public static final FloatType INSTANCE = new FloatType(); private FloatType() {} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/GenericType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/GenericType.java index d647480..f344e06 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/GenericType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/GenericType.java @@ -30,6 +30,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class GenericType<T> implements AtomicType { + private static final long serialVersionUID = 1L; + private final TypeInformation<T> typeInfo; private transient TypeSerializer<T> serializer; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/IntType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/IntType.java index e89032b..5b39ea7 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/IntType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/IntType.java @@ -23,6 +23,8 @@ package org.apache.flink.table.type; */ public class IntType extends PrimitiveType { + private static final long serialVersionUID = 1L; + public static final IntType INSTANCE = new IntType(); private IntType() {} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java index 7d750e6..644f428 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/InternalTypes.java @@ -43,7 +43,7 @@ public 
class InternalTypes { public static final CharType CHAR = CharType.INSTANCE; - public static final ArrayType BINARY = new ArrayType(BYTE); + public static final BinaryType BINARY = BinaryType.INSTANCE; public static final DateType DATE = DateType.DATE; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/LongType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/LongType.java index c21936f..7be209d 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/LongType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/LongType.java @@ -23,6 +23,8 @@ package org.apache.flink.table.type; */ public class LongType extends PrimitiveType { + private static final long serialVersionUID = 1L; + public static final LongType INSTANCE = new LongType(); private LongType() {} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/MapType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/MapType.java index 4498d5d..f5dc0a1 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/MapType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/MapType.java @@ -23,6 +23,8 @@ package org.apache.flink.table.type; */ public class MapType implements InternalType { + private static final long serialVersionUID = 1L; + private final InternalType keyType; private final InternalType valueType; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java index 002f4e0..03c78ab 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java +++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/PrimitiveType.java @@ -23,6 +23,8 @@ package org.apache.flink.table.type; */ public abstract class PrimitiveType implements AtomicType { + private static final long serialVersionUID = 1L; + @Override public boolean equals(Object o) { return this == o || o != null && getClass() == o.getClass(); diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/RowType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/RowType.java index 3401573..380ec44 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/RowType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/RowType.java @@ -31,6 +31,8 @@ import java.util.Arrays; */ public class RowType implements InternalType { + private static final long serialVersionUID = 1L; + private final InternalType[] types; private final String[] fieldNames; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ShortType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ShortType.java index 32d242d..173c688 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ShortType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/ShortType.java @@ -23,6 +23,8 @@ package org.apache.flink.table.type; */ public class ShortType extends PrimitiveType { + private static final long serialVersionUID = 1L; + public static final ShortType INSTANCE = new ShortType(); private ShortType() {} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/StringType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/StringType.java index 8893dc2..504a494 100644 --- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/StringType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/StringType.java @@ -23,6 +23,8 @@ package org.apache.flink.table.type; */ public class StringType implements AtomicType { + private static final long serialVersionUID = 1L; + public static final StringType INSTANCE = new StringType(); private StringType() { diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java index b3cdf5b..a313c3a 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimeType.java @@ -23,6 +23,8 @@ package org.apache.flink.table.type; */ public class TimeType implements AtomicType { + private static final long serialVersionUID = 1L; + public static final TimeType INSTANCE = new TimeType(); private TimeType() {} diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java index 20b3652..34737c1 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TimestampType.java @@ -23,6 +23,8 @@ package org.apache.flink.table.type; */ public class TimestampType implements AtomicType { + private static final long serialVersionUID = 1L; + public static final TimestampType TIMESTAMP = new TimestampType(0, "TimestampType"); public static final TimestampType INTERVAL_MILLIS = new TimestampType(1, "IntervalMillis"); diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java index c016590..ea90a77 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/type/TypeConverters.java @@ -88,6 +88,7 @@ public class TypeConverters { internalTypeToInfo.put(InternalTypes.DATE, BasicTypeInfo.INT_TYPE_INFO); internalTypeToInfo.put(InternalTypes.TIMESTAMP, BasicTypeInfo.LONG_TYPE_INFO); internalTypeToInfo.put(InternalTypes.TIME, BasicTypeInfo.INT_TYPE_INFO); + internalTypeToInfo.put(InternalTypes.BINARY, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO); INTERNAL_TYPE_TO_INTERNAL_TYPE_INFO = Collections.unmodifiableMap(internalTypeToInfo); } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java index 96057af..35dc8d6 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BaseRowTest.java @@ -27,6 +27,7 @@ import org.junit.Test; import java.math.BigDecimal; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -43,6 +44,7 @@ public class BaseRowTest { private BinaryArray array; private BinaryMap map; private BinaryRow underRow; + private byte[] bytes; @Before public void before() { @@ -65,6 +67,7 @@ public class BaseRowTest { writer.writeInt(1, 16); writer.complete(); } + bytes = new byte[] {1, 5, 6}; } @Test @@ -78,11 +81,11 @@ public class BaseRowTest { 
BinaryRowWriter writer = new BinaryRowWriter(row); writer.writeRow(0, getBinaryRow(), null); writer.complete(); - testAll(row.getRow(0, 15)); + testAll(row.getRow(0, 16)); } private BinaryRow getBinaryRow() { - BinaryRow row = new BinaryRow(15); + BinaryRow row = new BinaryRow(16); BinaryRowWriter writer = new BinaryRowWriter(row); writer.writeBoolean(0, true); writer.writeByte(1, (byte) 1); @@ -100,12 +103,13 @@ public class BaseRowTest { writer.writeMap(13, map); writer.writeRow(14, underRow, new BaseRowSerializer( new ExecutionConfig(), InternalTypes.INT, InternalTypes.INT)); + writer.writeBinary(15, bytes); return row; } @Test public void testGenericRow() { - GenericRow row = new GenericRow(15); + GenericRow row = new GenericRow(16); row.setField(0, true); row.setField(1, (byte) 1); row.setField(2, (short) 2); @@ -121,12 +125,13 @@ public class BaseRowTest { row.setField(12, array); row.setField(13, map); row.setField(14, underRow); + row.setField(15, bytes); testAll(row); } @Test public void testBoxedWrapperRow() { - BoxedWrapperRow row = new BoxedWrapperRow(15); + BoxedWrapperRow row = new BoxedWrapperRow(16); row.setBoolean(0, true); row.setByte(1, (byte) 1); row.setShort(2, (short) 2); @@ -142,6 +147,7 @@ public class BaseRowTest { row.setNonPrimitiveValue(12, array); row.setNonPrimitiveValue(13, map); row.setNonPrimitiveValue(14, underRow); + row.setNonPrimitiveValue(15, bytes); testAll(row); } @@ -154,7 +160,7 @@ public class BaseRowTest { row1.setField(3, 3); row1.setField(4, (long) 4); - GenericRow row2 = new GenericRow(10); + GenericRow row2 = new GenericRow(11); row2.setField(0, (float) 5); row2.setField(1, (double) 6); row2.setField(2, (char) 7); @@ -165,11 +171,12 @@ public class BaseRowTest { row2.setField(7, array); row2.setField(8, map); row2.setField(9, underRow); + row2.setField(10, bytes); testAll(new JoinedRow(row1, row2)); } private void testAll(BaseRow row) { - assertEquals(15, row.getArity()); + assertEquals(16, row.getArity()); // test 
header assertEquals(0, row.getHeader()); @@ -193,6 +200,7 @@ public class BaseRowTest { assertEquals(map, row.getMap(13)); assertEquals(15, row.getRow(14, 2).getInt(0)); assertEquals(16, row.getRow(14, 2).getInt(1)); + assertArrayEquals(bytes, row.getBinary(15)); // test set row.setBoolean(0, false); diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java index abc23ce..beeb8c0 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryArrayTest.java @@ -26,6 +26,7 @@ import org.apache.flink.table.type.InternalTypes; import org.apache.flink.table.typeutils.BaseRowSerializer; import org.apache.flink.table.util.SegmentsUtil; +import org.junit.Assert; import org.junit.Test; import java.math.BigDecimal; @@ -502,4 +503,18 @@ public class BinaryArrayTest { assertEquals(1, nestedRow.getInt(1)); assertTrue(array.isNullAt(1)); } + + @Test + public void testBinary() { + BinaryArray array = new BinaryArray(); + BinaryArrayWriter writer = new BinaryArrayWriter(array, 2, 8); + byte[] bytes1 = new byte[] {1, -1, 5}; + byte[] bytes2 = new byte[] {1, -1, 5, 5, 1, 5, 1, 5}; + writer.writeBinary(0, bytes1); + writer.writeBinary(1, bytes2); + writer.complete(); + + Assert.assertArrayEquals(bytes1, array.getBinary(0)); + Assert.assertArrayEquals(bytes2, array.getBinary(1)); + } } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java index 1803e5a..15334f1 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java +++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryRowTest.java @@ -450,4 +450,18 @@ public class BinaryRowTest { assertEquals(1, nestedRow.getInt(1)); assertTrue(row.isNullAt(1)); } + + @Test + public void testBinary() { + BinaryRow row = new BinaryRow(2); + BinaryRowWriter writer = new BinaryRowWriter(row); + byte[] bytes1 = new byte[] {1, -1, 5}; + byte[] bytes2 = new byte[] {1, -1, 5, 5, 1, 5, 1, 5}; + writer.writeBinary(0, bytes1); + writer.writeBinary(1, bytes2); + writer.complete(); + + Assert.assertArrayEquals(bytes1, row.getBinary(0)); + Assert.assertArrayEquals(bytes2, row.getBinary(1)); + } } diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java index 1b78155..4570608 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/dataformat/BinaryStringTest.java @@ -166,10 +166,10 @@ public class BinaryStringTest { MemorySegment segment1 = MemorySegmentFactory.allocateUnpooledSegment(1024); MemorySegment segment2 = MemorySegmentFactory.allocateUnpooledSegment(1024); - SortUtil.putBinaryStringNormalizedKey(fromString("abcabcabc"), segment1, 0, 9); - SortUtil.putBinaryStringNormalizedKey(fromString("abcabcabC"), segment2, 0, 9); + SortUtil.putStringNormalizedKey(fromString("abcabcabc"), segment1, 0, 9); + SortUtil.putStringNormalizedKey(fromString("abcabcabC"), segment2, 0, 9); assertTrue(segment1.compare(segment2, 0, 0, 9) > 0); - SortUtil.putBinaryStringNormalizedKey(fromString("abcab"), segment1, 0, 9); + SortUtil.putStringNormalizedKey(fromString("abcab"), segment1, 0, 9); assertTrue(segment1.compare(segment2, 0, 0, 9) < 0); } diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryMergeIteratorTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryMergeIteratorTest.java index 30bde3e..d170f5c 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryMergeIteratorTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BinaryMergeIteratorTest.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.base.IntComparator; import org.apache.flink.table.dataformat.BinaryRow; import org.apache.flink.table.dataformat.BinaryRowWriter; import org.apache.flink.table.dataformat.BinaryString; +import org.apache.flink.table.generated.RecordComparator; import org.apache.flink.table.typeutils.BinaryRowSerializer; import org.apache.flink.util.MutableObjectIterator; diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java index 13356cc..1928696 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/BufferedKVExternalSorterTest.java @@ -29,6 +29,8 @@ import org.apache.flink.table.api.TableConfigOptions; import org.apache.flink.table.dataformat.BinaryRow; import org.apache.flink.table.dataformat.BinaryRowWriter; import org.apache.flink.table.dataformat.BinaryString; +import org.apache.flink.table.generated.NormalizedKeyComputer; +import org.apache.flink.table.generated.RecordComparator; import org.apache.flink.table.typeutils.BinaryRowSerializer; import org.apache.flink.util.MutableObjectIterator; diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntNormalizedKeyComputer.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntNormalizedKeyComputer.java index 9cad238..42706e7 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntNormalizedKeyComputer.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntNormalizedKeyComputer.java @@ -20,11 +20,12 @@ package org.apache.flink.table.runtime.sort; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.generated.NormalizedKeyComputer; /** * Example for int {@link NormalizedKeyComputer}. */ -public class IntNormalizedKeyComputer extends org.apache.flink.table.runtime.sort.NormalizedKeyComputer { +public class IntNormalizedKeyComputer implements NormalizedKeyComputer { public static final IntNormalizedKeyComputer INSTANCE = new IntNormalizedKeyComputer(); diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntRecordComparator.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntRecordComparator.java index b41de22..5bbc48e 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntRecordComparator.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/IntRecordComparator.java @@ -19,11 +19,12 @@ package org.apache.flink.table.runtime.sort; import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.generated.RecordComparator; /** * Example String {@link RecordComparator}. 
*/ -public class IntRecordComparator extends RecordComparator { +public class IntRecordComparator implements RecordComparator { public static final IntRecordComparator INSTANCE = new IntRecordComparator(); diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/SortUtilTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/SortUtilTest.java index de2f61e..73a491e 100644 --- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/SortUtilTest.java +++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/sort/SortUtilTest.java @@ -115,12 +115,12 @@ public class SortUtilTest { BinaryString[] arr = new BinaryString[len]; for (int i = 0; i < len; i++) { arr[i] = BinaryString.fromString(String.valueOf(random.nextLong())); - SortUtil.putBinaryStringNormalizedKey(arr[i], segments[i], 0, 8); + SortUtil.putStringNormalizedKey(arr[i], segments[i], 0, 8); } Arrays.sort(arr, BinaryString::compareTo); for (int i = 0; i < len; i++) { - SortUtil.putBinaryStringNormalizedKey(arr[i], compareSegs[i], 0, 8); + SortUtil.putStringNormalizedKey(arr[i], compareSegs[i], 0, 8); } Arrays.sort(segments, (o1, o2) -> o1.compare(o2, 0, 0, 8));