I'm getting an interesting NullPointerException when trying to add any
value to a custom accumulator.


*code*:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.util.CollectionAccumulator

object StagingFacade {

    // Accumulator declared
    var filesAccumulator: CollectionAccumulator[(String, String, String)] = _

    def apply(appArgs: Array[String], spark: SparkSession): Unit = {
        // Accumulator is set
        filesAccumulator = spark.sparkContext.collectionAccumulator("filesAccumulator")

        streamDF
            .writeStream
            .foreachBatch({ (batchDf: DataFrame, batchId: Long) =>
                filesAccumulator.reset()

                batchDf
                    .map(x => my_UDF_function(filesAccumulator.add(some_value)))
                    .show()
            })
..........


Why does it throw a NullPointerException at
*filesAccumulator.add*?
I tried changing the *object* to a *case object* as well, but to no avail.

Please help!

Thanks,
Abhimanyu
