[ https://issues.apache.org/jira/browse/FLINK-23819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17406376#comment-17406376 ]

Leonard Xu commented on FLINK-23819:
------------------------------------

* Env: macOS
* Python: 3.7
* Flink commit: f2b0e4345564f420ff652f757f716426cb538d45

Tested as follows; the job ran successfully with no suspicious log output:
{code}
-- submit log
(myenv) bang@mac build-target (master) $ ./bin/flink run -d -m localhost:8081 -py /Users/bang/sourcecode/project/flink-master/flink/build-target/test_pyflink.py -pyarch myenv.tar.gz#myenv -pyexec myenv/bin/python -pyclientexec myenv/bin/python
Job has been submitted with JobID bbdf7691d8b9203378ad24ae397419ec
--- 25.84529185295105 seconds ---

-- taskexecutor log
2021-08-29 16:10:11,525 INFO  org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python working dir of python worker: /var/folders/55/cw682b314gn8jhfh565hp7q00000gp/T/python-dist-1227bbe8-9f86-47cd-9479-ea7a9565e89e/python-archives
2021-08-29 16:10:20,637 INFO  org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager [] - Python interpreter path: myenv/bin/python
2021-08-29 16:10:20,751 INFO  org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner [] - Obtained shared Python process of size 536870920 bytes
2021-08-29 16:10:20,751 INFO  org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner [] - Obtained shared Python process of size 536870920 bytes
2021-08-29 16:10:28,687 INFO  org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn Logging client connected.
2021-08-29 16:10:28,708 INFO  org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService [] - Beam Fn Control client connected with id 1-1
{code}
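
Additionally, to double-check that the workers really pick up the interpreter shipped in the archive, one can log sys.executable from inside the job. This is a minimal sketch (the collection source and job name are illustrative, not part of the original test); submit it with the same -pyarch/-pyexec flags as above:
{code:python}
import sys

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    ds = env.from_collection([1, 2, 3], type_info=Types.INT())
    # Inside the worker, sys.executable should point at
    # .../python-archives/myenv/bin/python when the archive is used.
    ds.map(lambda i: "worker interpreter: %s" % sys.executable,
           output_type=Types.STRING()).print()
    env.execute("interpreter-check")


if __name__ == "__main__":
    main()
{code}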
 

Building the master branch hit an npm version problem; I resolved it by removing `flink-runtime-web/web-dashboard/node` and rebuilding:
{code}
Failed to execute goal com.github.eirslett:frontend-maven-plugin:1.6:npm (npm install) on project flink-runtime-web_2.11: Failed to run task: 'npm ci --cache-max=0 --no-save' failed. org.apache.commons.exec.ExecuteException: Process exited with an error: 1 (Exit value: 1) -> [Help 1]
{code}

> Testing tgz file for python archives
> ------------------------------------
>
>                 Key: FLINK-23819
>                 URL: https://issues.apache.org/jira/browse/FLINK-23819
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python
>            Reporter: Dian Fu
>            Assignee: Leonard Xu
>            Priority: Blocker
>              Labels: release-testing
>             Fix For: 1.14.0
>
>
> This feature adds support for tar.gz files as Python archives; previously, only zip files were supported.
> This feature can be tested as follows:
>  1) Build PyFlink packages from source according to documentation: 
> [https://ci.apache.org/projects/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
>  2) Prepare a tar.gz file which contains the conda Python virtual environment
>  - Install MiniConda in your environment: 
> [https://conda.io/projects/conda/en/latest/user-guide/install/macos.html]
>  - Install conda pack: [https://conda.github.io/conda-pack/]
>  - Prepare the conda environment and install the built PyFlink in the above 
> step into the conda virtual environment:
> {code}
>  conda create --name myenv
>  conda activate myenv
>  conda install python=3.8
>  python -m pip install 
> ~/code/src/apache/flink/flink-python/apache-flink-libraries/dist/apache-flink-libraries-1.14.dev0.tar.gz
>  python -m pip install 
> ~/code/src/apache/flink/flink-python/dist/apache_flink-1.14.dev0-cp38-cp38-macosx_10_9_x86_64.whl
> {code}
>   - You can verify the packages installed in the conda env *myenv* as follows:
> {code}
> conda list -n myenv
> {code}
>  - Package the conda virtual environment into a tgz file (this will generate a file named myenv.tar.gz):
> {code} 
> conda pack -n myenv
> {code}
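> As an optional sanity check (a sketch using only Python's standard tarfile module; not part of the original instructions), you can verify that the archive actually contains the Python interpreter before submitting:
> {code:python}
> # Sanity check: the packed archive should contain bin/python,
> # which is what -pyexec myenv/bin/python points at after extraction.
> import tarfile
>
> with tarfile.open("myenv.tar.gz", "r:gz") as tar:
>     names = tar.getnames()
> assert any(n.endswith("bin/python") for n in names), "no interpreter found in archive"
> print("archive looks good: %d entries" % len(names))
> {code}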
> 3) Prepare a PyFlink job, here is an example:
> {code:python}
> import time
> from pyflink.common.typeinfo import Types
> from pyflink.datastream import StreamExecutionEnvironment, CoMapFunction
> from pyflink.table import StreamTableEnvironment, DataTypes, Schema
> def test_chaining():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     t_env = StreamTableEnvironment.create(stream_execution_environment=env)
>     # 1. create source Table
>     t_env.execute_sql("""
>         CREATE TABLE datagen (
>             id INT,
>             data STRING
>         ) WITH (
>             'connector' = 'datagen',
>             'rows-per-second' = '1000000',
>             'fields.id.kind' = 'sequence',
>             'fields.id.start' = '1',
>             'fields.id.end' = '1000'
>         )
>     """)
>     # 2. create sink Table
>     t_env.execute_sql("""
>         CREATE TABLE print (
>             id BIGINT,
>             data STRING,
>             flag STRING
>         ) WITH (
>             'connector' = 'blackhole'
>         )
>     """)
>     t_env.execute_sql("""
>         CREATE TABLE print_2 (
>             id BIGINT,
>             data STRING,
>             flag STRING
>         ) WITH (
>             'connector' = 'blackhole'
>         )
>     """)
>     # 3. query from source table and perform calculations
>     # create a Table from a Table API query:
>     source_table = t_env.from_path("datagen")
>     ds = t_env.to_append_stream(
>         source_table,
>         Types.ROW([Types.INT(), Types.STRING()]))
>     ds1 = ds.map(lambda i: (i[0] * i[0], i[1]))
>     ds2 = ds.map(lambda i: (i[0], i[1][2:]))
>     class MyCoMapFunction(CoMapFunction):
>         def map1(self, value):
>             return value
>         def map2(self, value):
>             return value
>     ds3 = ds1.connect(ds2).map(MyCoMapFunction(), 
> output_type=Types.TUPLE([Types.LONG(), Types.STRING()]))
>     ds4 = ds3.map(lambda i: (i[0], i[1], "left"),
>                   output_type=Types.TUPLE([Types.LONG(), Types.STRING(), 
> Types.STRING()]))
>     ds5 = ds3.map(lambda i: (i[0], i[1], "right"))\
>              .map(lambda i: i,
>                   output_type=Types.TUPLE([Types.LONG(), Types.STRING(), 
> Types.STRING()]))
>     schema = Schema.new_builder() \
>         .column("f0", DataTypes.BIGINT()) \
>         .column("f1", DataTypes.STRING()) \
>         .column("f2", DataTypes.STRING()) \
>         .build()
>     result_table_3 = t_env.from_data_stream(ds4, schema)
>     statement_set = t_env.create_statement_set()
>     statement_set.add_insert("print", result_table_3)
>     result_table_4 = t_env.from_data_stream(ds5, schema)
>     statement_set.add_insert("print_2", result_table_4)
>     statement_set.execute()
> if __name__ == "__main__":
>     start_ts = time.time()
>     test_chaining()
>     end_ts = time.time()
>     print("--- %s seconds ---" % (end_ts - start_ts))
> {code}
> 4) Submit the PyFlink job using the generated myenv.tar.gz:
> {code}
> ./bin/flink run -d -m localhost:8081 -py test_pyflink.py -pyarch myenv.tar.gz#myenv -pyexec myenv/bin/python -pyclientexec myenv/bin/python
> {code}
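> Alternatively, the same archive and interpreter settings can be applied from within the job itself; here is a sketch assuming the add_python_archive and set_python_executable methods of StreamExecutionEnvironment (check the PyFlink docs for your version):
> {code:python}
> # Sketch: programmatic counterpart of the -pyarch/-pyexec flags.
> from pyflink.datastream import StreamExecutionEnvironment
>
> env = StreamExecutionEnvironment.get_execution_environment()
> # Ship myenv.tar.gz to the cluster; extract it into a directory named "myenv".
> env.add_python_archive("myenv.tar.gz", "myenv")
> # Run the Python workers with the interpreter from the extracted archive.
> env.set_python_executable("myenv/bin/python")
> {code}
> Note that -pyclientexec has no counterpart here; it only controls the interpreter used on the client side at submission time.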
> 5) The job should run normally and you should see logs like the following in the log file of the TaskManager:
> {code}
> 2021-08-26 11:14:19,295 INFO 
> org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory [] 
> - Still waiting for startup of environment 
> '/private/var/folders/jq/brl84gld47ngmcfyvwh2gtj40000gp/T/python-dist-a61682a6-79b0-443c-b3c8-f9dade55e5d6/python-archives/myenv/lib/python3.8/site-packages/pyflink/bin/pyflink-udf-runner.sh'
>  for worker id 1-1
> {code}
> This demonstrates that the Python worker was started using the Python interpreter contained in myenv.tar.gz.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
