[ 
https://issues.apache.org/jira/browse/FLINK-26462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo updated FLINK-26462:
---------------------------------
    Description: 
h1. Setup

* Build and compile the Flink source code
{code:bash}
$ cd {flink-source-code}
$ mvn clean install -DskipTests
{code}

* Prepare a Python Virtual Environment

{code:bash}
$ cd flink-python/dev
$ ./lint-python.sh -s basic
$ source .conda/bin/activate
{code}

* Install PyFlink from source code. For more details, you can refer to the 
[doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
{code:bash}
$ cd flink-python/apache-flink-libraries
$ python setup.py sdist
$ pip install dist/*.tar.gz
$ cd ..
$ pip install -r dev/dev-requirements.txt
$ python setup.py sdist
$ pip install dist/*.tar.gz
{code}

h1. Test
* Write a python udf job named demo.py in process mode

{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr

class SubtractOne(ScalarFunction):
    def eval(self, i):
        return i - 1

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], 
result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

def main():
    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    # process mode !
    t_env.get_config().get_configuration().set_string("python.execution-mode", 
"process")
    # optional values
    t_env.get_config().get_configuration().set_string("parallelism.default", 
"2")
    add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
    subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())

    t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
    result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
    print(result.to_pandas())

if __name__ == '__main__':
    main()
{code}

* Run the Python UDF job and check the result
{code:bash}
$ python demo.py
   _c0  c  _c2
0    3  1    1
1    7  2    1
2    4  3    1
{code}

* change the python udf job to multi-thread mode

{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr

class SubtractOne(ScalarFunction):
    def eval(self, i):
        return i - 1

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], 
result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

def main():
    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    # multi-thread mode
    t_env.get_config().get_configuration().set_string("python.execution-mode", 
"multi-thread")
    t_env.get_config().get_configuration().set_string("parallelism.default", 
"2")
    add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
    subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())

    t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
    result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
    print(result.to_pandas())

if __name__ == '__main__':
    main()
{code}

* run the python udf job and watch the result
{code:bash}
$ python demo.py
   _c0  c  _c2
0    3  1    1
1    7  2    1
2    4  3    1
{code}

  was:
h1. Setup

* Build flink source code and compile source code
{code:bash}
$ cd {flink-source-code}
$ mvn clean install -DskipTests
{code}

* Prepare a Python Virtual Environment

{code:bash}
$ cd flink-python/dev
$ ./lint-python.sh -s basic
$ source .conda/bin/activate
{code}

* Install PyFlink from source code. For more details, you can refer to the 
[doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
{code:bash}
$ cd flink-python/apache-flink-libraries
$ python setup.py sdist
$ pip install dist/*.tar.gz
$ cd ..
$ pip install -r dev/dev-requirements.txt
$ python setup.py
$ python setpy.py sdist
$ pip install dist/*.tar.gz
{code}

h1. Test
* Write a python udf job named demo.py in process mode

{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr

class SubtractOne(ScalarFunction):
    def eval(self, i):
        return i - 1

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], 
result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

def main():
    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    # process mode !
    t_env.get_config().get_configuration().set_string("python.execution-mode", 
"process")
    # optinal values
    t_env.get_config().get_configuration().set_string("parallelism.default", 
"2")
    add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
    subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())

    t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
    result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
    print(result.to_pandas())

if __name__ == '__main__':
    main()
{code}

* run the python udf job and watch the result
{code:bash}
$ python demo.py
   _c0  c  _c2
0    3  1    1
1    7  2    1
2    4  3    1
{code}

* change the python udf job to multi-thread mode

{code:python}
from pyflink.table.table_environment import TableEnvironment
from pyflink.table.environment_settings import EnvironmentSettings
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes, expressions as expr

class SubtractOne(ScalarFunction):
    def eval(self, i):
        return i - 1

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], 
result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j

def main():
    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    # multi-thread mode
    t_env.get_config().get_configuration().set_string("python.execution-mode", 
"multi-thread")
    t_env.get_config().get_configuration().set_string("parallelism.default", 
"2")
    add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
    subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())

    t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 'c'])
    result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
    print(result.to_pandas())

if __name__ == '__main__':
    main()
{code}

* run the python udf job and watch the result
{code:bash}
$ python demo.py
   _c0  c  _c2
0    3  1    1
1    7  2    1
2    4  3    1
{code}


> Release Testing: Running Python UDF in different Execution Mode
> ---------------------------------------------------------------
>
>                 Key: FLINK-26462
>                 URL: https://issues.apache.org/jira/browse/FLINK-26462
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python
>    Affects Versions: 1.15.0
>            Reporter: Huang Xingbo
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.15.0
>
>
> h1. Setup
> * Build flink source code and compile source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a python udf job named demo.py in process mode
> {code:python}
> from pyflink.table.table_environment import TableEnvironment
> from pyflink.table.environment_settings import EnvironmentSettings
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes, expressions as expr
> class SubtractOne(ScalarFunction):
>     def eval(self, i):
>         return i - 1
> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], 
> result_type=DataTypes.BIGINT())
> def add(i, j):
>     return i + j
> def main():
>     t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
>     # process mode !
>     
> t_env.get_config().get_configuration().set_string("python.execution-mode", 
> "process")
>     # optional values
>     t_env.get_config().get_configuration().set_string("parallelism.default", 
> "2")
>     add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
>     subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
>     t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 
> 'c'])
>     result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
>     print(result.to_pandas())
> if __name__ == '__main__':
>     main()
> {code}
> * run the python udf job and watch the result
> {code:bash}
> $ python demo.py
>    _c0  c  _c2
> 0    3  1    1
> 1    7  2    1
> 2    4  3    1
> {code}
> * change the python udf job to multi-thread mode
> {code:python}
> from pyflink.table.table_environment import TableEnvironment
> from pyflink.table.environment_settings import EnvironmentSettings
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes, expressions as expr
> class SubtractOne(ScalarFunction):
>     def eval(self, i):
>         return i - 1
> @udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], 
> result_type=DataTypes.BIGINT())
> def add(i, j):
>     return i + j
> def main():
>     t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
>     # multi-thread mode
>     
> t_env.get_config().get_configuration().set_string("python.execution-mode", 
> "multi-thread")
>     t_env.get_config().get_configuration().set_string("parallelism.default", 
> "2")
>     add_one = udf(lambda i: i + 1, result_type=DataTypes.BIGINT())
>     subtract_one = udf(SubtractOne(), result_type=DataTypes.BIGINT())
>     t = t_env.from_elements([(1, 2, 1), (2, 5, 2), (3, 1, 3)], ['a', 'b', 
> 'c'])
>     result = t.select(add(add_one(t.a), subtract_one(t.b)), t.c, expr.lit(1))
>     print(result.to_pandas())
> if __name__ == '__main__':
>     main()
> {code}
> * run the python udf job and watch the result
> {code:bash}
> $ python demo.py
>    _c0  c  _c2
> 0    3  1    1
> 1    7  2    1
> 2    4  3    1
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to