I have a unitTest in SparkStreaming which has an input parameters.
-DStream[String]

Inside of the code I want to update an LongAccumulator. When I execute the
test I get an NullPointerException because the accumulator doesn't exist.
Is there any way to test this?

My accumulator is updated in different methods.

    def execute(stream: DStream[String]): Unit = {
        stream.foreachRDD { rdd =>
          rdd.foreach { r =>
            if (r == "A"){
              acc.add(1)
              sendKafka(...)
            }`enter code here`
        }
    }

It's possible to test this kind of method?

    runAction[String](input, service.execute)

When it try to update the accumulator it doesn't work because it doesn't
have inited. I could add a new parameter to the execute method, and it's
okay, but runAction doesn't admint more parameters either.

Reply via email to