Hi,

I extended org.apache.spark.streaming.TestSuiteBase for some testing, and I
was able to run this test fine:

test("Sliding window join with 3 second window duration") {

  val input1 =
    Seq(
      Seq("req1"),
      Seq("req2", "req3"),
      Seq(),
      Seq("req4", "req5", "req6"),
      Seq("req7"),
      Seq(),
      Seq()
    )

  val input2 =
    Seq(
      Seq(("tx1", "req1")),
      Seq(),
      Seq(("tx2", "req3")),
      Seq(("tx3", "req2")),
      Seq(),
      Seq(("tx4", "req7")),
      Seq(("tx5", "req5"), ("tx6", "req4"))
    )

  val expectedOutput = Seq(
    Seq(("req1", (1, "tx1"))),
    Seq(),
    Seq(("req3", (1, "tx2"))),
    Seq(("req2", (1, "tx3"))),
    Seq(),
    Seq(("req7", (1, "tx4"))),
    Seq()
  )
  val operation = (rq: DStream[String], tx: DStream[(String,String)]) => {
    rq.window(Seconds(3), Seconds(1)).map(x => (x, 1)).join(tx.map{ case
(k, v) => (v, k)})
  }
  testOperation(input1, input2, operation, expectedOutput, useSet=true)
}

However, this seemingly OK looking test fails with operation timeout:

test("Sliding window join with 3 second window duration + a tumbling
window") {

  val input1 =
    Seq(
      Seq("req1"),
      Seq("req2", "req3"),
      Seq(),
      Seq("req4", "req5", "req6"),
      Seq("req7"),
      Seq()
    )

  val input2 =
    Seq(
      Seq(("tx1", "req1")),
      Seq(),
      Seq(("tx2", "req3")),
      Seq(("tx3", "req2")),
      Seq(),
      Seq(("tx4", "req7"))
    )

  val expectedOutput = Seq(
    Seq(("req1", (1, "tx1"))),
    Seq(("req2", (1, "tx3")), ("req3", (1, "tx3"))),
    Seq(("req7", (1, "tx4")))
  )
  val operation = (rq: DStream[String], tx: DStream[(String,String)]) => {
    rq.window(Seconds(3), Seconds(2)).map(x => (x,
1)).join(tx.window(Seconds(2), Seconds(2)).map{ case (k, v) => (v, k)})
  }
  testOperation(input1, input2, operation, expectedOutput, useSet=true)
}

Stacktrace:
10033 was not less than 10000 Operation timed out after 10033 ms
org.scalatest.exceptions.TestFailedException: 10033 was not less than 10000
Operation timed out after 10033 ms
at
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:500)
at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1555)
at
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:466)
at
org.apache.spark.streaming.TestSuiteBase$class.runStreamsWithPartitions(TestSuiteBase.scala:338)

Does anybody know why this could be?
ᐧ

Reply via email to