agrawaldevesh commented on a change in pull request #29104: URL: https://github.com/apache/spark/pull/29104#discussion_r459804718
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ########## @@ -81,6 +81,34 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation { */ def asReadOnlyCopy(): HashedRelation + + /** + * Normally HashedRelation is built from an Source (input: Iterator[InternalRow]). + * This indicates the original input is empty. + * Note that, the hashed relation can be empty despite the input being not empty, + * since the hashed relation skips over null keys. + */ + var isOriginalInputEmpty: Boolean + + def setOriginInputEmtpy(isOriginalInputEmpty: Boolean): HashedRelation = { Review comment: Oops, I forgot to change the name of the setter. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala ########## @@ -454,6 +491,48 @@ case class BroadcastHashJoinExec( val (matched, checkCondition, _) = getJoinCondition(ctx, input) val numOutput = metricTerm(ctx, "numOutputRows") + // fast stop if isOriginalInputEmpty = true, should accept all rows in streamedSide + if (broadcastRelation.value.isOriginalInputEmpty) { + return s""" + |// Anti Join isOriginalInputEmpty(true) accept all + |$numOutput.add(1); + |${consume(ctx, input)} + """.stripMargin + } + + if (isNullAwareAntiJoin) { + if (broadcastRelation.value.allNullColumnKeyExistsInOriginalInput) { + return s""" + |// NAAJ Review comment: It would be awesome to verify using org.apache.spark.sql.execution.debug.DebugQuery#debugCodegen that this case does not generate an empty spin-loop -- i.e., a loop with no executable body. 
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ########## @@ -323,11 +374,18 @@ private[joins] object UnsafeHashedRelation { // Create a mapping of buildKeys -> rows val keyGenerator = UnsafeProjection.create(key) var numFields = 0 + val numKeys = key.length + val isOriginalInputEmpty = !input.hasNext + var allNullColumnKeyExistsInOriginalInput: Boolean = false while (input.hasNext) { val row = input.next().asInstanceOf[UnsafeRow] numFields = row.numFields() val key = keyGenerator(row) - if (!key.anyNull) { + if ((0 until numKeys).forall(key.isNullAt)) { Review comment: I think you should only do this check if isNullAware is true. Why pay the perf penalty of checking the row again otherwise? ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ########## @@ -323,11 +374,18 @@ private[joins] object UnsafeHashedRelation { // Create a mapping of buildKeys -> rows val keyGenerator = UnsafeProjection.create(key) var numFields = 0 + val numKeys = key.length + val isOriginalInputEmpty = !input.hasNext + var allNullColumnKeyExistsInOriginalInput: Boolean = false while (input.hasNext) { val row = input.next().asInstanceOf[UnsafeRow] numFields = row.numFields() val key = keyGenerator(row) - if (!key.anyNull) { + if ((0 until numKeys).forall(key.isNullAt)) { + allNullColumnKeyExistsInOriginalInput = true + } + + if (isNullAware || (!isNullAware && !key.anyNull)) { Review comment: Can we simplify this to: `isNullAware || !key.anyNull`? 
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ########## @@ -896,22 +967,32 @@ private[joins] object LongHashedRelation { // Create a mapping of key -> rows var numFields = 0 + val isOriginalInputEmpty: Boolean = !input.hasNext + var allNullColumnKeyExistsInOriginalInput: Boolean = false while (input.hasNext) { val unsafeRow = input.next().asInstanceOf[UnsafeRow] numFields = unsafeRow.numFields() val rowKey = keyGenerator(unsafeRow) if (!rowKey.isNullAt(0)) { + // LongToUnsafeRowMap can't insert null key val key = rowKey.getLong(0) map.append(key, unsafeRow) + } else { + // LongHashedRelation is single-column key Review comment: LongHashedRelation only stores single column keys. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org