Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r141986696 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala --- @@ -413,36 +414,103 @@ class UnsupportedOperationsSuite extends SparkFunSuite { batchStreamSupported = false, streamBatchSupported = false) - // Left outer joins: *-stream not allowed + // Left outer joins: *-stream not allowed with default condition testBinaryOperationInStreamingPlan( "left outer join", _.join(_, joinType = LeftOuter), - streamStreamSupported = false, batchStreamSupported = false, - expectedMsg = "left outer/semi/anti joins") + streamStreamSupported = false, + expectedMsg = "outer join") + + // Left outer joins: stream-stream allowed with join on watermark attribute + assertSupportedInStreamingPlan( + s"left outer join with stream-stream relations and join on watermark attribute key", + streamRelation.join(streamRelation, joinType = LeftOuter, + condition = Some(attributeWithWatermark === attributeWithWatermark)), + OutputMode.Append()) + + // Left outer joins: stream-stream allowed with range condition yielding state value watermark + assertSupportedInStreamingPlan( + s"left outer join with stream-stream relations and state value watermark", { + val firstRelationWithWatermark = new TestStreamingRelation(attributeWithWatermark) + val secondAttribute = AttributeReference("b", IntegerType)().withMetadata(watermarkMetadata) + firstRelationWithWatermark.join( + new TestStreamingRelation(secondAttribute), + joinType = LeftOuter, + condition = Some(secondAttribute < attributeWithWatermark + 10 && + secondAttribute > attributeWithWatermark - 10)) + }, + OutputMode.Append()) + + // Left outer joins: stream-stream not allowed with insufficient range condition + assertNotSupportedInStreamingPlan( + s"left outer join with stream-stream relations and state value watermark", { + val firstRelationWithWatermark = new TestStreamingRelation(attributeWithWatermark) + val secondAttribute = AttributeReference("b", IntegerType)().withMetadata(watermarkMetadata) + firstRelationWithWatermark.join( + new TestStreamingRelation(secondAttribute), + joinType = LeftOuter, + condition = Some(secondAttribute > attributeWithWatermark - 10)) + }, + OutputMode.Append(), + Seq("appropriate range condition")) // Left semi joins: stream-* not allowed testBinaryOperationInStreamingPlan( "left semi join", _.join(_, joinType = LeftSemi), streamStreamSupported = false, batchStreamSupported = false, - expectedMsg = "left outer/semi/anti joins") + expectedMsg = "left semi/anti joins") // Left anti joins: stream-* not allowed testBinaryOperationInStreamingPlan( "left anti join", _.join(_, joinType = LeftAnti), streamStreamSupported = false, batchStreamSupported = false, - expectedMsg = "left outer/semi/anti joins") + expectedMsg = "left semi/anti joins") - // Right outer joins: stream-* not allowed + // Right outer joins: stream-* not allowed with default condition testBinaryOperationInStreamingPlan( "right outer join", _.join(_, joinType = RightOuter), + streamBatchSupported = false, streamStreamSupported = false, - streamBatchSupported = false) + expectedMsg = "outer join") + + // Right outer joins: stream-stream allowed with join on watermark attribute + assertSupportedInStreamingPlan( + s"right outer join with stream-stream relations and join on watermark attribute key", + streamRelation.join(streamRelation, joinType = RightOuter, + condition = Some(attributeWithWatermark === attributeWithWatermark)), + OutputMode.Append()) + + // Right outer joins: stream-stream allowed with range condition yielding state value watermark + assertSupportedInStreamingPlan( + s"right outer join with stream-stream relations and state value watermark", { + val firstRelationWithWatermark = new TestStreamingRelation(attributeWithWatermark) + val secondAttribute = AttributeReference("b", IntegerType)().withMetadata(watermarkMetadata) + firstRelationWithWatermark.join( + new TestStreamingRelation(secondAttribute), + joinType = RightOuter, + condition = Some(secondAttribute < attributeWithWatermark + 10 && --- End diff -- similar comment as i gave for left outer join.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org