[ https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17119211#comment-17119211 ]
Xintong Song edited comment on FLINK-17923 at 5/29/20, 2:33 AM:
----------------------------------------------------------------

[~sewen],

-I agree with you that having Python processes use managed memory by default is the right approach for the long term.-

-I think that, ideally, we want RocksDB and the Python processes to share the managed memory according to the planned fractions. This is option #3 that we discussed. For this approach, the required changes are mainly on the runtime & RocksDB side, i.e., calculating fractions for operators with RocksDB state and reserving managed memory according to those fractions; no changes are needed on the Python side. The concern with this approach is that the required changes might be too significant for the release testing period.-

-A feasible workaround is to make either RocksDB or Python not use managed memory. The workaround is *only needed when RocksDB & Python are used together.*-
* -For RocksDB, there's already a configuration option allowing users to enable/disable reserving managed memory. We only need to tell users to disable this switch when working together with Python; no code changes are needed. This is option #4 that we discussed.-
* -For Python, I think option #5, which Dian & Jincheng suggested, is to introduce a similar switch for Python that allows users to enable/disable reserving managed memory. By default, Python still uses managed memory. The benefit of introducing this switch is that it allows users to choose whether RocksDB or Python has managed memory disabled.-

-One problem I see in option #5 is that the default configuration does not work when RocksDB and Python UDFs are used together. In that case, a meaningful error message is provided and users have to manually modify the configuration. But I think this is the right thing to do: we admit there's a problem and provide a workaround to users, rather than trying to "fix" it in a way that may also affect other unproblematic use cases.-

Nevermind. It seems I hadn't understood your point correctly.

was (Author: xintongsong):
[~sewen],

I agree with you that having Python processes use managed memory by default is the right approach for the long term.

I think that, ideally, we want RocksDB and the Python processes to share the managed memory according to the planned fractions. This is option #3 that we discussed. For this approach, the required changes are mainly on the runtime & RocksDB side, i.e., calculating fractions for operators with RocksDB state and reserving managed memory according to those fractions; no changes are needed on the Python side. The concern with this approach is that the required changes might be too significant for the release testing period.

A feasible workaround is to make either RocksDB or Python not use managed memory. The workaround is *only needed when RocksDB & Python are used together.*
* For RocksDB, there's already a configuration option allowing users to enable/disable reserving managed memory (see the configuration sketch below). We only need to tell users to disable this switch when working together with Python; no code changes are needed. This is option #4 that we discussed.
* For Python, I think option #5, which Dian & Jincheng suggested, is to introduce a similar switch for Python that allows users to enable/disable reserving managed memory. By default, Python still uses managed memory. The benefit of introducing this switch is that it allows users to choose whether RocksDB or Python has managed memory disabled.
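
A minimal {{flink-conf.yaml}} sketch of what these two switches would look like side by side. The RocksDB key is the existing switch from option #4; the Python key does not exist yet and is only an illustrative name for the kind of switch option #5 proposes:
{code}
# Use RocksDB for state, as in the reproduction job below.
state.backend: rocksdb

# Option #4: the existing switch. With it set to false, RocksDB no longer
# reserves the slot's managed memory, leaving it to the Python processes.
state.backend.rocksdb.memory.managed: false

# Option #5: a similar switch on the Python side (illustrative name only,
# not an existing option) that would instead let the Python processes opt
# out of managed memory, so RocksDB can keep using it.
# python.fn-execution.memory.managed: false
{code}
Either switch resolves the conflict, because only one of the two consumers then reserves from the slot's managed memory.
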
One problem I see in option #5 is that the default configuration does not work when RocksDB and Python UDFs are used together. In that case, a meaningful error message is provided and users have to manually modify the configuration. But I think this is the right thing to do: we admit there's a problem and provide a workaround to users, rather than trying to "fix" it in a way that may also affect other unproblematic use cases.


> It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17923
>                 URL: https://issues.apache.org/jira/browse/FLINK-17923
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python, Runtime / State Backends
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: Dian Fu
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>
> For the following job:
> {code}
> import logging
> import os
> import shutil
> import sys
> import tempfile
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
>
>
> def word_count():
>     content = "line Licensed to the Apache Software Foundation ASF under one " \
>               "line or more contributor license agreements See the NOTICE file " \
>               "line distributed with this work for additional information " \
>               "line regarding copyright ownership The ASF licenses this file " \
>               "to you under the Apache License Version the " \
>               "License you may not use this file except in compliance " \
>               "with the License"
>
>     t_config = TableConfig()
>     env = StreamExecutionEnvironment.get_execution_environment()
>     t_env = StreamTableEnvironment.create(env, t_config)
>
>     # register Results table in table environment
>     tmp_dir = tempfile.gettempdir()
>     result_path = tmp_dir + '/result'
>     if os.path.exists(result_path):
>         try:
>             if os.path.isfile(result_path):
>                 os.remove(result_path)
>             else:
>                 shutil.rmtree(result_path)
>         except OSError as e:
>             logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)
>
>     logging.info("Results directory: %s", result_path)
>
>     sink_ddl = """
>         create table Results(
>             word VARCHAR,
>             `count` BIGINT
>         ) with (
>             'connector' = 'blackhole'
>         )
>         """
>     t_env.sql_update(sink_ddl)
>
>     # A simple Python UDF; running it in the same slot as RocksDB state
>     # triggers the managed memory conflict.
>     @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
>     def inc(count):
>         return count + 1
>
>     t_env.register_function("inc", inc)
>
>     elements = [(word, 1) for word in content.split(" ")]
>     t_env.from_elements(elements, ["word", "count"]) \
>         .group_by("word") \
>         .select("word, count(1) as count") \
>         .select("word, inc(count) as count") \
>         .insert_into("Results")
>
>     t_env.execute("word_count")
>
>
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     word_count()
> {code}
> It will throw the following exception if the RocksDB state backend is used:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from any of the 1 provided restore options.
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
> 	... 9 more
> Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
> 	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
> 	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
> 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> 	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> 	... 11 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not created the shared memory resource of size 536870920. Not enough memory left to reserve from the slot's managed memory.
> 	at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
> 	at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
> 	at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
> 	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
> 	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
> 	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
> 	... 15 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 536870920 bytes. Only 454033416 bytes are remaining.
> 	at org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
> 	at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
> 	... 20 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)