[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
[ https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17119211#comment-17119211 ] Xintong Song edited comment on FLINK-17923 at 5/29/20, 2:33 AM: [~sewen], -I agree with you that making Python processes use managed memory by default should be the right approach for the long term.- -I think ideally we want RocksDB and the Python processes to share the managed memory, w.r.t. the planned fractions. This is the option #3 that we discussed. For this approach, the changes required are mainly on the runtime & RocksDB side, i.e., calculating fractions for operators with RocksDB state and reserving managed memory w.r.t. those fractions, while no changes are needed on the Python side. The concern with this approach is that the required changes might be too significant for the release testing period.- -A feasible workaround is to make either RocksDB or Python not use managed memory. The workaround is *only needed when RocksDB & Python are used together.*- * -For RocksDB, there's already a configuration option allowing users to enable/disable reserving managed memory. We only need to tell users to disable this switch when working together with Python, and no code changes are needed. This is the option #4 we discussed.- * -For Python, I think the option #5 that Dian & Jincheng suggested is to introduce a similar switch for Python that allows users to enable/disable reserving managed memory. By default, Python still uses managed memory. The benefit of introducing this switch is to allow users to choose between RocksDB and Python for which managed memory is disabled.- -One problem I see in option #5 is that the default configuration does not work when RocksDB and Python UDFs are used together. In that case, a meaningful error message is provided and users have to manually modify the configurations. 
But I think this is the right thing to do: we admit there's a problem and provide a workaround to users, rather than trying to "fix" it in a way that may also affect other unproblematic use cases.- Never mind. It seems I didn't understand your point correctly. was (Author: xintongsong): [~sewen], I agree with you that making Python processes use managed memory by default should be the right approach for the long term. I think ideally we want RocksDB and the Python processes to share the managed memory, w.r.t. the planned fractions. This is the option #3 that we discussed. For this approach, the changes required are mainly on the runtime & RocksDB side, i.e., calculating fractions for operators with RocksDB state and reserving managed memory w.r.t. those fractions, while no changes are needed on the Python side. The concern with this approach is that the required changes might be too significant for the release testing period. A feasible workaround is to make either RocksDB or Python not use managed memory. The workaround is *only needed when RocksDB & Python are used together.* * For RocksDB, there's already a configuration option allowing users to enable/disable reserving managed memory. We only need to tell users to disable this switch when working together with Python, and no code changes are needed. This is the option #4 we discussed. * For Python, I think the option #5 that Dian & Jincheng suggested is to introduce a similar switch for Python that allows users to enable/disable reserving managed memory. By default, Python still uses managed memory. The benefit of introducing this switch is to allow users to choose between RocksDB and Python for which managed memory is disabled. One problem I see in option #5 is that the default configuration does not work when RocksDB and Python UDFs are used together. In that case, a meaningful error message is provided and users have to manually modify the configurations. 
But I think this is the right thing to do, that we admit there's a problem and provide a workaround to users, rather than trying to "fix" it in a way that may also affect other unproblematic use cases.
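The fraction-based sharing in option #3 can be sketched with a toy calculation (illustrative only; the function and weights below are made up for this thread, not actual Flink runtime code):

```python
def split_managed_memory(total_bytes: int, weights: dict) -> dict:
    """Split a slot's managed memory among its consumers in proportion
    to their planned fractions (expressed here as integer weights)."""
    total_weight = sum(weights.values())
    if total_weight == 0:
        return {name: 0 for name in weights}
    return {name: total_bytes * w // total_weight
            for name, w in weights.items()}

# A slot with 512 MB of managed memory, shared by a RocksDB-backed
# operator and a Python UDF operator with equal planned fractions:
shares = split_managed_memory(512 * 1024 * 1024, {"rocksdb": 1, "python": 1})
# each consumer gets 256 MB here
```

Under a scheme like this neither consumer has to be switched off: the runtime would compute such fractions per slot, and RocksDB would reserve only its share.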
[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
[ https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118478#comment-17118478 ] Dian Fu edited comment on FLINK-17923 at 5/28/20, 9:29 AM: --- [~xintongsong] I appreciate your suggestions. They make sense to me. I want to adjust the PR a bit as follows: - Set Python UDF to use managed memory by default. - If Python UDF and RocksDB are used together and both Python UDF and RocksDB are configured to use managed memory, throw exceptions with meaningful suggestions. - If Python UDF is configured to use off-heap memory and the task off-heap memory cannot meet the requirement, throw exceptions with meaningful suggestions. In this case, when we support letting Python UDF and RocksDB both use managed memory in the future, we can just remove the checks and there will be no potential backward compatibility issues. What do you think? was (Author: dian.fu): [~xintongsong] I appreciate your suggestions. They make sense to me. I want to adjust the PR a bit as follows: - Set Python UDF to use managed memory by default. - If Python UDF and RocksDB are used together and Python UDF is configured to use managed memory, throw exceptions with meaningful suggestions. - If Python UDF is configured to use off-heap memory and the task off-heap memory cannot meet the requirement, throw exceptions with meaningful suggestions. In this case, when we support letting Python UDF and RocksDB both use managed memory in the future, we can just remove the checks and there will be no potential backward compatibility issues. What do you think? 
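The checks proposed above could look roughly like this (a hypothetical sketch; the function and parameter names are illustrative, not taken from the actual flink-python PR):

```python
def validate_memory_config(rocksdb_used: bool,
                           rocksdb_uses_managed: bool,
                           python_uses_managed: bool,
                           task_off_heap_mb: int,
                           python_required_off_heap_mb: int) -> None:
    """Fail fast with an actionable message, instead of letting the job die
    later with a MemoryAllocationException when the state backend starts."""
    if rocksdb_used and rocksdb_uses_managed and python_uses_managed:
        raise ValueError(
            "Python UDFs and the RocksDB state backend cannot both use "
            "managed memory in the same slot; configure one of them not to.")
    if not python_uses_managed and task_off_heap_mb < python_required_off_heap_mb:
        raise ValueError(
            "Task off-heap memory (%d MB) is smaller than what the Python "
            "UDFs require (%d MB)."
            % (task_off_heap_mb, python_required_off_heap_mb))
```

Removing these two checks later, once RocksDB and Python can actually share managed memory, would relax the configuration space rather than shrink it, which is why no backward-compatibility issue is expected.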
[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
[ https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118489#comment-17118489 ] Xintong Song edited comment on FLINK-17923 at 5/28/20, 9:29 AM: Thanks [~dian.fu], that sounds good to me. was (Author: xintongsong): [~dian.fu], that sounds good to me. > It will throw MemoryAllocationException if rocksdb statebackend and Python > UDF are used in the same slot > -- > > Key: FLINK-17923 > URL: https://issues.apache.org/jira/browse/FLINK-17923 > Project: Flink > Issue Type: Bug > Components: API / Python, Runtime / State Backends >Affects Versions: 1.10.0, 1.11.0 >Reporter: Dian Fu >Priority: Blocker > Labels: pull-request-available > Fix For: 1.11.0 > > > For the following job: > {code} > import logging > import os > import shutil > import sys > import tempfile > from pyflink.datastream import StreamExecutionEnvironment > from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes > from pyflink.table.udf import udf > def word_count(): > content = "line Licensed to the Apache Software Foundation ASF under one > " \ > "line or more contributor license agreements See the NOTICE > file " \ > "line distributed with this work for additional information " \ > "line regarding copyright ownership The ASF licenses this file > " \ > "to you under the Apache License Version the " \ > "License you may not use this file except in compliance " \ > "with the License" > t_config = TableConfig() > env = StreamExecutionEnvironment.get_execution_environment() > t_env = StreamTableEnvironment.create(env, t_config) > # register Results table in table environment > tmp_dir = tempfile.gettempdir() > result_path = tmp_dir + '/result' > if os.path.exists(result_path): > try: > if os.path.isfile(result_path): > os.remove(result_path) > else: > shutil.rmtree(result_path) > except OSError as e: > logging.error("Error removing directory: %s - %s.", e.filename, > e.strerror) > logging.info("Results directory: %s", 
result_path) > sink_ddl = """ > create table Results( > word VARCHAR, > `count` BIGINT > ) with ( > 'connector' = 'blackhole' > ) > """ > t_env.sql_update(sink_ddl) > @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT()) > def inc(count): > return count + 1 > t_env.register_function("inc", inc) > elements = [(word, 1) for word in content.split(" ")] > t_env.from_elements(elements, ["word", "count"]) \ > .group_by("word") \ > .select("word, count(1) as count") \ > .select("word, inc(count) as count") \ > .insert_into("Results") > t_env.execute("word_count") > if __name__ == '__main__': > logging.basicConfig(stream=sys.stdout, level=logging.INFO, > format="%(message)s") > word_count() > {code} > It will throw the following exception if rocksdb state backend is used: > {code} > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed > state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) > from any of the 1 provided restore options. > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144) > ... 
9 more > Caused by: java.io.IOException: Failed to acquire shared cache resource for > RocksDB > at > org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212) > at > org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121) > ... 11 more > Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could
[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
[ https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118478#comment-17118478 ] Dian Fu edited comment on FLINK-17923 at 5/28/20, 9:24 AM: --- [~xintongsong] I appreciate your suggestions. They make sense to me. I want to adjust the PR a bit as follows: - Set Python UDF to use managed memory by default. - If Python UDF and RocksDB are used together and Python UDF is configured to use managed memory, throw exceptions with meaningful suggestions. - If Python UDF is configured to use off-heap memory and the task off-heap memory cannot meet the requirement, throw exceptions with meaningful suggestions. In this case, when we support letting Python UDF and RocksDB both use managed memory in the future, we can just remove the checks and there will be no potential backward compatibility issues. What do you think? was (Author: dian.fu): [~xintongsong] I appreciate your suggestions. They make sense to me. I want to adjust the PR a bit as follows: - Set Python UDF to use managed memory by default. - If Python UDF and RocksDB are used together, throw exceptions with meaningful suggestions. - If Python UDF is configured to use off-heap memory and the task off-heap memory cannot meet the requirement, throw exceptions with meaningful suggestions. In this case, when we support letting Python UDF and RocksDB both use managed memory in the future, we can just remove the checks and there will be no potential backward compatibility issues. What do you think? 
[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
[ https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118478#comment-17118478 ] Dian Fu edited comment on FLINK-17923 at 5/28/20, 9:16 AM: --- [~xintongsong] I appreciate your suggestions. They make sense to me. I want to adjust the PR a bit as follows: - Set Python UDF to use managed memory by default. - If Python UDF and RocksDB are used together, throw exceptions with meaningful suggestions. - If Python UDF is configured to use off-heap memory and the task off-heap memory cannot meet the requirement, throw exceptions with meaningful suggestions. In this case, when we support letting Python UDF and RocksDB both use managed memory in the future, we can just remove the checks and there will be no potential backward compatibility issues. What do you think? was (Author: dian.fu): [~xintongsong] I appreciate your suggestions. They make sense to me. I want to adjust it a bit as follows: - Set Python UDF to use managed memory by default. - If Python UDF and RocksDB are used together, throw exceptions with meaningful suggestions. - If Python UDF is configured to use off-heap memory and the task off-heap memory cannot meet the requirement, throw exceptions with meaningful suggestions. In this case, when we support letting Python UDF and RocksDB both use managed memory in the future, we can just remove the checks and there will be no potential backward compatibility issues. What do you think? 
[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
[ https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118460#comment-17118460 ] Xintong Song edited comment on FLINK-17923 at 5/28/20, 8:48 AM: [~dian.fu], [~sunjincheng121] I'm not familiar with the flink-python code base, thus I cannot speak much to the PR. My only concern is regarding auto-magically setting task.off-heap.size for users. I wonder whether we are trying to be a bit over-smart. It might save users some effort in many cases, but it could also make things hard to understand in other cases. One of the main motivations of FLIP-49 was to make sure all the memory calculations happen in one place, without that kind of implicit logic. I would suggest not overriding the task.off-heap.size configuration. Instead, we can suggest how to set this configuration in both the memory configuration and Python UDF docs. This is similar to RocksDBStateBackend: when managed memory is disabled, users need to explicitly make sure enough native memory is reserved for RocksDB. WDYT? was (Author: xintongsong): [~dian.fu], [~sunjincheng121] I'm not familiar with the flink-python code base, thus I cannot speak much to the PR. My only concern is regarding auto-magically setting task.off-heap.size for users. I wonder whether we are trying to be a bit over-smart. It might safe some user efforts in many cases, but could also make things hard to understand in other cases. One of the main motivations of FLIP-49 was to make sure all the memory calculations happen in one place, without that kind of implicit logic. I would suggest not overriding the task.off-heap.size configuration. Instead, we can suggest how to set this configuration in both the memory configuration and Python UDF docs. This is similar to RocksDBStateBackend: when managed memory is disabled, users need to explicitly make sure enough native memory is reserved for RocksDB. WDYT? 
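In flink-conf.yaml terms, the explicit configuration suggested here might look like the following sketch. `state.backend.rocksdb.memory.managed` and `taskmanager.memory.task.off-heap.size` are documented Flink options; the size is an illustrative value, not a recommendation:

```yaml
# Workaround when RocksDB and Python UDFs share a slot: take RocksDB off
# managed memory and reserve its native memory off-heap, explicitly.
state.backend.rocksdb.memory.managed: false
# Illustrative size -- budget it yourself for RocksDB (plus the Python
# processes, if those are configured to use off-heap memory as well).
taskmanager.memory.task.off-heap.size: 512mb
```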
[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
[ https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118460#comment-17118460 ] Xintong Song edited comment on FLINK-17923 at 5/28/20, 8:47 AM: [~dian.fu], [~sunjincheng121] I'm not familiar with the flink-python code base, thus I cannot speak much to the PR. My only concern is regarding auto-magically setting task.off-heap.size for users. I wonder whether we are trying to be a bit over-smart. It might safe some user efforts in many cases, but could also make things hard to understand in other cases. One of the main motivations of FLIP-49 was to make sure all the memory calculations happen in one place, without that kind of implicit logic. I would suggest not overriding the task.off-heap.size configuration. Instead, we can suggest how to set this configuration in both the memory configuration and Python UDF docs. This is similar to RocksDBStateBackend: when managed memory is disabled, users need to explicitly make sure enough native memory is reserved for RocksDB. WDYT? was (Author: xintongsong): [~dian.fu][~sunjincheng121] I'm not familiar with the flink-python code base, thus I cannot speak much to the PR. My only concern is regarding auto-magically setting task.off-heap.size for users. I wonder whether we are trying to be a bit over-smart. It might safe some user efforts in many cases, but could also make things hard to understand in other cases. One of the main motivations of FLIP-49 was to make sure all the memory calculations happen in one place, without that kind of implicit logic. I would suggest not overriding the task.off-heap.size configuration. Instead, we can suggest how to set this configuration in both the memory configuration and Python UDF docs. This is similar to RocksDBStateBackend: when managed memory is disabled, users need to explicitly make sure enough native memory is reserved for RocksDB. WDYT? 
[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
[ https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17116772#comment-17116772 ] sunjincheng edited comment on FLINK-17923 at 5/26/20, 2:20 PM: --- At present, users can't start their jobs at all as long as they use RocksDB + Python UDF. The core scenario of Flink is stream computing, and in stream computing any analytical application needs to use aggregations. In that case, for Python users, Python UDF support is a core feature of 1.10/1.11. At present, we have Chinese users waiting to use this feature. We discussed the details of using option 3 today. Later [~zhuzh] will share the design document with you. We can discuss the design first and evaluate whether putting this fix into 1.11 is reasonable. was (Author: sunjincheng121): At present, users can't start their jobs at all as long as they use RocksDB + Python UDF. The core scenario of Flink is stream computing, and in stream computing any analytical application needs to use aggregations. In that case, for Python users, Python UDF support is a core feature of 1.10/1.11. At present, we have Chinese users waiting to use this feature. We discussed the details of using scheme 3 today. Later [~zhuzh] will share the design document with you. We can discuss the design first and evaluate whether putting this fix into 1.11 is reasonable. 
[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
[ https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17116614#comment-17116614 ] sunjincheng edited comment on FLINK-17923 at 5/26/20, 10:40 AM: It's a pity that we did not find this issue earlier (we also need to improve the e2e tests for PyFlink after fixing it). This is a very critical problem for PyFlink, as it means Python UDFs cannot be used in most streaming jobs (those with state), so I think we should address it in 1.11. We ([~zhuzh], [~xintongsong], [~yunta], [~dian.fu], and I) had a further discussion about this problem and will update the status later. I'd appreciate it if you could pay attention to this, [~pnowojski] and [~zjwang].