rangadi commented on code in PR #44323: URL: https://github.com/apache/spark/pull/44323#discussion_r1513647366
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala: ########## @@ -198,11 +198,15 @@ object StreamingSymmetricHashJoinHelper extends Logging { val joinKeyOrdinalForWatermark: Option[Int] = findJoinKeyOrdinalForWatermark( leftKeys, rightKeys) + // Returns a predicate that drops data less than the state watermark. + // oneSideInputAttributes are the attributes to base the state watermark off of, while + // otherSideInputAttributes are the attributes on which the watermark is defined. def getOneSideStateWatermarkPredicate( oneSideInputAttributes: Seq[Attribute], oneSideJoinKeys: Seq[Expression], otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { - val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val watermarkAttribute = otherSideInputAttributes.find(_.metadata.contains(delayKey)) Review Comment: Is this val used outside the next line? ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala: ########## @@ -218,11 +222,27 @@ object StreamingSymmetricHashJoinHelper extends Logging { attributesToFindStateWatermarkFor = AttributeSet(oneSideInputAttributes), attributesWithEventWatermark = AttributeSet(otherSideInputAttributes), condition, - eventTimeWatermarkForEviction) - val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) - val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) - expr.map(JoinStateValueWatermarkPredicate.apply _) + eventTimeWatermarkForEviction + ) Review Comment: spurious change? 
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala: ########## @@ -218,11 +222,27 @@ object StreamingSymmetricHashJoinHelper extends Logging { attributesToFindStateWatermarkFor = AttributeSet(oneSideInputAttributes), attributesWithEventWatermark = AttributeSet(otherSideInputAttributes), condition, - eventTimeWatermarkForEviction) - val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) - val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) - expr.map(JoinStateValueWatermarkPredicate.apply _) + eventTimeWatermarkForEviction + ) + + val attributesInCondition = AttributeSet( + condition.get.collect { case a: AttributeReference => a } Review Comment: Add a comment. What are we doing here? I don't know why it is the right thing to do. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala: ########## @@ -198,31 +198,50 @@ object StreamingSymmetricHashJoinHelper extends Logging { val joinKeyOrdinalForWatermark: Option[Int] = findJoinKeyOrdinalForWatermark( leftKeys, rightKeys) + // Returns a predicate that drops data less than the state watermark. def getOneSideStateWatermarkPredicate( - oneSideInputAttributes: Seq[Attribute], - oneSideJoinKeys: Seq[Expression], - otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { - val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + stateRemovalSideAttributes: Seq[Attribute], Review Comment: It is clear, right? We need to use the other side's watermark to make a decision on this side's state cleanup. Is my understanding correct? 
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala: ########## @@ -218,11 +222,27 @@ object StreamingSymmetricHashJoinHelper extends Logging { attributesToFindStateWatermarkFor = AttributeSet(oneSideInputAttributes), attributesWithEventWatermark = AttributeSet(otherSideInputAttributes), condition, - eventTimeWatermarkForEviction) - val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) - val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) - expr.map(JoinStateValueWatermarkPredicate.apply _) + eventTimeWatermarkForEviction + ) + + val attributesInCondition = AttributeSet( + condition.get.collect { case a: AttributeReference => a } Review Comment: Is every `Attribute` an `AttributeReference`? (I don't know much about this class hierarchy) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org