HeartSaVioR commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r507713923



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -151,7 +151,7 @@ case class StreamingSymmetricHashJoinExec(
       stateWatermarkPredicates = JoinStateWatermarkPredicates(), stateFormatVersion, left, right)
   }
 
-  if (stateFormatVersion < 2 && joinType != Inner) {
+  if (stateFormatVersion < 2 && (joinType == LeftOuter || joinType == RightOuter)) {

Review comment:
       It looks like the left semi join implementation you describe only
considers state format version 2. If that's the case, this condition should
still apply to left semi, and the query should fail for it under version 1.
Otherwise, please describe how you handle left semi join with state format
version 1.
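
To illustrate the first option, here is a minimal sketch of a check that keeps
rejecting left semi under state format version 1. The join types below are
simplified stand-ins rather than Spark's real classes, and
`requiresNewerStateFormat` is a hypothetical helper name. It assumes left semi,
like the outer joins, depends on the per-row "matched" flag.

```scala
object StateFormatCheckSketch {
  // Simplified stand-ins for Spark's join types (illustration only).
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object LeftSemi extends JoinType

  // Hypothetical helper: true when the combination should be rejected.
  // Assumption: every non-inner join here needs the "matched" flag, so the
  // original `joinType != Inner` condition already covers LeftSemi.
  def requiresNewerStateFormat(joinType: JoinType, stateFormatVersion: Int): Boolean =
    stateFormatVersion < 2 && joinType != Inner
}
```

With this shape, `LeftSemi` with version 1 is rejected, while `Inner` with
version 1 and `LeftSemi` with version 2 are accepted.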

##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and complete mode",
+    Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute))),
+    OutputMode.Complete(),
+    Seq("is not supported in Complete output mode"))
+
+  // Left ousemiter joins: stream-stream allowed with join on watermark attribute

Review comment:
       nit: ousemiter -> semi

##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed

Review comment:
       The code for left outer, right outer, and left semi joins is very
similar and would be good to deduplicate, but let's consider that optional
for this PR.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -330,11 +338,17 @@ case class StreamingSymmetricHashJoinExec(
           }
         }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
 
-        innerOutputIter ++ outerOutputIter
+        hashJoinOutputIter ++ outerOutputIter
+      case LeftSemi =>

Review comment:
       This could be consolidated with Inner.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -347,24 +361,29 @@ case class StreamingSymmetricHashJoinExec(
 
       // Processing time between inner output completion and here comes from the outer portion of a
       // join, and thus counts as removal time as we remove old state from one side while iterating.
-      if (innerOutputCompletionTimeNs != 0) {
+      if (hashJoinOutputCompletionTimeNs != 0) {
         allRemovalsTimeMs +=
-          math.max(NANOSECONDS.toMillis(System.nanoTime - innerOutputCompletionTimeNs), 0)
+          math.max(NANOSECONDS.toMillis(System.nanoTime - hashJoinOutputCompletionTimeNs), 0)
       }
 
       allRemovalsTimeMs += timeTakenMs {
         // Remove any remaining state rows which aren't needed because they're below the watermark.
         //
         // For inner joins, we have to remove unnecessary state rows from both sides if possible.
+        //
         // For outer joins, we have already removed unnecessary state rows from the outer side
         // (e.g., left side for left outer join) while generating the outer "null" outputs. Now, we
         // have to remove unnecessary state rows from the other side (e.g., right side for the left
         // outer join) if possible. In all cases, nothing needs to be outputted, hence the removal
         // needs to be done greedily by immediately consuming the returned iterator.
+        //
+        // For left semi joins, we have to remove unnecessary state rows from both sides if
+        // possible.
         val cleanupIter = joinType match {
           case Inner => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
           case LeftOuter => rightSideJoiner.removeOldState()
           case RightOuter => leftSideJoiner.removeOldState()
+          case LeftSemi => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()

Review comment:
       Both the case statement and the code comment could be consolidated with `Inner`.
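
As a rough sketch of that consolidation, a Scala pattern alternative lets
`Inner` and `LeftSemi` share one branch. The `SideJoiner` class and its string
results are hypothetical stand-ins used only to make the example runnable;
they are not Spark's real joiner classes.

```scala
object CleanupSketch {
  // Simplified stand-ins for Spark's join types (illustration only).
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object LeftSemi extends JoinType

  // Hypothetical stand-in for one side of the join: removeOldState() simply
  // reports which side was cleaned instead of evicting real state rows.
  final case class SideJoiner(name: String) {
    def removeOldState(): Iterator[String] = Iterator(s"$name-evicted")
  }

  def cleanupIter(joinType: JoinType, left: SideJoiner, right: SideJoiner): Iterator[String] =
    joinType match {
      // Inner and left semi both clean up state on both sides.
      case Inner | LeftSemi => left.removeOldState() ++ right.removeOldState()
      // Outer joins already cleaned the outer side while emitting null outputs.
      case LeftOuter => right.removeOldState()
      case RightOuter => left.removeOldState()
    }
}
```

The same `case Inner | LeftSemi` form would let the two code comments be merged
into a single sentence covering both join types.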

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,15 +99,22 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.
+   *                                    This is used for right side of left semi join in
+   *                                    [[StreamingSymmetricHashJoinExec]] only.
    */
   def getJoinedRows(
       key: UnsafeRow,
       generateJoinedRow: InternalRow => JoinedRow,
-      predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+      predicate: JoinedRow => Boolean,
+      joinOnlyFirstTimeMatchedRow: Boolean = false): Iterator[JoinedRow] = {
     val numValues = keyToNumValues.get(key)
     keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
       val joinedRow = generateJoinedRow(keyIdxToValue.value)
-      if (predicate(joinedRow)) {
+      if (joinOnlyFirstTimeMatchedRow && keyIdxToValue.matched) {

Review comment:
       Unless I'm missing something, this check can be applied as a filter on
`keyWithIndexToValue.getAll(key, numValues)` so that `generateJoinedRow` isn't
called unnecessarily, like
   
   ```
   keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue =>
     joinOnlyFirstTimeMatchedRow && keyIdxToValue.matched
   }.map { keyIdxToValue =>
     val joinedRow = generateJoinedRow(keyIdxToValue.value)
     if (predicate(joinedRow)) {
       if (!keyIdxToValue.matched) {
         keyWithIndexToValue.put(key, keyIdxToValue.valueIndex, keyIdxToValue.value,
           matched = true)
       }
       joinedRow
     } else {
       null
     }
   }.filter(_ != null)
   ```
   
   I'm OK with keeping the check inside `map` if someone is concerned about
adding another filter. The point is that calling `generateJoinedRow` is
unnecessary when the condition is fulfilled.
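
To make the saving concrete, here is a small self-contained sketch that counts
how often the row generator runs. `StoredValue`, `joinRows`, and the
integer-based "joined row" are hypothetical simplifications, and the step that
marks rows as matched is left out; only the filter-before-map ordering mirrors
the suggestion above.

```scala
object FilterBeforeMapSketch {
  // Hypothetical stand-in for a stored value together with its matched flag.
  final case class StoredValue(value: Int, matched: Boolean)

  // Returns the joined results and the number of generateJoinedRow calls.
  def joinRows(
      values: Seq[StoredValue],
      joinOnlyFirstTimeMatchedRow: Boolean,
      predicate: Int => Boolean): (List[Int], Int) = {
    var calls = 0
    // Stand-in for the (potentially costly) joined-row construction.
    def generateJoinedRow(v: Int): Int = { calls += 1; v * 10 }

    val joined = values.iterator
      // Drop already-matched rows before any joined row is built.
      .filterNot(v => joinOnlyFirstTimeMatchedRow && v.matched)
      .map(v => generateJoinedRow(v.value))
      .filter(predicate)
      .toList // force the lazy iterator so `calls` is final
    (joined, calls)
  }
}
```

With one already-matched row out of three and the flag enabled, the generator
runs twice instead of three times, and the joined output for the unmatched rows
is unchanged.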



