[ 
https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17119211#comment-17119211
 ] 

Xintong Song edited comment on FLINK-17923 at 5/29/20, 2:33 AM:
----------------------------------------------------------------

[~sewen],

-I agree with you that having Python processes use managed memory by default is 
the right approach for the long term.-

-I think ideally we want RocksDB and Python processes to share the managed 
memory according to the planned fractions. This is the option #3 that we 
discussed. For this approach, the changes required are mainly on the runtime & 
RocksDB side, i.e., calculating fractions for operators with RocksDB state and 
reserving managed memory according to those fractions, while no changes are 
needed on the Python side. The concern with this approach is that the required 
changes might be too significant for the release testing period.-

-A feasible workaround is to make either RocksDB or Python not use managed 
memory. The workaround is *only needed when RocksDB & Python are used 
together.*-
 * -For RocksDB, there's already a configuration option that allows users to 
enable/disable reserving managed memory. We only need to tell users to disable 
this switch when working together with Python; no code changes are needed. This 
is the option #4 we discussed.-
 * -For Python, I think the option #5 that Dian & Jincheng suggested is to 
introduce a similar switch for Python that allows users to enable/disable 
reserving managed memory. By default, Python still uses managed memory. The 
benefit of introducing this switch is to let users choose whether RocksDB or 
Python has managed memory disabled.-

-One problem I see with option #5 is that the default configuration does not 
work when RocksDB and Python UDFs are used together. In that case, a meaningful 
error message is provided and users have to manually modify the configuration. 
But I think this is the right thing to do: we admit there's a problem and 
provide a workaround to users, rather than trying to "fix" it in a way that may 
also affect other unproblematic use cases.-

Never mind. It seems I hadn't understood your point correctly.


was (Author: xintongsong):
[~sewen],

I agree with you that having Python processes use managed memory by default is 
the right approach for the long term.

I think ideally we want RocksDB and Python processes to share the managed 
memory according to the planned fractions. This is the option #3 that we 
discussed. For this approach, the changes required are mainly on the runtime & 
RocksDB side, i.e., calculating fractions for operators with RocksDB state and 
reserving managed memory according to those fractions, while no changes are 
needed on the Python side. The concern with this approach is that the required 
changes might be too significant for the release testing period.
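To make the fraction-based sharing in option #3 concrete, here is a minimal sketch. The function name, numbers, and 70/30 split are all hypothetical illustrations, not Flink's actual runtime API:

```python
# Illustrative sketch only: split a slot's managed memory between consumers
# by planned fractions, as option #3 would do for RocksDB and Python.
# All names and numbers here are hypothetical.

def split_managed_memory(slot_managed_bytes, fractions):
    """Return the number of bytes each consumer may reserve from the slot."""
    return {consumer: int(slot_managed_bytes * fraction)
            for consumer, fraction in fractions.items()}

# e.g. a 512 MiB slot shared 70/30 between RocksDB state and the Python process
shares = split_managed_memory(512 * 1024 * 1024, {"rocksdb": 0.7, "python": 0.3})
```

Because each consumer only reserves its fraction, the two never compete for the same bytes and no configuration switch is needed.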

A feasible workaround is to make either RocksDB or Python not use managed 
memory. The workaround is *only needed when RocksDB & Python are used together.*
 * For RocksDB, there's already a configuration option that allows users to 
enable/disable reserving managed memory. We only need to tell users to disable 
this switch when working together with Python; no code changes are needed. This 
is the option #4 we discussed.
 * For Python, I think the option #5 that Dian & Jincheng suggested is to 
introduce a similar switch for Python that allows users to enable/disable 
reserving managed memory. By default, Python still uses managed memory. The 
benefit of introducing this switch is to let users choose whether RocksDB or 
Python has managed memory disabled.
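For reference, the existing RocksDB switch from option #4 is set in flink-conf.yaml. A sketch, assuming the option name as shipped in Flink 1.10; with this set, RocksDB allocates memory on its own instead of reserving from the slot's managed memory:

```yaml
# flink-conf.yaml
state.backend: rocksdb
# Let RocksDB manage its own memory, leaving the slot's managed
# memory to the Python process (the option #4 workaround).
state.backend.rocksdb.memory.managed: false
```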

One problem I see with option #5 is that the default configuration does not 
work when RocksDB and Python UDFs are used together. In that case, a meaningful 
error message is provided and users have to manually modify the configuration. 
But I think this is the right thing to do: we admit there's a problem and 
provide a workaround to users, rather than trying to "fix" it in a way that may 
also affect other unproblematic use cases.
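The figures in the exception quoted in the issue description below make the conflict concrete. A quick back-of-the-envelope check in Python, with both byte counts copied from the reported MemoryReservationException:

```python
# Figures copied from the MemoryReservationException in this issue.
requested = 536_870_920  # shared RocksDB cache the state backend tries to reserve
remaining = 454_033_416  # slot managed memory still unreserved at that point
shortfall = requested - remaining

print(f"short by {shortfall} bytes (~{shortfall / 2**20:.0f} MiB)")
# → short by 82837504 bytes (~79 MiB)
```

The reservation fails because another consumer in the same slot already took part of the managed memory, which is exactly the RocksDB-plus-Python conflict discussed above.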

> It will throw MemoryAllocationException if rocksdb statebackend and Python 
> UDF are used in the same slot  
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17923
>                 URL: https://issues.apache.org/jira/browse/FLINK-17923
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python, Runtime / State Backends
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: Dian Fu
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>
> For the following job:
> {code}
> import logging
> import os
> import shutil
> import sys
> import tempfile
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> def word_count():
>     content = "line Licensed to the Apache Software Foundation ASF under one " \
>               "line or more contributor license agreements See the NOTICE file " \
>               "line distributed with this work for additional information " \
>               "line regarding copyright ownership The ASF licenses this file " \
>               "to you under the Apache License Version the " \
>               "License you may not use this file except in compliance " \
>               "with the License"
>     t_config = TableConfig()
>     env = StreamExecutionEnvironment.get_execution_environment()
>     t_env = StreamTableEnvironment.create(env, t_config)
>     # register Results table in table environment
>     tmp_dir = tempfile.gettempdir()
>     result_path = tmp_dir + '/result'
>     if os.path.exists(result_path):
>         try:
>             if os.path.isfile(result_path):
>                 os.remove(result_path)
>             else:
>                 shutil.rmtree(result_path)
>         except OSError as e:
>             logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)
>     logging.info("Results directory: %s", result_path)
>     sink_ddl = """
>         create table Results(
>             word VARCHAR,
>             `count` BIGINT
>         ) with (
>             'connector' = 'blackhole'
>         )
>         """
>     t_env.sql_update(sink_ddl)
>     @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
>     def inc(count):
>         return count + 1
>     t_env.register_function("inc", inc)
>     elements = [(word, 1) for word in content.split(" ")]
>     t_env.from_elements(elements, ["word", "count"]) \
>          .group_by("word") \
>          .select("word, count(1) as count") \
>          .select("word, inc(count) as count") \
>          .insert_into("Results")
>     t_env.execute("word_count")
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     word_count()
> {code}
> It will throw the following exception if rocksdb state backend is used:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from any of the 1 provided restore options.
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
>       ... 9 more
> Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
>       at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
>       at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>       ... 11 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not created the shared memory resource of size 536870920. Not enough memory left to reserve from the slot's managed memory.
>       at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
>       at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>       at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>       at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
>       at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
>       at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
>       ... 15 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 536870920 bytes. Only 454033416 bytes are remaining.
>       at org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
>       at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
>       ... 20 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
