rangadi commented on code in PR #44323:
URL: https://github.com/apache/spark/pull/44323#discussion_r1513647366


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##########
@@ -198,11 +198,15 @@ object StreamingSymmetricHashJoinHelper extends Logging {
     val joinKeyOrdinalForWatermark: Option[Int] = 
findJoinKeyOrdinalForWatermark(
       leftKeys, rightKeys)
 
+    // Returns a predicate that drops data less than the state watermark.
+    // oneSideInputAttributes are the attributes to base the state watermark 
off of, while
+    // otherSideInputAttributes are the attributes on which the watermark is 
defined.
     def getOneSideStateWatermarkPredicate(
         oneSideInputAttributes: Seq[Attribute],
         oneSideJoinKeys: Seq[Expression],
         otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
-      val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+      val watermarkAttribute = 
otherSideInputAttributes.find(_.metadata.contains(delayKey))

Review Comment:
   Is this val used anywhere other than the next line?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##########
@@ -218,11 +222,27 @@ object StreamingSymmetricHashJoinHelper extends Logging {
           attributesToFindStateWatermarkFor = 
AttributeSet(oneSideInputAttributes),
           attributesWithEventWatermark = 
AttributeSet(otherSideInputAttributes),
           condition,
-          eventTimeWatermarkForEviction)
-        val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
-        val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
-        expr.map(JoinStateValueWatermarkPredicate.apply _)
+          eventTimeWatermarkForEviction
+        )

Review Comment:
   spurious change?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##########
@@ -218,11 +222,27 @@ object StreamingSymmetricHashJoinHelper extends Logging {
           attributesToFindStateWatermarkFor = 
AttributeSet(oneSideInputAttributes),
           attributesWithEventWatermark = 
AttributeSet(otherSideInputAttributes),
           condition,
-          eventTimeWatermarkForEviction)
-        val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
-        val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
-        expr.map(JoinStateValueWatermarkPredicate.apply _)
+          eventTimeWatermarkForEviction
+        )
+
+        val attributesInCondition = AttributeSet(
+          condition.get.collect { case a: AttributeReference => a }

Review Comment:
   Add a comment. What are we doing here? I don't know why it is the right 
thing to do.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##########
@@ -198,31 +198,50 @@ object StreamingSymmetricHashJoinHelper extends Logging {
     val joinKeyOrdinalForWatermark: Option[Int] = 
findJoinKeyOrdinalForWatermark(
       leftKeys, rightKeys)
 
+    // Returns a predicate that drops data less than the state watermark.
     def getOneSideStateWatermarkPredicate(
-        oneSideInputAttributes: Seq[Attribute],
-        oneSideJoinKeys: Seq[Expression],
-        otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
-      val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+        stateRemovalSideAttributes: Seq[Attribute],

Review Comment:
   It is clear, right? We need to use the other side's watermark to make a 
decision on this side's state cleanup. Is my understanding correct?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala:
##########
@@ -218,11 +222,27 @@ object StreamingSymmetricHashJoinHelper extends Logging {
           attributesToFindStateWatermarkFor = 
AttributeSet(oneSideInputAttributes),
           attributesWithEventWatermark = 
AttributeSet(otherSideInputAttributes),
           condition,
-          eventTimeWatermarkForEviction)
-        val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
-        val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
-        expr.map(JoinStateValueWatermarkPredicate.apply _)
+          eventTimeWatermarkForEviction
+        )
+
+        val attributesInCondition = AttributeSet(
+          condition.get.collect { case a: AttributeReference => a }

Review Comment:
   Is every `Attribute` an `AttributeReference`? (I don't know much about this 
class hierarchy)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to