HeartSaVioR commented on a change in pull request #30076: URL: https://github.com/apache/spark/pull/30076#discussion_r507713923
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ########## @@ -151,7 +151,7 @@ case class StreamingSymmetricHashJoinExec( stateWatermarkPredicates = JoinStateWatermarkPredicates(), stateFormatVersion, left, right) } - if (stateFormatVersion < 2 && joinType != Inner) { + if (stateFormatVersion < 2 && (joinType == LeftOuter || joinType == RightOuter)) { Review comment: I see you're only considering state format version 2 when explaining how to implement left semi join. If that's the case, this if condition should still apply to left semi join as well, so that left semi join fails with state format version 1. Otherwise, please describe how you handle left semi join with state format version 1. ########## File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ########## @@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite { _.join(_, joinType = LeftSemi), streamStreamSupported = false, batchStreamSupported = false, - expectedMsg = "left semi/anti joins") + expectedMsg = "LeftSemi join") + + // Left semi joins: update and complete mode not allowed + assertNotSupportedInStreamingPlan( + s"left semi join with stream-stream relations and update mode", + streamRelation.join(streamRelation, joinType = LeftSemi, + condition = Some(attribute === attribute)), + OutputMode.Update(), + Seq("is not supported in Update output mode")) + assertNotSupportedInStreamingPlan( + s"left semi join with stream-stream relations and complete mode", + Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftSemi, + condition = Some(attribute === attribute))), + OutputMode.Complete(), + Seq("is not supported in Complete output mode")) + + // Left ousemiter joins: stream-stream allowed with join on watermark attribute Review comment: nit: ousemiter -> semi ########## File path: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ########## @@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite { _.join(_, joinType = LeftSemi), streamStreamSupported = false, batchStreamSupported = false, - expectedMsg = "left semi/anti joins") + expectedMsg = "LeftSemi join") + + // Left semi joins: update and complete mode not allowed Review comment: I see the code for left outer join, right outer join, and left semi join is very similar, so it would be good to deduplicate it, but let's consider that optional in this PR. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ########## @@ -330,11 +338,17 @@ case class StreamingSymmetricHashJoinExec( } }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value)) - innerOutputIter ++ outerOutputIter + hashJoinOutputIter ++ outerOutputIter + case LeftSemi => Review comment: This could be consolidated with Inner. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ########## @@ -347,24 +361,29 @@ case class StreamingSymmetricHashJoinExec( // Processing time between inner output completion and here comes from the outer portion of a // join, and thus counts as removal time as we remove old state from one side while iterating. - if (innerOutputCompletionTimeNs != 0) { + if (hashJoinOutputCompletionTimeNs != 0) { allRemovalsTimeMs += - math.max(NANOSECONDS.toMillis(System.nanoTime - innerOutputCompletionTimeNs), 0) + math.max(NANOSECONDS.toMillis(System.nanoTime - hashJoinOutputCompletionTimeNs), 0) } allRemovalsTimeMs += timeTakenMs { // Remove any remaining state rows which aren't needed because they're below the watermark. // // For inner joins, we have to remove unnecessary state rows from both sides if possible.
+ // // For outer joins, we have already removed unnecessary state rows from the outer side // (e.g., left side for left outer join) while generating the outer "null" outputs. Now, we // have to remove unnecessary state rows from the other side (e.g., right side for the left // outer join) if possible. In all cases, nothing needs to be outputted, hence the removal // needs to be done greedily by immediately consuming the returned iterator. + // + // For left semi joins, we have to remove unnecessary state rows from both sides if + // possible. val cleanupIter = joinType match { case Inner => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState() case LeftOuter => rightSideJoiner.removeOldState() case RightOuter => leftSideJoiner.removeOldState() + case LeftSemi => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState() Review comment: Both case statement and code comment could be consolidated with `Inner`. ########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ########## @@ -99,15 +99,22 @@ class SymmetricHashJoinStateManager( /** * Get all the matched values for given join condition, with marking matched. * This method is designed to mark joined rows properly without exposing internal index of row. + * + * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row. + * This is used for right side of left semi join in + * [[StreamingSymmetricHashJoinExec]] only. 
*/ def getJoinedRows( key: UnsafeRow, generateJoinedRow: InternalRow => JoinedRow, - predicate: JoinedRow => Boolean): Iterator[JoinedRow] = { + predicate: JoinedRow => Boolean, + joinOnlyFirstTimeMatchedRow: Boolean = false): Iterator[JoinedRow] = { val numValues = keyToNumValues.get(key) keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue => val joinedRow = generateJoinedRow(keyIdxToValue.value) - if (predicate(joinedRow)) { + if (joinOnlyFirstTimeMatchedRow && keyIdxToValue.matched) { Review comment: Unless I'm missing something, this can be applied as a filter on `keyWithIndexToValue.getAll(key, numValues)` to avoid unnecessary calls to `generateJoinedRow`, like: ``` keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue => joinOnlyFirstTimeMatchedRow && keyIdxToValue.matched }.map { keyIdxToValue => val joinedRow = generateJoinedRow(keyIdxToValue.value) if (predicate(joinedRow)) { if (!keyIdxToValue.matched) { keyWithIndexToValue.put(key, keyIdxToValue.valueIndex, keyIdxToValue.value, matched = true) } joinedRow } else { null } }.filter(_ != null) ``` I'm OK with keeping the check inside `map` if anyone is concerned about adding another filter. The point is that there's no need to call `generateJoinedRow` when that condition holds. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org