Mark Lidenberg created FLINK-34631:
--------------------------------------

             Summary: Memory leak in pyflink when using state with RocksDB
                 Key: FLINK-34631
                 URL: https://issues.apache.org/jira/browse/FLINK-34631
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.18.1
            Reporter: Mark Lidenberg
I have been seeing memory grow steadily on pyflink task managers, which should not happen when RocksDB is the state backend. I've made a simple example to demonstrate the leak: for each key I update the state with a 1 MB value and then sleep for 1 second. Memory grows by 1 MB per second, as if the state value were retained in memory. The same thing happens if I send 100 messages per second at 10 kB each: memory keeps growing indefinitely. I've also tested `MapState`; the behavior is the same.

```python
import time

import psutil
from pyflink.common import Types
from pyflink.datastream import (
    EmbeddedRocksDBStateBackend,
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
)
from pyflink.datastream.state import ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(
            ValueStateDescriptor(
                name="my_state",
                value_type_info=Types.STRING(),
            )
        )

    def process_element(self, value: int, ctx: "KeyedProcessFunction.Context"):
        # Output: "Processing 1 Memory: 171.25 MB" -> "Processing 2 Memory: 172.12 MB" -> ...
        # Grows ~1 MB per second, which should not happen with RocksDB as the state backend.
        print(
            "Processing",
            value,
            "Memory:",
            round(psutil.Process().memory_info().rss / 1024 / 1024, 2),
            "MB",
        )
        self.state.update("a" * 1_000_000)  # 1 MB of data per second
        time.sleep(1.0)


if __name__ == "__main__":
    # - Create the Flink environment
    environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # - Make sure to use RocksDB as the state backend
    environment.set_state_backend(EmbeddedRocksDBStateBackend())

    # - Build the pipeline
    (
        environment.from_collection(
            collection=list(range(3600 * 12)),
        )
        .key_by(lambda value: value)
        .process(Processor())
    )

    # - Execute the pipeline
    environment.execute(job_name="memory_leak_test")
```

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
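As a side note on the symptom itself, here is a minimal pure-Python sketch (no Flink involved; the `peak_rss` helper and both loop patterns are illustrative, using the stdlib `resource` module as a stand-in for `psutil`) contrasting the two behaviors the numbers above suggest: overwriting a single value keeps memory roughly flat, while retaining a distinct 1 MB value per key grows it linearly, which matches the growth observed on the task manager.

```python
import resource


def peak_rss() -> int:
    # Peak resident set size so far (kB on Linux, bytes on macOS);
    # a stdlib stand-in for psutil.Process().memory_info().rss.
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss


store = {}

# Expected RocksDB-backed behavior: each update replaces the previous value,
# so old 1 MB strings become unreachable and are freed; memory stays roughly flat.
for _ in range(100):
    store["my_state"] = "a" * 1_000_000
flat = peak_rss()

# Observed leak pattern: a distinct 1 MB value is retained per key,
# so resident memory grows linearly with the number of updates.
for key in range(100):
    store[key] = "a" * 1_000_000
leaky = peak_rss()

print(f"flat-pattern peak: {flat}, leak-pattern peak: {leaky}")
```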