[ https://issues.apache.org/jira/browse/FLINK-3140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15062112#comment-15062112 ]
ASF GitHub Bot commented on FLINK-3140: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1465#discussion_r47911612 --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowComparator.scala --- @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.typeinfo + +import java.util + +import org.apache.flink.api.common.typeutils.base.BasicTypeComparator +import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, TypeComparator, TypeSerializer} +import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.typeinfo.NullMaskUtils.readIntoNullMask +import org.apache.flink.api.table.typeinfo.RowComparator.NullChecker +import org.apache.flink.core.memory.{MemorySegment, DataOutputView, DataInputView} +import org.apache.flink.types.{KeyFieldOutOfBoundsException, NullKeyFieldException} + +/** + * Comparator for [[Row]]. 
+ */ +class RowComparator private ( + /** key positions describe which fields are keys in what order */ + val keyPositions: Array[Int], + /** comparators for the key fields, in the same order as the key fields */ + val comparators: Array[TypeComparator[_]], + /** serializers to deserialize the first n fields for comparison */ + val serializers: Array[TypeSerializer[_]], + /** auxiliary fields for normalized key support */ + private var normalizedKeyLengths: Array[Int], + private var numLeadingNormalizableKeys: Int, + private var normalizableKeyPrefixLen: Int, + private var invertNormKey: Boolean) + extends CompositeTypeComparator[Row] with Serializable { + + // null masks for serialized comparison + private val nullMask1 = new Array[Boolean](serializers.length) + private val nullMask2 = new Array[Boolean](serializers.length) + + // cache for the deserialized key field objects + @transient + private var deserializedKeyFields1: Array[Any] = null + @transient + private var deserializedKeyFields2: Array[Any] = null + + // null checker for reference comparison + private val nullChecker = new NullChecker() + + def this( + keyPositions: Array[Int], + comparators: Array[TypeComparator[_]], + serializers: Array[TypeSerializer[_]]) = { + this(keyPositions, comparators, serializers, new Array[Int](keyPositions.length), 0, 0, false) + // set up auxiliary fields for normalized key support + setupAuxiliaryFields() + } + + private def setupAuxiliaryFields(): Unit = { + var i = 0 + while (i < keyPositions.length) { + val k: TypeComparator[_] = comparators(i) + // as long as the leading keys support normalized keys, we can build up the composite key + if (k.supportsNormalizedKey()) { + if (i == 0) { + // the first comparator decides whether we need to invert the key direction + invertNormKey = k.invertNormalizedKey() + } + else if (k.invertNormalizedKey() != invertNormKey) { + // if a successor does not agree on the inversion direction, it cannot be part of the + // normalized key 
+ return + } + numLeadingNormalizableKeys += 1 + val len: Int = k.getNormalizeKeyLen + if (len < 0) { + throw new RuntimeException("Comparator " + k.getClass.getName + + " specifies an invalid length for the normalized key: " + len) + } + normalizedKeyLengths(i) = len + 1 // add one for a null byte + normalizableKeyPrefixLen += len + 1 // add one for a null byte + if (normalizableKeyPrefixLen < 0) { + // overflow, which means we are out of budget for normalized key space anyways + normalizableKeyPrefixLen = Integer.MAX_VALUE + return + } + } + else { + return + } + i += 1 + } + } + + private def instantiateDeserializationUtils(): Unit = { + deserializedKeyFields1 = new Array[Any](serializers.length) + deserializedKeyFields2 = new Array[Any](serializers.length) + + var i = 0 + while (i < serializers.length) { + deserializedKeyFields1(i) = serializers(i).createInstance() + deserializedKeyFields2(i) = serializers(i).createInstance() + i += 1 + } + } + + // -------------------------------------------------------------------------------------------- + // Comparator Methods + // -------------------------------------------------------------------------------------------- + + override def getFlatComparator(flatComparators: util.List[TypeComparator[_]]): Unit = + comparators.foreach { + case ctc: CompositeTypeComparator[_] => ctc.getFlatComparator(flatComparators) + case c@_ => flatComparators.add(c) + } + + override def compareToReference(referencedComparator: TypeComparator[Row]): Int = { + val other: RowComparator = referencedComparator.asInstanceOf[RowComparator] + var i = 0 + try { + while (i < keyPositions.length) { + val comparator = comparators(i).asInstanceOf[TypeComparator[Any]] + val otherComparator = other.comparators(i).asInstanceOf[TypeComparator[Any]] + + val nullCheck = comparator.equalToReference(nullChecker) + val nullCheckOther = otherComparator.equalToReference(nullChecker) + + var cmp = 0 + // both values are null -> equality + if (nullCheck && 
nullCheckOther) { + cmp = 0 + } + // one value is null -> inequality + // order is considered for basic types + else if (nullCheck || nullCheckOther) { + if (comparator.isInstanceOf[BasicTypeComparator[_]]) { + val basicComp = comparator.asInstanceOf[BasicTypeComparator[_]] + if (nullCheck) { + return if (basicComp.isAscendingComparison) 1 else -1 + } + else if (nullCheckOther) { + return if (basicComp.isAscendingComparison) -1 else 1 + } + } + else { + return if (nullCheck) 1 else -1 + } + } + else { + cmp = comparator.compareToReference(otherComparator) + } + + if (cmp != 0) { + return cmp + } + i = i + 1 + } + 0 + } + catch { + case npex: NullPointerException => + throw new NullKeyFieldException(keyPositions(i)) + case iobex: IndexOutOfBoundsException => + throw new KeyFieldOutOfBoundsException(keyPositions(i)) + } + } --- End diff -- We could write this as ``` override def compareToReference(referencedComparator: TypeComparator[Row]): Int = { val other: RowComparator = referencedComparator.asInstanceOf[RowComparator] comparators.zip(other.comparators).view.map{ case (comparator, otherComparator) => val nullCheck = comparator.asInstanceOf[TypeComparator[Any]].equalToReference(nullChecker) val nullCheckOther = otherComparator.asInstanceOf[TypeComparator[Any]] .equalToReference(nullChecker) (nullCheck, nullCheckOther) match { case (true, true) => 0 case (true, false) => comparator match { case basicComp: BasicTypeComparator[_] if basicComp.isAscendingComparison => 1 case basicComp: BasicTypeComparator[_] if !basicComp.isAscendingComparison => -1 case _ => 1 } case (false, true) => comparator match { case basicComp: BasicTypeComparator[_] if basicComp.isAscendingComparison => -1 case basicComp: BasicTypeComparator[_] if !basicComp.isAscendingComparison => 1 case _ => -1 } case (false, false) => comparator.compareToReference(otherComparator) } }.dropWhile(_ == 0).headOption.getOrElse(0) } ``` > NULL value data layout in Row Serializer/Comparator > 
--------------------------------------------------- > > Key: FLINK-3140 > URL: https://issues.apache.org/jira/browse/FLINK-3140 > Project: Flink > Issue Type: Sub-task > Components: Table API > Reporter: Chengxiang Li > Assignee: Timo Walther > > To store/materialize NULL values in Row objects, we need a new Row > Serializer/Comparator that is aware of NULL value fields. -- This message was sent by Atlassian JIRA (v6.3.4#6332)