I have a unit test for Spark Streaming that takes one input parameter:
- DStream[String]
Inside the code I want to update a LongAccumulator. When I run the
test I get a NullPointerException because the accumulator doesn't exist.
Is there any way to test this?
My accumulator is updated in several different methods.
def execute(stream: DStream[String]): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreach { r =>
      if (r == "A") {
        // acc is the LongAccumulator; this is where the test fails
        acc.add(1)
        sendKafka(...)
      }
    }
  }
}
Is it possible to test this kind of method?
runAction[String](input, service.execute)
When the code tries to update the accumulator, it fails because the
accumulator has not been initialized. I could add a new parameter to the
execute method, which would be fine, but runAction doesn't accept
additional parameters either.
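
To make the idea of the extra parameter concrete, here is a minimal sketch of how a two-parameter execute could still be handed to a helper that only accepts a one-argument function, by wrapping it in a closure. Everything in it is a stand-in, not the real API: Seq[String] plays the DStream[String], AtomicLong plays the LongAccumulator, and this runAction is a hypothetical simplification of the helper used in my test.

```scala
import java.util.concurrent.atomic.AtomicLong

object AccumulatorParamSketch {
  // Stand-ins (assumptions): Seq[String] plays the DStream[String],
  // AtomicLong plays the LongAccumulator.
  type FakeStream = Seq[String]

  // Hypothetical two-parameter variant of execute: the counter is passed in
  // instead of being read from a field that may be null in the test.
  def execute(stream: FakeStream, acc: AtomicLong): Unit =
    stream.foreach { r =>
      if (r == "A") acc.incrementAndGet()
    }

  // Hypothetical stand-in for a test helper that accepts only a
  // one-argument function, like the runAction call in the question.
  def runAction[T](input: Seq[T], action: Seq[T] => Unit): Unit =
    action(input)

  def main(args: Array[String]): Unit = {
    val acc = new AtomicLong(0)
    // The closure fixes the second argument, leaving a one-argument function.
    runAction[String](Seq("A", "B", "A"), s => execute(s, acc))
    println(acc.get()) // prints 2
  }
}
```

If this shape carries over to the real test, the call would presumably look like runAction[String](input, s => service.execute(s, acc)), with acc created from the test's SparkContext (for example via its longAccumulator method), but I have not verified that against the real helper.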