Hi, I extended org.apache.spark.streaming.TestSuiteBase for some testing, and I was able to run this test fine:
test("Sliding window join with 3 second window duration") { val input1 = Seq( Seq("req1"), Seq("req2", "req3"), Seq(), Seq("req4", "req5", "req6"), Seq("req7"), Seq(), Seq() ) val input2 = Seq( Seq(("tx1", "req1")), Seq(), Seq(("tx2", "req3")), Seq(("tx3", "req2")), Seq(), Seq(("tx4", "req7")), Seq(("tx5", "req5"), ("tx6", "req4")) ) val expectedOutput = Seq( Seq(("req1", (1, "tx1"))), Seq(), Seq(("req3", (1, "tx2"))), Seq(("req2", (1, "tx3"))), Seq(), Seq(("req7", (1, "tx4"))), Seq() ) val operation = (rq: DStream[String], tx: DStream[(String,String)]) => { rq.window(Seconds(3), Seconds(1)).map(x => (x, 1)).join(tx.map{ case (k, v) => (v, k)}) } testOperation(input1, input2, operation, expectedOutput, useSet=true) } However, this seemingly OK looking test fails with operation timeout: test("Sliding window join with 3 second window duration + a tumbling window") { val input1 = Seq( Seq("req1"), Seq("req2", "req3"), Seq(), Seq("req4", "req5", "req6"), Seq("req7"), Seq() ) val input2 = Seq( Seq(("tx1", "req1")), Seq(), Seq(("tx2", "req3")), Seq(("tx3", "req2")), Seq(), Seq(("tx4", "req7")) ) val expectedOutput = Seq( Seq(("req1", (1, "tx1"))), Seq(("req2", (1, "tx3")), ("req3", (1, "tx3"))), Seq(("req7", (1, "tx4"))) ) val operation = (rq: DStream[String], tx: DStream[(String,String)]) => { rq.window(Seconds(3), Seconds(2)).map(x => (x, 1)).join(tx.window(Seconds(2), Seconds(2)).map{ case (k, v) => (v, k)}) } testOperation(input1, input2, operation, expectedOutput, useSet=true) } Stacktrace: 10033 was not less than 10000 Operation timed out after 10033 ms org.scalatest.exceptions.TestFailedException: 10033 was not less than 10000 Operation timed out after 10033 ms at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500) at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555) at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466) at 
org.apache.spark.streaming.TestSuiteBase$class.runStreamsWithPartitions(TestSuiteBase.scala:338) Does anybody know why this could be?