Hi,

I just tried to start a beam pipeline on a flink cluster using

- the latest published beam version 2.26.0
- the python SDK
- a standalone flink cluster version 1.10.1
- the simple pipeline I used  [1]

When I start the pipeline in embedded mode it works correctly (even pulling the Python SDK docker image, see the log below):

python mas_zb_demo_marc_author_count.py --input /swissbib_index/solrDocumentProcessing/FrequentInitialPreProcessing/data/beam/input/job8r1A069.format.xml --output /swissbib_index/solrDocumentProcessing/FrequentInitialPreProcessing/data/beam/output/out.txt --runner=FlinkRunner --flink_version=1.10

<logs>
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Using Python SDK docker image: apache/beam_python3.7_sdk:2.26.0. If the image is not available at local, we will try to pull from hub.docker.com
</logs>
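
For context, the pipeline itself is a small read/count/write job. A stripped-down sketch of it (not the exact code from [1]; the author extraction is just a placeholder here) looks roughly like this:

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Everything else (--runner, --flink_version, --flink_master, ...) is handed to Beam.
    options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=options) as p:
        (p
         | 'Read' >> beam.io.ReadFromText(known_args.input)
         | 'ExtractAuthor' >> beam.Map(lambda line: (line.strip(), 1))  # placeholder for the MARC author extraction
         | 'CountPerAuthor' >> beam.CombinePerKey(sum)
         | 'Format' >> beam.MapTuple(lambda author, count: '%s: %d' % (author, count))
         | 'Write' >> beam.io.WriteToText(known_args.output))


if __name__ == '__main__':
    run()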

I'm using Python version:
python --version
Python 3.7.9

Trying to use the remote standalone cluster, the job fails with a timeout while starting the SDK harness docker container:

python mas_zb_demo_marc_author_count.py --input /swissbib_index/solrDocumentProcessing/FrequentInitialPreProcessing/data/beam/input/job8r1A069.format.xml --output /swissbib_index/solrDocumentProcessing/FrequentInitialPreProcessing/data/beam/output/out.txt --runner=FlinkRunner --flink_version=1.10 --flink_master=localhost:8081

<logs>

java.lang.Exception: The user defined 'open()' method caused an exception: java.util.concurrent.TimeoutException: Timed out while waiting for command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null apache/beam_python3.8_sdk:2.26.0 --id=1-1 --provision_endpoint=localhost:41483'
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:499)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:369)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
        at java.base/java.lang.Thread.run(Thread.java:834)

</logs>
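
As far as I understand, the FlinkRunner uses the DOCKER environment for the SDK harness by default, which is what triggers the 'docker run ... apache/beam_python3.8_sdk:2.26.0' seen in the exception. Spelling that out explicitly (just to show which options are involved; the image name is the one from the log above):

from apache_beam.options.pipeline_options import PipelineOptions

# Equivalent options passed programmatically; environment_type/environment_config
# make the default SDK harness container explicit.
options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_version=1.10',
    '--flink_master=localhost:8081',
    '--environment_type=DOCKER',
    '--environment_config=apache/beam_python3.8_sdk:2.26.0',
])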

Then I pulled the apache/beam_python3.8_sdk:2.26.0 image locally beforehand to avoid the timeout. That worked: the remote job finished and was shown as finished in the Flink dashboard.

But no result was written to the given --output directory, and I couldn't find anything in the Flink logs referring to this. Additionally, the Python process that submits the script to the cluster produces a huge amount of log output [2], but I can't see the reason for the behaviour in it.
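
For reference, the last step of the pipeline is just a WriteToText transform. As far as I understand, it writes sharded files next to the given path (e.g. out.txt-00000-of-00001) on the machine where the SDK harness runs, which is what I expected to find under --output. A minimal local example of that step ('/tmp/out.txt' is only a placeholder path):

import apache_beam as beam

# Dummy data, default DirectRunner; produces e.g. /tmp/out.txt-00000-of-00001
with beam.Pipeline() as p:
    (p
     | beam.Create(['author a', 'author b'])
     | beam.io.WriteToText('/tmp/out.txt'))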

Thanks for any explanation of this behaviour.

Günter


[1]
https://gitlab.com/swissbib/lab/services/jupyter-beam-flink/-/blob/master/mas_zb_demo_marc_author_count.py
(extracts author data from bibliographic library data)

[2] https://gitlab.com/swissbib/lab/services/jupyter-beam-flink/-/blob/master/notes/logging.no.output.txt

