Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19452#discussion_r144131323

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -220,37 +232,36 @@ case class StreamingSymmetricHashJoinExec(
     // stored left input, and also stores all the right input. It also generates all rows from
     // matching new left input with new right input, since the new left input has become stored
     // by that point. This tiny asymmetry is necessary to avoid duplication.
-    val leftOutputIter = leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner) {
-      (input: UnsafeRow, matched: UnsafeRow) => joinedRow.withLeft(input).withRight(matched)
+    val leftOutputIter = leftSideJoiner.storeAndJoinWithOtherSide(
+        rightSideJoiner, joinedFilter) {
+      (input: InternalRow, matched: InternalRow) => joinedRow.withLeft(input).withRight(matched)
     }
-    val rightOutputIter = rightSideJoiner.storeAndJoinWithOtherSide(leftSideJoiner) {
-      (input: UnsafeRow, matched: UnsafeRow) => joinedRow.withLeft(matched).withRight(input)
+    val rightOutputIter = rightSideJoiner.storeAndJoinWithOtherSide(
+        leftSideJoiner, joinedFilter) {
+      (input: InternalRow, matched: InternalRow) => joinedRow.withLeft(matched).withRight(input)
     }
 
-    // Filter the joined rows based on the given condition.
-    val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _
-
     // We need to save the time that the inner join output iterator completes, since outer join
     // output counts as both update and removal time.
     var innerOutputCompletionTimeNs: Long = 0
     def onInnerOutputCompletion = {
       innerOutputCompletionTimeNs = System.nanoTime
     }
     val filteredInnerOutputIter = CompletionIterator[InternalRow, Iterator[InternalRow]](
--- End diff --

This is not a filtered iterator any more; please rename it.
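For concreteness, one possible shape after the rename (a sketch only: `innerOutputIter` is just a suggested name, and I'm assuming the combined iterator is simply the concatenation of the two sides, since the join condition is now applied inside `storeAndJoinWithOtherSide` via `joinedFilter`):

    // Sketch: no filtering happens here any more; the CompletionIterator only
    // records when the inner join output is exhausted (suggested name below).
    val innerOutputIter = CompletionIterator[InternalRow, Iterator[InternalRow]](
      leftOutputIter ++ rightOutputIter, onInnerOutputCompletion)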