[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot

2020-05-28 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17119211#comment-17119211
 ] 

Xintong Song edited comment on FLINK-17923 at 5/29/20, 2:33 AM:


[~sewen],

-I agree with you that having Python processes use managed memory by default 
is the right approach for the long term.-

-I think ideally we want RocksDB and Python processes to share the managed 
memory according to the planned fractions. This is option #3 that we discussed. 
For this approach, the required changes are mainly on the runtime & RocksDB 
side, i.e., calculating fractions for operators with RocksDB state and 
reserving managed memory according to those fractions, while no changes are 
needed on the Python side. The concern with this approach is that the required 
changes might be too significant for the release testing period.-

-A feasible workaround is to make either RocksDB or Python not use managed 
memory. The workaround is *only needed when RocksDB & Python are used 
together.*-
 * -For RocksDB, there's already a configuration option allowing users to 
enable/disable reserving managed memory. We only need to tell users to disable 
this switch when working together with Python; no code changes are needed. 
This is option #4 that we discussed.-
 * -For Python, option #5, which Dian & Jincheng suggested, is to introduce a 
similar switch for Python that allows users to enable/disable reserving 
managed memory. By default, Python still uses managed memory. The benefit of 
introducing this switch is that it allows users to choose whether managed 
memory is disabled for RocksDB or for Python.-

-One problem I see in option #5 is that the default configuration does not 
work when RocksDB and Python UDF are used together. In that case, a meaningful 
error message is provided and users have to manually modify the configuration. 
But I think this is the right thing to do, in that we admit there's a problem 
and provide a workaround to users, rather than trying to "fix" it in a way 
that may also affect other unproblematic use cases.-

Never mind. It seems I hadn't understood your point correctly.


was (Author: xintongsong):
[~sewen],

I agree with you that having Python processes use managed memory by default 
is the right approach for the long term.

I think ideally we want RocksDB and Python processes to share the managed 
memory according to the planned fractions. This is option #3 that we discussed. 
For this approach, the required changes are mainly on the runtime & RocksDB 
side, i.e., calculating fractions for operators with RocksDB state and 
reserving managed memory according to those fractions, while no changes are 
needed on the Python side. The concern with this approach is that the required 
changes might be too significant for the release testing period.

A feasible workaround is to make either RocksDB or Python not use managed 
memory. The workaround is *only needed when RocksDB & Python are used together.*
 * For RocksDB, there's already a configuration option allowing users to 
enable/disable reserving managed memory. We only need to tell users to disable 
this switch when working together with Python; no code changes are needed. 
This is option #4 that we discussed.
 * For Python, option #5, which Dian & Jincheng suggested, is to introduce a 
similar switch for Python that allows users to enable/disable reserving 
managed memory. By default, Python still uses managed memory. The benefit of 
introducing this switch is that it allows users to choose whether managed 
memory is disabled for RocksDB or for Python.

One problem I see in option #5 is that the default configuration does not work 
when RocksDB and Python UDF are used together. In that case, a meaningful 
error message is provided and users have to manually modify the configuration. 
But I think this is the right thing to do, in that we admit there's a problem 
and provide a workaround to users, rather than trying to "fix" it in a way 
that may also affect other unproblematic use cases.
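The fraction-based sharing sketched in option #3 can be illustrated with a small, self-contained Python sketch. This is an illustration only, with hypothetical function and consumer names; Flink's real calculation lives in the Java runtime and accounts for operator scopes:

```python
def split_managed_memory(total_bytes, consumer_weights):
    """Split a slot's managed memory among consumers (e.g. a RocksDB state
    backend and a Python UDF process) proportionally to planned weights.

    Simplified illustration of fraction-based sharing; not Flink's actual
    implementation.
    """
    total_weight = sum(consumer_weights.values())
    if total_weight == 0:
        return {name: 0 for name in consumer_weights}
    # Integer division keeps every share within the total budget.
    return {
        name: total_bytes * weight // total_weight
        for name, weight in consumer_weights.items()
    }

# Example: 512 MiB of managed memory, equal weights for RocksDB and Python.
shares = split_managed_memory(512 * 1024 * 1024,
                              {"rocksdb": 1, "python": 1})
```

With equal weights each consumer receives half the slot's managed memory; skewed weights would shift the split accordingly.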

> It will throw MemoryAllocationException if rocksdb statebackend and Python 
> UDF are used in the same slot  
> --
>
> Key: FLINK-17923
> URL: https://issues.apache.org/jira/browse/FLINK-17923
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Runtime / State Backends
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Dian Fu
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> For the following job:
> {code}
> import logging
> import os
> import shutil
> import sys
> import tempfile
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment, 

[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot

2020-05-28 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118478#comment-17118478
 ] 

Dian Fu edited comment on FLINK-17923 at 5/28/20, 9:29 AM:
---

[~xintongsong] Thanks for your suggestions; they make sense to me.
I want to adjust the PR a bit as follows:
- Set Python UDF to use managed memory by default.
- If Python UDF and RocksDB are used together and both are configured to use 
managed memory, throw an exception with meaningful suggestions.
- If Python UDF is configured to use off-heap memory and the task off-heap 
memory cannot meet the requirement, throw an exception with meaningful 
suggestions.

In this case, once we support letting Python UDF and RocksDB both use managed 
memory in the future, we can just remove the checks and there will be no 
potential backward compatibility issues.

What do you think?
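The fail-fast check proposed above could look roughly like the following sketch. The function and exception names are hypothetical; the actual PR works against Flink's Java configuration classes:

```python
class IncompatibleMemoryConfigError(Exception):
    """Raised when a job's memory configuration cannot work at runtime."""


def validate_memory_config(python_uses_managed, rocksdb_uses_managed,
                           uses_rocksdb, uses_python_udf):
    """Reject configurations where both consumers reserve managed memory.

    If RocksDB and the Python workers both try to reserve the slot's
    managed memory, one of them fails to allocate at runtime, so we fail
    fast at configuration time with an actionable hint instead.
    """
    if (uses_rocksdb and uses_python_udf
            and python_uses_managed and rocksdb_uses_managed):
        raise IncompatibleMemoryConfigError(
            "Python UDF and RocksDB cannot both use managed memory in the "
            "same slot; disable managed memory for one of them.")
```

Removing this check later, once both consumers can share managed memory, would silently start accepting the previously rejected configuration, which is why it introduces no backward-compatibility concerns.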


was (Author: dian.fu):
[~xintongsong] Thanks for your suggestions; they make sense to me.
I want to adjust the PR a bit as follows:
- Set Python UDF to use managed memory by default.
- If Python UDF and RocksDB are used together and Python UDF is configured to 
use managed memory, throw an exception with meaningful suggestions.
- If Python UDF is configured to use off-heap memory and the task off-heap 
memory cannot meet the requirement, throw an exception with meaningful 
suggestions.

In this case, once we support letting Python UDF and RocksDB both use managed 
memory in the future, we can just remove the checks and there will be no 
potential backward compatibility issues.

What do you think?


[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot

2020-05-28 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118489#comment-17118489
 ] 

Xintong Song edited comment on FLINK-17923 at 5/28/20, 9:29 AM:


Thanks [~dian.fu], that sounds good to me.


was (Author: xintongsong):
[~dian.fu], that sounds good to me.

> It will throw MemoryAllocationException if rocksdb statebackend and Python 
> UDF are used in the same slot  
> --
>
> Key: FLINK-17923
> URL: https://issues.apache.org/jira/browse/FLINK-17923
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Runtime / State Backends
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Dian Fu
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> For the following job:
> {code}
> import logging
> import os
> import shutil
> import sys
> import tempfile
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> def word_count():
> content = "line Licensed to the Apache Software Foundation ASF under one 
> " \
>   "line or more contributor license agreements See the NOTICE 
> file " \
>   "line distributed with this work for additional information " \
>   "line regarding copyright ownership The ASF licenses this file 
> " \
>   "to you under the Apache License Version the " \
>   "License you may not use this file except in compliance " \
>   "with the License"
> t_config = TableConfig()
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env, t_config)
> # register Results table in table environment
> tmp_dir = tempfile.gettempdir()
> result_path = tmp_dir + '/result'
> if os.path.exists(result_path):
> try:
> if os.path.isfile(result_path):
> os.remove(result_path)
> else:
> shutil.rmtree(result_path)
> except OSError as e:
> logging.error("Error removing directory: %s - %s.", e.filename, 
> e.strerror)
> logging.info("Results directory: %s", result_path)
> sink_ddl = """
> create table Results(
> word VARCHAR,
> `count` BIGINT
> ) with (
> 'connector' = 'blackhole'
> )
> """
> t_env.sql_update(sink_ddl)
> @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
> def inc(count):
> return count + 1
> t_env.register_function("inc", inc)
> elements = [(word, 1) for word in content.split(" ")]
> t_env.from_elements(elements, ["word", "count"]) \
>  .group_by("word") \
>  .select("word, count(1) as count") \
>  .select("word, inc(count) as count") \
>  .insert_into("Results")
> t_env.execute("word_count")
> if __name__ == '__main__':
> logging.basicConfig(stream=sys.stdout, level=logging.INFO, 
> format="%(message)s")
> word_count()
> {code}
> It will throw the following exception if rocksdb state backend is used:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) 
> from any of the 1 provided restore options.
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
>   ... 9 more
> Caused by: java.io.IOException: Failed to acquire shared cache resource for 
> RocksDB
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
>   at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>   at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>   ... 11 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could 

[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot

2020-05-28 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118478#comment-17118478
 ] 

Dian Fu edited comment on FLINK-17923 at 5/28/20, 9:24 AM:
---

[~xintongsong] Thanks for your suggestions; they make sense to me.
I want to adjust the PR a bit as follows:
- Set Python UDF to use managed memory by default.
- If Python UDF and RocksDB are used together and Python UDF is configured to 
use managed memory, throw an exception with meaningful suggestions.
- If Python UDF is configured to use off-heap memory and the task off-heap 
memory cannot meet the requirement, throw an exception with meaningful 
suggestions.

In this case, once we support letting Python UDF and RocksDB both use managed 
memory in the future, we can just remove the checks and there will be no 
potential backward compatibility issues.

What do you think?


was (Author: dian.fu):
[~xintongsong] Thanks for your suggestions; they make sense to me.
I want to adjust the PR a bit as follows:
- Set Python UDF to use managed memory by default.
- If Python UDF and RocksDB are used together, throw an exception with 
meaningful suggestions.
- If Python UDF is configured to use off-heap memory and the task off-heap 
memory cannot meet the requirement, throw an exception with meaningful 
suggestions.

In this case, once we support letting Python UDF and RocksDB both use managed 
memory in the future, we can just remove the checks and there will be no 
potential backward compatibility issues.

What do you think?


[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot

2020-05-28 Thread Dian Fu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118478#comment-17118478
 ] 

Dian Fu edited comment on FLINK-17923 at 5/28/20, 9:16 AM:
---

[~xintongsong] Appreciated your suggestions. It makes sense to me.
I want to adjust the PR a bit as follows:
- Set Python UDF to use managed memory by default.
- If Python UDF and RocksDB are used together, throw an exception with 
meaningful suggestions.
- If Python UDF is configured to use off-heap memory and the task off-heap 
memory cannot meet the requirement, throw an exception with meaningful 
suggestions.

In this case, once we support letting Python UDF and RocksDB both use managed 
memory in the future, we can just remove the checks and there will be no 
potential backward compatibility issues.

What do you think?


was (Author: dian.fu):
[~xintongsong] Appreciated your suggestions. It makes sense to me.
I want to adjust it a bit as follows:
- Set Python UDF to use managed memory by default.
- If Python UDF and RocksDB are used together, throw an exception with 
meaningful suggestions.
- If Python UDF is configured to use off-heap memory and the task off-heap 
memory cannot meet the requirement, throw an exception with meaningful 
suggestions.

In this case, once we support letting Python UDF and RocksDB both use managed 
memory in the future, we can just remove the checks and there will be no 
potential backward compatibility issues.

What do you think?


[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot

2020-05-28 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118460#comment-17118460
 ] 

Xintong Song edited comment on FLINK-17923 at 5/28/20, 8:48 AM:


[~dian.fu], [~sunjincheng121]

I'm not familiar with the flink-python code base, thus I cannot speak much to 
the PR.

My only concern is regarding auto-magically setting task.off-heap.size for 
users. I wonder whether we are trying to be a bit over-smart. It might save 
some user effort in many cases, but could also make things hard to understand 
in other cases. One of the main motivations for FLIP-49 was to make sure all 
the memory calculations happen in one place, without this kind of implicit 
logic.

I would suggest not overriding the task.off-heap.size configuration. Instead, 
we can suggest how to set this configuration in both the memory configuration 
and Python UDF docs. This is similar to RocksDBStateBackend: when managed 
memory is disabled, users need to explicitly make sure enough native memory is 
reserved for RocksDB.

WDYT?
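Explicitly sizing the off-heap pool, as suggested above, amounts to simple arithmetic the user performs once before setting the option (presumably `taskmanager.memory.task.off-heap.size` in flink-conf.yaml). The helper and the numbers below are purely illustrative:

```python
MIB = 1024 * 1024  # bytes per mebibyte


def required_task_off_heap(python_buffer_mib, headroom_mib):
    """Estimate the task off-heap size (in bytes) a user would configure
    when Python workers draw from the off-heap pool instead of the managed
    pool. Illustrative arithmetic only; real jobs should size this from
    observed Python worker memory usage.
    """
    return (python_buffer_mib + headroom_mib) * MIB


# e.g. 128 MiB for the Python worker process plus 32 MiB of headroom
size = required_task_off_heap(128, 32)
```

The point of keeping this explicit, rather than auto-deriving it, is that the user-facing configuration stays the single source of truth for the FLIP-49 memory calculations.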


was (Author: xintongsong):
[~dian.fu], [~sunjincheng121]

I'm not familiar with the flink-python code base, thus I cannot speak much to 
the PR.

My only concern is regarding auto-magically setting task.off-heap.size for 
users. I wonder whether we are trying to be a bit over-smart. It might save 
some user effort in many cases, but could also make things hard to understand 
in other cases. One of the main motivations for FLIP-49 was to make sure all 
the memory calculations happen in one place, without this kind of implicit 
logic.

I would suggest not overriding the task.off-heap.size configuration. Instead, 
we can suggest how to set this configuration in both the memory configuration 
and Python UDF docs. This is similar to RocksDBStateBackend: when managed 
memory is disabled, users need to explicitly make sure enough native memory is 
reserved for RocksDB.

WDYT?


[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot

2020-05-28 Thread Xintong Song (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17118460#comment-17118460
 ] 

Xintong Song edited comment on FLINK-17923 at 5/28/20, 8:47 AM:


[~dian.fu], [~sunjincheng121]

I'm not familiar with the flink-python code base, thus I cannot speak much to 
the PR.

My only concern is regarding auto-magically setting task.off-heap.size for 
users. I wonder whether we are trying to be a bit over-smart. It might save 
some user effort in many cases, but could also make things hard to understand 
in other cases. One of the main motivations for FLIP-49 was to make sure all 
the memory calculations happen in one place, without this kind of implicit 
logic.

I would suggest not overriding the task.off-heap.size configuration. Instead, 
we can suggest how to set this configuration in both the memory configuration 
and Python UDF docs. This is similar to RocksDBStateBackend: when managed 
memory is disabled, users need to explicitly make sure enough native memory is 
reserved for RocksDB.

WDYT?


was (Author: xintongsong):
[~dian.fu][~sunjincheng121]

I'm not familiar with the flink-python code base, thus I cannot speak much to 
the PR.

My only concern is regarding auto-magically setting task.off-heap.size for 
users. I wonder whether we are trying to be a bit over-smart. It might save 
some user effort in many cases, but could also make things hard to understand 
in other cases. One of the main motivations for FLIP-49 was to make sure all 
the memory calculations happen in one place, without this kind of implicit 
logic.

I would suggest not overriding the task.off-heap.size configuration. Instead, 
we can suggest how to set this configuration in both the memory configuration 
and Python UDF docs. This is similar to RocksDBStateBackend: when managed 
memory is disabled, users need to explicitly make sure enough native memory is 
reserved for RocksDB.

WDYT?

> It will throw MemoryAllocationException if rocksdb statebackend and Python 
> UDF are used in the same slot  
> --
>
> Key: FLINK-17923
> URL: https://issues.apache.org/jira/browse/FLINK-17923
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python, Runtime / State Backends
>Affects Versions: 1.10.0, 1.11.0
>Reporter: Dian Fu
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>
> For the following job:
> {code}
> import logging
> import os
> import shutil
> import sys
> import tempfile
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> def word_count():
>     content = "line Licensed to the Apache Software Foundation ASF under one " \
>               "line or more contributor license agreements See the NOTICE file " \
>               "line distributed with this work for additional information " \
>               "line regarding copyright ownership The ASF licenses this file " \
>               "to you under the Apache License Version the " \
>               "License you may not use this file except in compliance " \
>               "with the License"
>     t_config = TableConfig()
>     env = StreamExecutionEnvironment.get_execution_environment()
>     t_env = StreamTableEnvironment.create(env, t_config)
>     # register Results table in table environment
>     tmp_dir = tempfile.gettempdir()
>     result_path = tmp_dir + '/result'
>     if os.path.exists(result_path):
>         try:
>             if os.path.isfile(result_path):
>                 os.remove(result_path)
>             else:
>                 shutil.rmtree(result_path)
>         except OSError as e:
>             logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)
>     logging.info("Results directory: %s", result_path)
>     sink_ddl = """
>         create table Results(
>             word VARCHAR,
>             `count` BIGINT
>         ) with (
>             'connector' = 'blackhole'
>         )
>     """
>     t_env.sql_update(sink_ddl)
>
>     @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
>     def inc(count):
>         return count + 1
>
>     t_env.register_function("inc", inc)
>     elements = [(word, 1) for word in content.split(" ")]
>     t_env.from_elements(elements, ["word", "count"]) \
>         .group_by("word") \
>         .select("word, count(1) as count") \
>         .select("word, inc(count) as count") \
>         .insert_into("Results")
>     t_env.execute("word_count")
>
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     word_count()
> {code}
> It will throw the following exception if the rocksdb state backend is used:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from any of the 1 provided restore options.
>   at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
>   at 
> {code}

[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot

2020-05-26 Thread sunjincheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17116772#comment-17116772
 ] 

sunjincheng edited comment on FLINK-17923 at 5/26/20, 2:20 PM:
---

At present, users cannot start jobs at all if they use RocksDB together with Python UDFs. 
The core scenario for Flink is stream computing, and in stream computing any 
analytical application needs to use AGG. For Python users this means Python UDFs, 
which are a core feature of 1.10/1.11, and we already have users in China waiting 
to use this feature.

We discussed the details of using option 3 today. [~zhuzh] will share the design 
document with you later. We can discuss the design first and then evaluate whether 
it is reasonable to put this fix into 1.11.
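
For readers following the thread, the idea behind option 3 — sharing a slot's managed memory between RocksDB state backends and Python processes according to planned fractions — can be sketched in plain Python. This is purely illustrative; the consumer names and weights are hypothetical and are not Flink APIs, and the real design is in the document mentioned above.

```python
# Illustrative sketch of fraction-based sharing of a slot's managed memory.
# Consumer names and weights here are hypothetical, not Flink code.
def split_managed_memory(total_bytes, weights):
    """Split total_bytes between consumers proportionally to their weights."""
    total_weight = sum(weights.values())
    return {name: total_bytes * w // total_weight
            for name, w in weights.items()}

# e.g. an equal split of a 512 MiB slot between RocksDB and a Python worker:
shares = split_managed_memory(512 * 1024 * 1024,
                              {"rocksdb": 1, "python": 1})
```

The point of the fraction-based approach is that neither consumer over-reserves: both get a deterministic share of the same pool, so the MemoryAllocationException cannot occur.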


was (Author: sunjincheng121):
At present, users can't start jobs as long as they use rocksDB + Python UDF. 
The core scenario of our Flink is stream computing. In stream computing, as 
long as it's an analytical application, it needs to use AGG. In this case, if 
it's a Python User, the demand for Python UDF is our core function of 
1.10/1.11. At present, we have china users waiting to use this feature.

We discussed the details of using scheme 3 today. Later [~zhuzh] will share the 
design document with you. We can discuss the design first and evaluate whether 
put this fixing to 1.11 is reasonable.
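
Until such a fix lands, a configuration-level workaround discussed earlier in this thread is to take one of the two consumers off managed memory when RocksDB and Python UDFs share a slot. A minimal `flink-conf.yaml` sketch follows; the RocksDB option exists as of Flink 1.10, while the Python-side key is only the switch proposed in this thread (option #5) and its final name is an assumption here.

```yaml
# Workaround sketch (option #4): keep Python UDF workers on managed memory and
# let RocksDB allocate its memory outside the managed pool instead.
state.backend: rocksdb
state.backend.rocksdb.memory.managed: false

# Alternative sketch (option #5, assuming the proposed switch is introduced):
# python.fn-execution.memory.managed: false
```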


[jira] [Comment Edited] (FLINK-17923) It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot

2020-05-26 Thread sunjincheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17116614#comment-17116614
 ] 

sunjincheng edited comment on FLINK-17923 at 5/26/20, 10:40 AM:


It's a pity that we did not find this issue earlier (we also need to improve the 
e2e tests for PyFlink after fixing this issue). This is a very critical problem 
for PyFlink, as it means that Python UDFs cannot be used in most streaming 
jobs (those with state). So I think we should address this problem in 1.11.

We ([~zhuzh], [~xintongsong], [~yunta], [~dian.fu] and me) had a further 
discussion about this problem and will update the status later.

I would appreciate it if you could pay attention to this, [~pnowojski] and [~zjwang].


was (Author: sunjincheng121):
It's a pity that we do not find this issue earlier(We also need to improve the 
e2e test for PyFlink after fixing this issue). This is a very critical problem 
for PyFlink as it means that Python UDF could not be used in most streaming 
jobs(with state).So I think we should address this problem in 1.11. 

 

We( [~zhuzh] [~xintongsong] [~yunta] [~dian.fu] and me) have a further 
discussion about this problem and will update the status later.

 

Appreciate if you can pay attention to this [~pnowojski]  and [~zjwang] . 

 

 
