Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21028#discussion_r188143390
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
    @@ -529,6 +567,239 @@ case class ArrayContains(left: Expression, right: 
Expression)
       override def prettyName: String = "array_contains"
     }
     
    +/**
    + * Checks if the two arrays contain at least one common element.
    + */
    +// scalastyle:off line.size.limit
    +@ExpressionDescription(
    +  usage = "_FUNC_(a1, a2) - Returns true if a1 contains at least a 
non-null element present also in a2. If the arrays have no common element and 
they are both non-empty and either of them contains a null element null is 
returned, false otherwise.",
    +  examples = """
    +    Examples:
    +      > SELECT _FUNC_(array(1, 2, 3), array(3, 4, 5));
    +       true
    +  """, since = "2.4.0")
    +// scalastyle:on line.size.limit
    +case class ArraysOverlap(left: Expression, right: Expression)
    +  extends BinaryArrayExpressionWithImplicitCast {
    +
    +  /**
    +   * Runs the parent type check first and returns any failure it reports unchanged.
    +   * When the parent check succeeds, additionally requires the element type to be
    +   * orderable; otherwise a type-check failure is reported.
    +   */
    +  override def checkInputDataTypes(): TypeCheckResult = {
    +    val inherited = super.checkInputDataTypes()
    +    inherited match {
    +      // Only a successful parent check can still be rejected for a non-orderable element type.
    +      case TypeCheckResult.TypeCheckSuccess if !RowOrdering.isOrderable(elementType) =>
    +        TypeCheckResult.TypeCheckFailure(
    +          s"${elementType.simpleString} cannot be used in comparison.")
    +      case _ => inherited
    +    }
    +  }
    +
    +  // Interpreted ordering for the element type; bruteForceEval uses its `equiv`
    +  // to compare two elements.
    +  @transient private lazy val ordering: Ordering[Any] =
    +    TypeUtils.getInterpretedOrdering(elementType)
    +
    +  // Whether the hash-set based fastEval can be used for the element type:
    +  // true for atomic types except BinaryType, false for everything else.
    +  @transient private lazy val elementTypeSupportEquals = elementType match 
{
    +    case BinaryType => false
    +    case _: AtomicType => true
    +    case _ => false
    +  }
    +
    +  // Evaluation function chosen once from elementTypeSupportEquals:
    +  // fastEval when it is true, bruteForceEval otherwise.
    +  @transient private lazy val doEvaluation = if (elementTypeSupportEquals) 
{
    +    fastEval _
    +  } else {
    +    bruteForceEval _
    +  }
    +
    +  // The result type of this expression is BooleanType.
    +  override def dataType: DataType = BooleanType
    +
    +  /**
    +   * The result is nullable when either input expression is nullable, or when either
    +   * input array type may contain null elements.
    +   */
    +  override def nullable: Boolean = {
    +    val inputs = Seq(left, right)
    +    inputs.exists(_.nullable) ||
    +      inputs.exists(_.dataType.asInstanceOf[ArrayType].containsNull)
    +  }
    +
    +  /** Casts both operands to ArrayData and passes them to the selected evaluation function. */
    +  override def nullSafeEval(a1: Any, a2: Any): Any = {
    +    val evaluate = doEvaluation
    +    evaluate(a1.asInstanceOf[ArrayData], a2.asInstanceOf[ArrayData])
    +  }
    +
    +  /**
    +   * Hash-based evaluation: the non-null elements of the array with fewer elements are
    +   * collected into a set, then each element of the other array is looked up in it.
    +   * This mode is only valid for data types whose values implement equals properly.
    +   *
    +   * Returns true on the first hit; otherwise null if a null element was seen, else false.
    +   */
    +  private def fastEval(arr1: ArrayData, arr2: ArrayData): Any = {
    +    val firstIsLarger = arr1.numElements() > arr2.numElements()
    +    val probeSide = if (firstIsLarger) arr1 else arr2
    +    val buildSide = if (firstIsLarger) arr2 else arr1
    +    var sawNull = false
    +    // When the smaller array is empty there is nothing to match and no null to report.
    +    if (buildSide.numElements() > 0) {
    +      val lookup = new mutable.HashSet[Any]
    +      buildSide.foreach(elementType, (_, elem) =>
    +        if (elem != null) {
    +          lookup += elem
    +        } else {
    +          sawNull = true
    +        })
    +      probeSide.foreach(elementType, (_, candidate) =>
    +        if (candidate == null) {
    +          sawNull = true
    +        } else if (lookup.contains(candidate)) {
    +          // Non-local return: leaves fastEval from inside the closure.
    +          return true
    +        }
    +      )
    +    }
    +    if (sawNull) null else false
    +  }
    +
    +  /**
    +   * A slower evaluation which performs a nested loop and supports all the data types.
    +   *
    +   * Returns true as soon as a non-null element of `arr1` is equivalent (according to
    +   * `ordering`) to a non-null element of `arr2`. Otherwise returns null when both arrays
    +   * are non-empty and a null element was seen, and false in every other case.
    +   */
    +  private def bruteForceEval(arr1: ArrayData, arr2: ArrayData): Any = {
    +    var hasNull = false
    +    // A null result is only possible when both arrays are non-empty: an empty array
    +    // cannot overlap with anything, so the answer is a definite false.
    +    if (arr1.numElements() > 0 && arr2.numElements() > 0) {
    +      arr1.foreach(elementType, (_, v1) =>
    +        if (v1 == null) {
    +          hasNull = true
    +        } else {
    +          arr2.foreach(elementType, (_, v2) =>
    +            // v1 is known to be non-null here; it is the inner element that must be
    +            // checked so that nulls are recorded and never passed to the ordering.
    +            if (v2 == null) {
    +              hasNull = true
    +            } else if (ordering.equiv(v1, v2)) {
    +              // Non-local return: leaves bruteForceEval from inside the closures.
    +              return true
    +            }
    +          )
    +        })
    +    }
    +    if (hasNull) {
    +      null
    +    } else {
    +      false
    +    }
    +  }
    +
    +  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
    +    nullSafeCodeGen(ctx, ev, (a1, a2) => {
    +      val smaller = ctx.freshName("smallerArray")
    +      val bigger = ctx.freshName("biggerArray")
    +      val comparisonCode = if (elementTypeSupportEquals) {
    +        fastCodegen(ctx, ev, smaller, bigger)
    +      } else {
    +        bruteForceCodegen(ctx, ev, smaller, bigger)
    +      }
    +      s"""
    +         |ArrayData $smaller;
    +         |ArrayData $bigger;
    +         |if ($a1.numElements() > $a2.numElements()) {
    +         |  $bigger = $a1;
    +         |  $smaller = $a2;
    +         |} else {
    +         |  $smaller = $a1;
    +         |  $bigger = $a2;
    +         |}
    +         |if ($smaller.numElements() > 0) {
    +         |  $comparisonCode
    +         |}
    --- End diff --
    
    ah i see, sorry I misread the code in `nullSafeCodeGen`


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to