[ https://issues.apache.org/jira/browse/FLINK-26462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Huang Xingbo updated FLINK-26462:
---------------------------------
Description: 
h1. Setup
* Build and compile the Flink source code
{code:bash}
$ cd {flink-source-code}
$ mvn clean install -DskipTests
{code}
* Prepare a Python virtual environment
{code:bash}
$ cd flink-python/dev
$ ./lint-python.sh -s basic
$ source .conda/bin/activate
{code}
* Install PyFlink from source. For more details, you can refer to the [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
{code:bash}
$ cd flink-python/apache-flink-libraries
$ python setup.py sdist
$ pip install dist/*.tar.gz
$ cd ..
$ pip install -r dev/dev-requirements.txt
$ python setup.py sdist
$ pip install dist/*.tar.gz
{code}
h1. Test
* Write a Python UDF job named demo.py in process mode
{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr


class SubtractOne(ScalarFunction):

    def eval(self, i):
        return i - 1


@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j


def main():
    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    # process mode
    t_env.get_config().get_configuration().set_string("python.execution-mode", "process")
    # optional
    t_env.get_config().get_configuration().set_string("parallelism.default", "2")

    add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
    subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())

    t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
    result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
    print(result.to_pandas())


if __name__ == '__main__':
    main()
{code}
* Run the Python UDF job and check the result
{code:bash}
$ python demo.py
   _c0  c  _c2
0    3  1    1
1    7  2    1
2    4  3    1
{code}
* Change the Python UDF job to multi-thread mode; the only change is the value of "python.execution-mode"
{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr


class SubtractOne(ScalarFunction):

    def eval(self, i):
        return i - 1


@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j


def main():
    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    # multi-thread mode
    t_env.get_config().get_configuration().set_string("python.execution-mode", "multi-thread")
    t_env.get_config().get_configuration().set_string("parallelism.default", "2")

    add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
    subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())

    t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
    result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
    print(result.to_pandas())


if __name__ == '__main__':
    main()
{code}
* Run the Python UDF job again and check that the result matches the process-mode output
{code:bash}
$ python demo.py
   _c0  c  _c2
0    3  1    1
1    7  2    1
2    4  3    1
{code}
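* (Optional, not part of the original test plan) The comparison between the two modes can also be scripted. The sketch below is only an illustration: it reuses the UDF definitions from demo.py and assumes the same "python.execution-mode" values used above, runs the job once per mode, and asserts that both modes produce the same rows. The rows are sorted by column c before comparing, since parallelism 2 does not guarantee row order.
{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr


class SubtractOne(ScalarFunction):

    def eval(self, i):
        return i - 1


@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
     result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j


def run(execution_mode):
    # build the same job as demo.py; only the execution mode differs
    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    t_env.get_config().get_configuration().set_string(
        "python.execution-mode", execution_mode)
    t_env.get_config().get_configuration().set_string(
        "parallelism.default", "2")

    add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
    subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())

    t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
    result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
    # sort by column c so the comparison is independent of row order
    return result.to_pandas().sort_values(by='c').reset_index(drop=True)


if __name__ == '__main__':
    process_result = run("process")
    thread_result = run("multi-thread")
    assert process_result.equals(thread_result), \
        "process and multi-thread modes returned different results"
    print(process_result)
{code}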

> Release Testing: Running Python UDF in different Execution Mode
> ---------------------------------------------------------------
>
>                 Key: FLINK-26462
>                 URL: https://issues.apache.org/jira/browse/FLINK-26462
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python
>    Affects Versions: 1.15.0
>            Reporter: Huang Xingbo
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.15.0
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)