Hi All,

I'm working on a simple Structured Streaming query that
uses flatMapGroupsWithState to maintain a relatively large state.
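For concreteness, here is a minimal sketch of the kind of query I mean. All the names (updateBuffer, the event key) are hypothetical; the point is just that each key accumulates an ever-growing buffer in GroupState:

```scala
// Pure state-update logic, kept separate from the Spark wiring so it is
// easy to reason about: merge a key's new events into its existing buffer.
def updateBuffer(existing: List[String], incoming: Iterator[String]): List[String] =
  existing ++ incoming

// In the streaming query this would be wired up roughly as:
//
//   events
//     .groupByKey(_.key)
//     .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.NoTimeout) {
//       (key: String, values: Iterator[String], state: GroupState[List[String]]) =>
//         val merged = updateBuffer(state.getOption.getOrElse(Nil), values)
//         state.update(merged)   // this is what grows the state store over time
//         Iterator.empty
//     }
```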

After running the application for a few minutes on my local machine, it
starts to slow down and then crashes with an OutOfMemoryError.

Tracking the code led me to HDFSBackedStateStoreProvider. It seems the
provider keeps the data for all versions of all of its StateStores in a
ConcurrentHashMap (loadedMaps) and never frees it unless the provider
itself is closed.

While I can see the performance advantage when the state is small, this
approach does not seem to scale to other use cases. It might make sense
to load into memory only the StateStores needed by the active tasks and
unload them when a task is done. That way the user could divide the state
into a larger number of partitions to make it fit in memory.
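As I understand it (please correct me if this is wrong), the number of state partitions follows spark.sql.shuffle.partitions and is fixed when the query first starts from a fresh checkpoint, so raising it would spread the state over more, smaller stores:

```scala
// Assumption: set before the streaming query is started for the first time;
// the value is baked into the checkpoint and cannot be changed afterwards.
// "spark" is the active SparkSession.
spark.conf.set("spark.sql.shuffle.partitions", "400")
```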

I was able to get the application running without memory problems by
adding code to HDFSBackedStateStore.commit() that clears loadedMaps and
its contents, but I'm fairly sure this introduces concurrency bugs.

I'm not sure whether I'm missing something, or whether there is a way to
configure the memory behavior of HDFSBackedStateStoreProvider.

Thanks,
Ahmed