Hi all,

I'm working on a simple Structured Streaming query that uses flatMapGroupsWithState to maintain a relatively large state.
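For context, the query is shaped roughly like the sketch below. This is a minimal, hypothetical reconstruction, not my actual job: the event/state case classes, the socket source, and the updateAcrossEvents name are placeholders; only the flatMapGroupsWithState call pattern matters here.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Placeholder event and state types for illustration only.
case class Event(userId: String, value: Long)
case class UserState(userId: String, total: Long)

object StateDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("StateDemo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical source; my real input is different.
    val events = spark.readStream
      .format("socket").option("host", "localhost").option("port", 9999)
      .load()
      .as[String]
      .map { line =>
        val Array(u, v) = line.split(",")
        Event(u, v.toLong)
      }

    // Per-key state update function: the running state for each key is
    // persisted between micro-batches by the state store provider.
    def updateAcrossEvents(
        userId: String,
        batch: Iterator[Event],
        state: GroupState[UserState]): Iterator[UserState] = {
      val current = state.getOption.getOrElse(UserState(userId, 0L))
      val updated = current.copy(total = current.total + batch.map(_.value).sum)
      state.update(updated)
      Iterator(updated)
    }

    val query = events
      .groupByKey(_.userId)
      .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(updateAcrossEvents)
      .writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```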
After running the application for a few minutes on my local machine, it starts to slow down and then crashes with an OutOfMemoryError. Tracing through the code led me to HDFSBackedStateStoreProvider. The provider appears to keep the data for all versions of all of its StateStores in a ConcurrentHashMap (loadedMaps) and never frees it unless the provider itself is closed. I can see the performance advantage of this when the state is small, but for other use cases the approach does not seem to scale. It might make sense to load into memory only the StateStores needed by the active tasks and unload them when each task is done; users could then split the state across a larger number of partitions to make it fit in memory.

I was able to get the application running without memory problems by adding code to HDFSBackedStateStore.commit() that clears loadedMaps and its contents, but I'm fairly sure this introduces concurrency bugs.

Am I missing something, or is there a way to configure how HDFSBackedStateStoreProvider allocates memory?

Thanks,
Ahmed