[ 
https://issues.apache.org/jira/browse/FLINK-3140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15062112#comment-15062112
 ] 

ASF GitHub Bot commented on FLINK-3140:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1465#discussion_r47911612
  
    --- Diff: 
flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowComparator.scala
 ---
    @@ -0,0 +1,468 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.typeinfo
    +
    +import java.util
    +
    +import org.apache.flink.api.common.typeutils.base.BasicTypeComparator
    +import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, 
TypeComparator, TypeSerializer}
    +import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.api.table.typeinfo.NullMaskUtils.readIntoNullMask
    +import org.apache.flink.api.table.typeinfo.RowComparator.NullChecker
    +import org.apache.flink.core.memory.{MemorySegment, DataOutputView, 
DataInputView}
    +import org.apache.flink.types.{KeyFieldOutOfBoundsException, 
NullKeyFieldException}
    +
    +/**
    + * Comparator for [[Row]].
    + */
    +class RowComparator private (
    +    /** key positions describe which fields are keys in what order */
    +    val keyPositions: Array[Int],
    +    /** comparators for the key fields, in the same order as the key 
fields */
    +    val comparators: Array[TypeComparator[_]],
    +    /** serializers to deserialize the first n fields for comparison */
    +    val serializers: Array[TypeSerializer[_]],
    +    /** auxiliary fields for normalized key support */
    +    private var normalizedKeyLengths: Array[Int],
    +    private var numLeadingNormalizableKeys: Int,
    +    private var normalizableKeyPrefixLen: Int,
    +    private var invertNormKey: Boolean)
    +  extends CompositeTypeComparator[Row] with Serializable {
    +
    +  // null masks for serialized comparison
    +  private val nullMask1 = new Array[Boolean](serializers.length)
    +  private val nullMask2 = new Array[Boolean](serializers.length)
    +
    +  // cache for the deserialized key field objects
    +  @transient
    +  private var deserializedKeyFields1: Array[Any] = null
    +  @transient
    +  private var deserializedKeyFields2: Array[Any] = null
    +
    +  // null checker for reference comparison
    +  private val nullChecker = new NullChecker()
    +
    +  def this(
    +      keyPositions: Array[Int],
    +      comparators: Array[TypeComparator[_]],
    +      serializers: Array[TypeSerializer[_]]) = {
    +    this(keyPositions, comparators, serializers, new 
Array[Int](keyPositions.length), 0, 0, false)
    +    // set up auxiliary fields for normalized key support
    +    setupAuxiliaryFields()
    +  }
    +
    +  private def setupAuxiliaryFields(): Unit = {
    +    var i = 0
    +    while (i < keyPositions.length) {
    +      val k: TypeComparator[_] = comparators(i)
    +      // as long as the leading keys support normalized keys, we can build 
up the composite key
    +      if (k.supportsNormalizedKey()) {
    +        if (i == 0) {
    +          // the first comparator decides whether we need to invert the 
key direction
    +          invertNormKey = k.invertNormalizedKey()
    +        }
    +        else if (k.invertNormalizedKey() != invertNormKey) {
    +          // if a successor does not agree on the inversion direction, it 
cannot be part of the
    +          // normalized key
    +          return
    +        }
    +        numLeadingNormalizableKeys += 1
    +        val len: Int = k.getNormalizeKeyLen
    +        if (len < 0) {
    +          throw new RuntimeException("Comparator " + k.getClass.getName +
    +            " specifies an invalid length for the normalized key: " + len)
    +        }
    +        normalizedKeyLengths(i) = len + 1 // add one for a null byte
    +        normalizableKeyPrefixLen += len + 1 // add one for a null byte
    +        if (normalizableKeyPrefixLen < 0) {
    +          // overflow, which means we are out of budget for normalized key 
space anyways
    +          normalizableKeyPrefixLen = Integer.MAX_VALUE
    +          return
    +        }
    +      }
    +      else {
    +        return
    +      }
    +      i += 1
    +    }
    +  }
    +
    +  private def instantiateDeserializationUtils(): Unit = {
    +    deserializedKeyFields1 = new Array[Any](serializers.length)
    +    deserializedKeyFields2 = new Array[Any](serializers.length)
    +
    +    var i = 0
    +    while (i < serializers.length) {
    +      deserializedKeyFields1(i) = serializers(i).createInstance()
    +      deserializedKeyFields2(i) = serializers(i).createInstance()
    +      i += 1
    +    }
    +  }
    +
    +  // 
--------------------------------------------------------------------------------------------
    +  //  Comparator Methods
    +  // 
--------------------------------------------------------------------------------------------
    +
    +  override def getFlatComparator(flatComparators: 
util.List[TypeComparator[_]]): Unit =
    +    comparators.foreach {
    +      case ctc: CompositeTypeComparator[_] => 
ctc.getFlatComparator(flatComparators)
    +      case c@_ => flatComparators.add(c)
    +    }
    +
    +  override def compareToReference(referencedComparator: 
TypeComparator[Row]): Int = {
    +    val other: RowComparator = 
referencedComparator.asInstanceOf[RowComparator]
    +    var i = 0
    +    try {
    +      while (i < keyPositions.length) {
    +        val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
    +        val otherComparator = 
other.comparators(i).asInstanceOf[TypeComparator[Any]]
    +
    +        val nullCheck = comparator.equalToReference(nullChecker)
    +        val nullCheckOther = otherComparator.equalToReference(nullChecker)
    +
    +        var cmp = 0
    +        // both values are null -> equality
    +        if (nullCheck && nullCheckOther) {
    +          cmp = 0
    +        }
    +        // one value is null -> inequality
    +        // order is considered for basic types
    +        else if (nullCheck || nullCheckOther) {
    +          if (comparator.isInstanceOf[BasicTypeComparator[_]]) {
    +            val basicComp = comparator.asInstanceOf[BasicTypeComparator[_]]
    +            if (nullCheck) {
    +              return if (basicComp.isAscendingComparison) 1 else -1
    +            }
    +            else if (nullCheckOther) {
    +              return if (basicComp.isAscendingComparison) -1 else 1
    +            }
    +          }
    +          else {
    +            return if (nullCheck) 1 else -1
    +          }
    +        }
    +        else {
    +          cmp = comparator.compareToReference(otherComparator)
    +        }
    +
    +        if (cmp != 0) {
    +          return cmp
    +        }
    +        i = i + 1
    +      }
    +      0
    +    }
    +    catch {
    +      case npex: NullPointerException =>
    +        throw new NullKeyFieldException(keyPositions(i))
    +      case iobex: IndexOutOfBoundsException =>
    +        throw new KeyFieldOutOfBoundsException(keyPositions(i))
    +    }
    +  }
    --- End diff --
    
    We could write this as
    
    ```
    override def compareToReference(referencedComparator: TypeComparator[Row]): 
Int = {
        val other: RowComparator = 
referencedComparator.asInstanceOf[RowComparator]
    
        comparators.zip(other.comparators).view.map{
          case (comparator, otherComparator) =>
            val nullCheck = 
comparator.asInstanceOf[TypeComparator[Any]].equalToReference(nullChecker)
            val nullCheckOther = 
otherComparator.asInstanceOf[TypeComparator[Any]]
              .equalToReference(nullChecker)
    
            (nullCheck, nullCheckOther) match {
              case (true, true) => 0
              case (true, false) =>
                comparator match {
                  case basicComp: BasicTypeComparator[_] if 
basicComp.isAscendingComparison =>
                    1
                  case basicComp: BasicTypeComparator[_] if 
!basicComp.isAscendingComparison =>
                    -1
                  case _ => 1
                }
              case (false, true) =>
                comparator match {
                  case basicComp: BasicTypeComparator[_] if 
basicComp.isAscendingComparison =>
                    -1
                  case basicComp: BasicTypeComparator[_] if 
!basicComp.isAscendingComparison =>
                    1
                  case _ => -1
                }
              case (false, false) =>
                comparator.compareToReference(otherComparator)
            }
        }.dropWhile(_ == 0).headOption.getOrElse(0)
      }
    ```


> NULL value data layout in Row Serializer/Comparator
> ---------------------------------------------------
>
>                 Key: FLINK-3140
>                 URL: https://issues.apache.org/jira/browse/FLINK-3140
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API
>            Reporter: Chengxiang Li
>            Assignee: Timo Walther
>
> To store/materialize NULL values in Row objects, we need a new Row 
> Serializer/Comparator that is aware of NULL value fields.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to