I tried to create a data source, but our use case is a bit tricky: we
only know the available offsets within the tasks, not on the driver. I
therefore planned to use accumulators in the InputPartitionReader, but
they do not seem to work.

An example of the accumulation is here:
https://github.com/kortemik/spark-source/blob/master/src/main/java/com/teragrep/pth06/ArchiveMicroBatchInputPartitionReader.java#L118
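Roughly, the pattern I am attempting looks like the sketch below
(simplified from the linked class; the class name and the way the
accumulator is passed in are only for illustration): a LongAccumulator
registered on the driver is captured through the serialized
InputPartition and add() is called from the reader running in the task.

import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.util.LongAccumulator;

public class RowCountingPartitionReader implements InputPartitionReader<InternalRow> {

    // Registered on the driver with sparkContext.longAccumulator(...) and
    // captured through the serialized InputPartition / reader factory.
    private final LongAccumulator rowCounter;

    public RowCountingPartitionReader(LongAccumulator rowCounter) {
        this.rowCounter = rowCounter;
    }

    @Override
    public boolean next() throws IOException {
        // ... advance the decompressed gzip stream here ...
        rowCounter.add(1L);                          // executed inside the task
        System.out.println("added to accumulator");  // this does show up in the task log
        return false;                                // placeholder: no real data in this sketch
    }

    @Override
    public InternalRow get() {
        return null; // placeholder
    }

    @Override
    public void close() throws IOException {
        // nothing to release in this sketch
    }
}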

The task logs show that the System.out.println() calls are executed, so
the flow itself cannot be broken, but the accumulators only seem to
work while on the driver, as can be seen from the logs in
https://github.com/kortemik/spark-source/tree/master

Is it intentional that accumulators do not work within a data source?

One might ask why all this, so here is a brief explanation. We use
gzipped files as the storage blobs, and it is unknown prior to
execution how many records they contain. This could of course be
mitigated by decompressing the files on the driver and then sending the
offsets through to the executors, but that is double the work. The aim
was to decompress them only once by doing a forward-lookup into the
data and using an accumulator to inform the driver either that there is
more data available for the next batch, or that the file is done and
the driver needs to pull the next one to keep the executors busy.
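
A rough sketch of the intended driver-side flow (the accumulator name
and the class are hypothetical; the real code is in the repository
linked above):

import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

public class OffsetPlannerSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("accumulator-offset-sketch")
                .getOrCreate();

        // Registered on the driver; the name only matters for the Spark UI.
        LongAccumulator remainingHint = spark.sparkContext().longAccumulator("remainingHint");

        // ... hand remainingHint to the reader factory and run a micro-batch;
        // the InputPartitionReader bumps it when its forward-lookup sees that
        // the current gzip blob still contains records ...

        // Between micro-batches the driver would decide what to read next.
        if (remainingHint.value() > 0L) {
            // the current blob has more records: schedule it again
        } else {
            // blob exhausted: pull the next blob so the executors stay busy
        }

        spark.stop();
    }
}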

Any advice is welcome.

Kind regards,
-Mikko Kortelainen
