Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19327#discussion_r140607924
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
    @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
       /**
        * Remove using a predicate on keys. See class docs for more context and implement details.
        */
    -  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
    -    val allKeyToNumValues = keyToNumValues.iterator
    -
    -    while (allKeyToNumValues.hasNext) {
    -      val keyToNumValue = allKeyToNumValues.next
    -      if (condition(keyToNumValue.key)) {
    -        keyToNumValues.remove(keyToNumValue.key)
    -        keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue)
    +  def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = {
    --- End diff --
    
    Return an iterator of `UnsafeRowPair` instead of `(UnsafeRow, UnsafeRow)` tuples, for the same reason as above.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to