GitHub user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19327#discussion_r141986696
  
    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ---
    @@ -413,36 +414,103 @@ class UnsupportedOperationsSuite extends 
SparkFunSuite {
         batchStreamSupported = false,
         streamBatchSupported = false)
     
    -  // Left outer joins: *-stream not allowed
    +  // Left outer joins: *-stream not allowed with default condition
       testBinaryOperationInStreamingPlan(
         "left outer join",
         _.join(_, joinType = LeftOuter),
    -    streamStreamSupported = false,
         batchStreamSupported = false,
    -    expectedMsg = "left outer/semi/anti joins")
    +    streamStreamSupported = false,
    +    expectedMsg = "outer join")
    +
    +  // Left outer joins: stream-stream allowed with join on watermark 
attribute
    +  assertSupportedInStreamingPlan(
    +    s"left outer join with stream-stream relations and join on watermark 
attribute key",
    +    streamRelation.join(streamRelation, joinType = LeftOuter,
    +      condition = Some(attributeWithWatermark === attributeWithWatermark)),
    +    OutputMode.Append())
    +
    +  // Left outer joins: stream-stream allowed with range condition yielding 
state value watermark
    +  assertSupportedInStreamingPlan(
    +    s"left outer join with stream-stream relations and state value 
watermark", {
    +      val firstRelationWithWatermark = new 
TestStreamingRelation(attributeWithWatermark)
    +      val secondAttribute = AttributeReference("b", 
IntegerType)().withMetadata(watermarkMetadata)
    +      firstRelationWithWatermark.join(
    +        new TestStreamingRelation(secondAttribute),
    +        joinType = LeftOuter,
    +        condition = Some(secondAttribute < attributeWithWatermark + 10 &&
    +          secondAttribute > attributeWithWatermark - 10))
    +    },
    +    OutputMode.Append())
    +
    +  // Left outer joins: stream-stream not allowed with insufficient range 
condition
    +  assertNotSupportedInStreamingPlan(
    +    s"left outer join with stream-stream relations and state value 
watermark", {
    +      val firstRelationWithWatermark = new 
TestStreamingRelation(attributeWithWatermark)
    +      val secondAttribute = AttributeReference("b", 
IntegerType)().withMetadata(watermarkMetadata)
    +      firstRelationWithWatermark.join(
    +        new TestStreamingRelation(secondAttribute),
    +        joinType = LeftOuter,
    +        condition = Some(secondAttribute > attributeWithWatermark - 10))
    +    },
    +    OutputMode.Append(),
    +    Seq("appropriate range condition"))
     
       // Left semi joins: stream-* not allowed
       testBinaryOperationInStreamingPlan(
         "left semi join",
         _.join(_, joinType = LeftSemi),
         streamStreamSupported = false,
         batchStreamSupported = false,
    -    expectedMsg = "left outer/semi/anti joins")
    +    expectedMsg = "left semi/anti joins")
     
       // Left anti joins: stream-* not allowed
       testBinaryOperationInStreamingPlan(
         "left anti join",
         _.join(_, joinType = LeftAnti),
         streamStreamSupported = false,
         batchStreamSupported = false,
    -    expectedMsg = "left outer/semi/anti joins")
    +    expectedMsg = "left semi/anti joins")
     
    -  // Right outer joins: stream-* not allowed
    +  // Right outer joins: stream-* not allowed with default condition
       testBinaryOperationInStreamingPlan(
         "right outer join",
         _.join(_, joinType = RightOuter),
    +    streamBatchSupported = false,
         streamStreamSupported = false,
    -    streamBatchSupported = false)
    +    expectedMsg = "outer join")
    +
    +  // Right outer joins: stream-stream allowed with join on watermark 
attribute
    +  assertSupportedInStreamingPlan(
    +    s"right outer join with stream-stream relations and join on watermark 
attribute key",
    +    streamRelation.join(streamRelation, joinType = RightOuter,
    +      condition = Some(attributeWithWatermark === attributeWithWatermark)),
    +    OutputMode.Append())
    +
    +  // Right outer joins: stream-stream allowed with range condition 
yielding state value watermark
    +  assertSupportedInStreamingPlan(
    +    s"right outer join with stream-stream relations and state value 
watermark", {
    +      val firstRelationWithWatermark = new 
TestStreamingRelation(attributeWithWatermark)
    +      val secondAttribute = AttributeReference("b", 
IntegerType)().withMetadata(watermarkMetadata)
    +      firstRelationWithWatermark.join(
    +        new TestStreamingRelation(secondAttribute),
    +        joinType = RightOuter,
    +        condition = Some(secondAttribute < attributeWithWatermark + 10 &&
    --- End diff --
    
    Similar comment to the one I gave for the left outer join.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to