Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r142338734 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala --- @@ -470,3 +475,283 @@ class StreamingJoinSuite extends StreamTest with StateStoreMetricsTest with Befo } } } + +class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter { + + import testImplicits._ + import org.apache.spark.sql.functions._ + + before { + SparkSession.setActiveSession(spark) // set this before force initializing 'joinExec' + spark.streams.stateStoreCoordinator // initialize the lazy coordinator + } + + after { + StateStore.stop() + } + + private def setupStream(prefix: String, multiplier: Int): (MemoryStream[Int], DataFrame) = { + val input = MemoryStream[Int] + + val df = input.toDF + .select( + 'value as "key", + 'value.cast("timestamp") as s"${prefix}Time", + ('value * multiplier) as s"${prefix}Value") + .withWatermark(s"${prefix}Time", "10 seconds") + + return (input, df) + } + + private def setupWindowedJoin(joinType: String) = { + val (input1, df1) = setupStream("left", 2) + val (input2, df2) = setupStream("right", 3) + + val windowed1 = df1.select('key, window('leftTime, "10 second"), 'leftValue) + + val windowed2 = df2.select('key, window('rightTime, "10 second"), 'rightValue) + + val joined = windowed1.join(windowed2, Seq("key", "window"), joinType) + .select('key, $"window.end".cast("long"), 'leftValue, 'rightValue) + + (input1, input2, joined) + } + + test("left stream batch outer join") { + val stream = MemoryStream[Int] + .toDF() + .withColumn("timestamp", 'value.cast("timestamp")) + .withWatermark("timestamp", "1 second") + val joined = + stream.join(Seq(1).toDF(), Seq("value"), "left_outer") + + // This test is in the suite just to confirm the validations below don't block this valid join. + // We don't need to check results, just that the join can happen. 
+ testStream(joined)() + } + + test("left batch stream outer join") { + val stream = MemoryStream[Int] + .toDF() + .withColumn("timestamp", 'value.cast("timestamp")) + .withWatermark("timestamp", "1 second") + val joined = + Seq(1).toDF().join(stream, Seq("value"), "left_outer") + + val thrown = intercept[AnalysisException] { + testStream(joined)() + } + + assert(thrown.getMessage.contains( + "Left outer join with a streaming DataFrame/Dataset on the right and a static")) + } + + test("right stream batch outer join") { --- End diff -- where is "right batch stream outer join"?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org