Mark Lidenberg created FLINK-34631:
--------------------------------------
Summary: Memory leak in pyflink when using state with RocksDB
Key: FLINK-34631
URL: https://issues.apache.org/jira/browse/FLINK-34631
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.18.1
Reporter: Mark Lidenberg
I have been seeing memory grow continuously on pyflink task managers, which should not
happen when we use RocksDB as our state backend.
I've made a simple example to demonstrate the memory leak. In this example I
update the state with a 1 MB value for each key and then sleep for 1 second. Memory
grows by roughly 1 MB per second, as if the state value stayed in memory. The same thing
happens if I send 100 messages per second of 10 KB each: memory keeps growing
indefinitely. I've also tested `MapState` and it behaves the same (a minimal sketch of
that variant follows the example below).
```python
import time
import psutil

from pyflink.common import Types
from pyflink.datastream import (
    EmbeddedRocksDBStateBackend,
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
)
from pyflink.datastream.state import ValueStateDescriptor


class Processor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.state = runtime_context.get_state(
            ValueStateDescriptor(
                name="my_state",
                value_type_info=Types.STRING(),
            )
        )

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        print(
            "Processing", value,
            "Memory:", round(psutil.Process().memory_info().rss / 1024 / 1024, 2), "MB",
        )
        # Processing 1 Memory: 171.25 MB -> Processing 2 Memory: 172.12 MB -> ...
        # grows 1 MB per second, which should not happen because we use RocksDB as state backend
        self.state.update("a" * 1_000_000)  # 1 MB of data per second
        time.sleep(1.0)


if __name__ == "__main__":
    # - Create flink environment
    environment = StreamExecutionEnvironment.get_execution_environment().set_parallelism(1)

    # - Make sure to use RocksDB as state backend
    environment.set_state_backend(EmbeddedRocksDBStateBackend())

    # - Create pipeline
    (
        environment.from_collection(
            collection=list(range(3600 * 12)),
        )
        .key_by(lambda value: value)
        .process(Processor())
    )

    # - Execute pipeline
    environment.execute(job_name="memory_leak_test")
```
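
For reference, the `MapState` variant mentioned above looks like this. This is a minimal sketch: the class name, descriptor name `my_map_state`, and the string key/value types are only illustrative. Only the state descriptor and the write change; the rest of the job stays the same.

```python
import time

from pyflink.common import Types
from pyflink.datastream import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor


class MapProcessor(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        # Same setup as above, but using MapState instead of ValueState
        self.map_state = runtime_context.get_map_state(
            MapStateDescriptor("my_map_state", Types.STRING(), Types.STRING())
        )

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        # Write ~1 MB into MapState per element, then sleep for 1 second
        self.map_state.put("key", "a" * 1_000_000)
        time.sleep(1.0)
```

Plugging `MapProcessor()` into the same pipeline (`.key_by(...).process(MapProcessor())`) shows the same memory growth.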