[ 
https://issues.apache.org/jira/browse/FLINK-28918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Huang Xingbo updated FLINK-28918:
---------------------------------
    Description: 
* Build flink source code and compile source code
{code:bash}
$ cd {flink-source-code}
$ mvn clean install -DskipTests
{code}

* Prepare a Python Virtual Environment

{code:bash}
$ cd flink-python/dev
$ ./lint-python.sh -s basic
$ source .conda/bin/activate
{code}

* Install PyFlink from source code. For more details, you can refer to the 
[doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
{code:bash}
$ cd flink-python/apache-flink-libraries
$ python setup.py sdist
$ pip install dist/*.tar.gz
$ cd ..
$ pip install -r dev/dev-requirements.txt
$ python setpy.py sdist
$ pip install dist/*.tar.gz
{code}


h1. Test
* Write a python datastream job  in thread mode

{code:python}
from pyflink.common import Configuration
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

def main():
    config = Configuration()
    config.set_string("python.execution-mode", "thread")
    env = StreamExecutionEnvironment.get_execution_environment(config)
    ds = env.from_collection(
    [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)],
    type_info=Types.ROW_NAMED(["v1", "v2", "v3"],
                              [Types.INT(), Types.STRING(), Types.INT()]))


    def flat_map_func1(data):
        for i in data:
            yield int(i), 1

    def flat_map_func2(data):
        for i in data:
            yield i

    ds = ds.key_by(lambda x: x[0]) \
        .min_by("v2") \
        .map(lambda x: (x[0], x[1], x[2]),
            output_type=Types.TUPLE([Types.INT(), Types.STRING(), 
Types.INT()])) \
        .key_by(lambda x: x[2]) \
        .max_by(0) \
        .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), 
Types.INT()])) \
        .key_by(lambda x: [1]) \
        .min_by() \
        .flat_map(flat_map_func2, output_type=Types.INT()) \
        .key_by(lambda x: x) \
        .max_by()

    ds.print()
    env.execute("key_by_min_by_max_by_test_batch")

if __name__ == '__main__':
    main()
{code}

* run the python datastream job and watch the result
{code:bash}
$ python demo.py
{code}



  was:
* Build flink source code and compile source code
{code:bash}
$ cd {flink-source-code}
$ mvn clean install -DskipTests
{code}

* Prepare a Python Virtual Environment

{code:bash}
$ cd flink-python/dev
$ ./lint-python.sh -s basic
$ source .conda/bin/activate
{code}

* Install PyFlink from source code. For more details, you can refer to the 
[doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
{code:bash}
$ cd flink-python/apache-flink-libraries
$ python setup.py sdist
$ pip install dist/*.tar.gz
$ cd ..
$ pip install -r dev/dev-requirements.txt
$ python setpy.py sdist
$ pip install dist/*.tar.gz
{code}


h1. Test
* Write a python datastream job  in thread mode

{code:python}
from pyflink.common import Configuration
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

def main():
    config = Configuration()
    config.set_string("python.execution-mode", "thread")
    env = StreamExecutionEnvironment.get_execution_environment(config)
    ds = env.from_collection(
    [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)],
    type_info=Types.ROW_NAMED(["v1", "v2", "v3"],
                              [Types.INT(), Types.STRING(), Types.INT()]))


    def flat_map_func1(data):
        for i in data:
            yield int(i), 1

    def flat_map_func2(data):
        for i in data:
            yield i

    ds = ds.key_by(lambda x: x[0]) \
        .min_by("v2") \
        .map(lambda x: (x[0], x[1], x[2]),
            output_type=Types.TUPLE([Types.INT(), Types.STRING(), 
Types.INT()])) \
        .key_by(lambda x: x[2]) \
        .max_by(0) \
        .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), 
Types.INT()])) \
        .key_by(lambda x: [1]) \
        .min_by() \
        .flat_map(flat_map_func2, output_type=Types.INT()) \
        .key_by(lambda x: x) \
        .max_by()

    ds.print()
    self.env.execute("key_by_min_by_max_by_test_batch")

if __name__ == '__main__':
    main()
{code}

* run the python datastream job and watch the result
{code:bash}
$ python demo.py
{code}




> Release Testing: Verify FLIP-206 in Python DataStream API
> ---------------------------------------------------------
>
>                 Key: FLINK-28918
>                 URL: https://issues.apache.org/jira/browse/FLINK-28918
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / Python
>    Affects Versions: 1.16.0
>            Reporter: Huang Xingbo
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.16.0
>
>
> * Build flink source code and compile source code
> {code:bash}
> $ cd {flink-source-code}
> $ mvn clean install -DskipTests
> {code}
> * Prepare a Python Virtual Environment
> {code:bash}
> $ cd flink-python/dev
> $ ./lint-python.sh -s basic
> $ source .conda/bin/activate
> {code}
> * Install PyFlink from source code. For more details, you can refer to the 
> [doc|https://nightlies.apache.org/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
> {code:bash}
> $ cd flink-python/apache-flink-libraries
> $ python setup.py sdist
> $ pip install dist/*.tar.gz
> $ cd ..
> $ pip install -r dev/dev-requirements.txt
> $ python setpy.py sdist
> $ pip install dist/*.tar.gz
> {code}
> h1. Test
> * Write a python datastream job  in thread mode
> {code:python}
> from pyflink.common import Configuration
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment
> def main():
>     config = Configuration()
>     config.set_string("python.execution-mode", "thread")
>     env = StreamExecutionEnvironment.get_execution_environment(config)
>     ds = env.from_collection(
>     [(1, '9', 0), (1, '5', 1), (1, '6', 2), (5, '5', 0), (5, '3', 1)],
>     type_info=Types.ROW_NAMED(["v1", "v2", "v3"],
>                               [Types.INT(), Types.STRING(), Types.INT()]))
>     def flat_map_func1(data):
>         for i in data:
>             yield int(i), 1
>     def flat_map_func2(data):
>         for i in data:
>             yield i
>     ds = ds.key_by(lambda x: x[0]) \
>         .min_by("v2") \
>         .map(lambda x: (x[0], x[1], x[2]),
>             output_type=Types.TUPLE([Types.INT(), Types.STRING(), 
> Types.INT()])) \
>         .key_by(lambda x: x[2]) \
>         .max_by(0) \
>         .flat_map(flat_map_func1, output_type=Types.TUPLE([Types.INT(), 
> Types.INT()])) \
>         .key_by(lambda x: [1]) \
>         .min_by() \
>         .flat_map(flat_map_func2, output_type=Types.INT()) \
>         .key_by(lambda x: x) \
>         .max_by()
>     ds.print()
>     env.execute("key_by_min_by_max_by_test_batch")
> if __name__ == '__main__':
>     main()
> {code}
> * run the python datastream job and watch the result
> {code:bash}
> $ python demo.py
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to