[
https://issues.apache.org/jira/browse/FLINK-23819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dian Fu updated FLINK-23819:
----------------------------
Description:
This feature adds support for tar.gz files as Python archives. Previously, only
zip files were supported as Python archives.
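For reference, archives can also be specified programmatically instead of via
the command-line options used in step 4 below; a minimal sketch using the
PyFlink DataStream API (the file name myenv.tar.gz matches the archive prepared
in step 2):
{code:python}
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Ship the packed conda environment; it is extracted into the directory
# "myenv" on the worker nodes.
env.add_python_archive("myenv.tar.gz", "myenv")
# Use the interpreter from the extracted archive for the Python workers.
env.set_python_executable("myenv/bin/python")
{code}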
This feature can be tested as follows:
1) Build the PyFlink packages from source according to the documentation:
[https://ci.apache.org/projects/flink/flink-docs-master/docs/flinkdev/building/#build-pyflink]
2) Prepare a tar.gz file which contains the conda Python virtual environment:
- Install Miniconda in your environment:
[https://conda.io/projects/conda/en/latest/user-guide/install/macos.html]
- Install conda pack: [https://conda.github.io/conda-pack/]
- Prepare the conda environment and install the PyFlink packages built in the
above step into the conda virtual environment:
{code}
conda create --name myenv
conda activate myenv
conda install python=3.8
python -m pip install ~/code/src/apache/flink/flink-python/apache-flink-libraries/dist/apache-flink-libraries-1.14.dev0.tar.gz
python -m pip install ~/code/src/apache/flink/flink-python/dist/apache_flink-1.14.dev0-cp38-cp38-macosx_10_9_x86_64.whl
{code}
- You can verify the packages installed in the conda env *myenv* as follows:
{code}
conda list -n myenv
{code}
- Package the conda virtual environment into a tar.gz file (it will generate a
file named myenv.tar.gz):
{code}
conda pack -n myenv
{code}
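Optionally, sanity-check that the generated archive contains the Python
interpreter before shipping it (a quick check with standard tar; not part of
the original test steps):
{code}
tar -tzf myenv.tar.gz | grep "bin/python"
{code}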
3) Prepare a PyFlink job. Here is an example:
{code:python}
import time

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, CoMapFunction
from pyflink.table import StreamTableEnvironment, DataTypes, Schema


def test_chaining():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)

    # 1. create source Table
    t_env.execute_sql("""
        CREATE TABLE datagen (
            id INT,
            data STRING
        ) WITH (
            'connector' = 'datagen',
            'rows-per-second' = '1000000',
            'fields.id.kind' = 'sequence',
            'fields.id.start' = '1',
            'fields.id.end' = '10000000'
        )
    """)

    # 2. create sink Tables
    t_env.execute_sql("""
        CREATE TABLE print (
            id BIGINT,
            data STRING,
            flag STRING
        ) WITH (
            'connector' = 'blackhole'
        )
    """)
    t_env.execute_sql("""
        CREATE TABLE print_2 (
            id BIGINT,
            data STRING,
            flag STRING
        ) WITH (
            'connector' = 'blackhole'
        )
    """)

    # 3. query from source table and perform calculations
    # create a Table from a Table API query:
    source_table = t_env.from_path("datagen")
    ds = t_env.to_append_stream(
        source_table,
        Types.ROW([Types.INT(), Types.STRING()]))

    ds1 = ds.map(lambda i: (i[0] * i[0], i[1]))
    ds2 = ds.map(lambda i: (i[0], i[1][2:]))

    class MyCoMapFunction(CoMapFunction):
        def map1(self, value):
            return value

        def map2(self, value):
            return value

    ds3 = ds1.connect(ds2).map(
        MyCoMapFunction(),
        output_type=Types.TUPLE([Types.LONG(), Types.STRING()]))
    ds4 = ds3.map(
        lambda i: (i[0], i[1], "left"),
        output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING()]))
    ds5 = ds3.map(lambda i: (i[0], i[1], "right")) \
        .map(lambda i: i,
             output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING()]))

    schema = Schema.new_builder() \
        .column("f0", DataTypes.BIGINT()) \
        .column("f1", DataTypes.STRING()) \
        .column("f2", DataTypes.STRING()) \
        .build()

    result_table_3 = t_env.from_data_stream(ds4, schema)
    statement_set = t_env.create_statement_set()
    statement_set.add_insert("print", result_table_3)

    result_table_4 = t_env.from_data_stream(ds5, schema)
    statement_set.add_insert("print_2", result_table_4)

    statement_set.execute().wait()


if __name__ == "__main__":
    start_ts = time.time()
    test_chaining()
    end_ts = time.time()
    print("--- %s seconds ---" % (end_ts - start_ts))
{code}
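Before submitting, a cluster must be running. A local standalone cluster can be
started with the scripts shipped in the Flink distribution; note that the
submission command in step 4 assumes the JobManager REST endpoint at
localhost:8082, so adjust rest.port in conf/flink-conf.yaml (or the -m option)
to match your setup:
{code}
./bin/start-cluster.sh
{code}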
4) Submit the PyFlink job using the generated myenv.tar.gz (the #myenv suffix
specifies the directory name under which the archive is extracted on the
workers):
{code}
./bin/flink run -d -m localhost:8082 -py test_pyflink.py -pyarch myenv.tar.gz#myenv -pyexec myenv/bin/python -pyclientexec myenv/bin/python
{code}
5) The job should run normally and you should see logs like the following in
the TaskManager log file:
{code}
2021-08-26 11:14:19,295 INFO  org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory [] - Still waiting for startup of environment '/private/var/folders/jq/brl84gld47ngmcfyvwh2gtj40000gp/T/python-dist-a61682a6-79b0-443c-b3c8-f9dade55e5d6/python-archives/myenv/lib/python3.8/site-packages/pyflink/bin/pyflink-udf-runner.sh' for worker id 1-1
{code}
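To locate this line, you can search the TaskManager logs; file names and
locations vary by deployment, and the pattern below assumes a standalone
cluster started from the distribution directory:
{code}
grep "Still waiting for startup of environment" log/*taskexecutor*.log
{code}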
This demonstrates that the Python worker was started using the Python
interpreter contained in myenv.tar.gz.
> Testing tgz file for python archives
> ------------------------------------
>
> Key: FLINK-23819
> URL: https://issues.apache.org/jira/browse/FLINK-23819
> Project: Flink
> Issue Type: Improvement
> Components: API / Python
> Reporter: Dian Fu
> Priority: Blocker
> Labels: release-testing
> Fix For: 1.14.0
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)