[ https://issues.apache.org/jira/browse/BEAM-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Valentyn Tymofieiev reassigned BEAM-8547:
-----------------------------------------

    Assignee: Kyle Weaver

> Portable Wordcount fails on standalone Flink cluster
> ----------------------------------------------------
>
>                 Key: BEAM-8547
>                 URL: https://issues.apache.org/jira/browse/BEAM-8547
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, sdk-py-harness
>            Reporter: Valentyn Tymofieiev
>            Assignee: Kyle Weaver
>            Priority: P2
>              Labels: stale-P2
>
> Repro:
> # git checkout origin/release-2.16.0
> # ./flink-1.8.2/bin/start-cluster.sh
> # gradlew :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081
> # python -m apache_beam.examples.wordcount --input=/etc/profile --output=/tmp/py-wordcount-direct --runner=PortableRunner --experiments=worker_threads=100 --parallelism=1 --shutdown_sources_on_final_watermark --sdk_worker_parallelism=1 --environment_cache_millis=60000 --job_endpoint=localhost:8099
>
> This causes the runner to crash with:
> {noformat}
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 158, in _execute
>     response = task()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in <lambda>
>     self._execute(lambda: worker.do_instruction(work), work)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
>     request.instruction_id)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 663, in process_bundle
>     data.ptransform_id].process_encoded(data.data)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
>   File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
>   File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 660, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1042, in process
>     self.writer = self.sink.open_writer(init_result, str(uuid.uuid4()))
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 137, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 186, in open_writer
>     return FileBasedSinkWriter(self, writer_path)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 390, in __init__
>     self.temp_handle = self.sink.open(temp_shard_path)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/textio.py", line 391, in open
>     file_handle = super(_TextSink, self).open(temp_path)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/options/value_provider.py", line 137, in _f
>     return fnc(self, *args, **kwargs)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 129, in open
>     return FileSystems.create(temp_path, self.mime_type, self.compression_type)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filesystems.py", line 203, in create
>     return filesystem.create(path, mime_type, compression_type)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/localfilesystem.py", line 151, in create
>     return self._path_open(path, 'wb', mime_type, compression_type)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/io/localfilesystem.py", line 134, in _path_open
>     raw_file = open(path, mode)
> RuntimeError: FileNotFoundError: [Errno 2] No such file or directory: '/tmp/beam-temp-py-wordcount-direct-ea951c18fd1211e9ac84a0c589d778c3/d39e13af-277b-437e-89f2-e00249287e1d.py-wordcount-direct' [while running 'write/Write/WriteImpl/WriteBundles']
> {noformat}
>
> The error happens with Flink 1.5 and Flink 1.8.
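The innermost frame of the traceback is a bare `open(path, 'wb')` in `localfilesystem.py`. Python raises `FileNotFoundError` with errno 2 (`ENOENT`) from that call whenever the parent directory of the path does not exist in the environment where the SDK harness process runs. Here, that parent is the `/tmp/beam-temp-py-wordcount-direct-...` directory shown in the error. The sketch below reproduces only that failure mode using the standard library. It does not use Beam and does not identify the root cause. `check_temp_dir` and `open_like_localfilesystem` are hypothetical diagnostic helpers and are not Beam APIs.

```python
import errno
import os
import tempfile


def check_temp_dir(path):
    """Hypothetical diagnostic helper (not part of Beam).

    Returns None if the parent directory of `path` exists in the current
    environment; otherwise returns a message naming the missing directory.
    """
    parent = os.path.dirname(path)
    if os.path.isdir(parent):
        return None
    return "temp directory missing in this environment: %s" % parent


def open_like_localfilesystem(path):
    # Mirrors the last traceback frame: a plain open(path, 'wb') that makes
    # no attempt to create missing parent directories.
    return open(path, 'wb')


if __name__ == '__main__':
    with tempfile.TemporaryDirectory() as root:
        # A temp shard path whose parent directory was never created here.
        missing_dir = os.path.join(root, 'beam-temp-py-wordcount-direct-example')
        shard = os.path.join(missing_dir, 'shard.py-wordcount-direct')
        print(check_temp_dir(shard))
        try:
            open_like_localfilesystem(shard)
        except FileNotFoundError as e:
            print('errno', e.errno, 'is ENOENT:', e.errno == errno.ENOENT)
```

You could run a check like `check_temp_dir` inside the SDK harness environment to confirm whether the writer's temp directory is visible there. That is a debugging aid only, not a fix.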
>
> The error does not happen if we run the SDK harness in LOOPBACK mode (--environment_type=LOOPBACK).
>
> The error also does not happen if we launch the Flink job server without pointing it to a Flink cluster (that is, if we remove -PflinkMasterUrl=localhost:8081), or if we use the Spark job server with a Spark cluster, so this seems to be a Flink-specific problem.
>
> Similar error: https://issues.apache.org/jira/browse/BEAM-7859
>
> Note that the default parallelism parameters set in portableWordCountBatch are not compatible with the default configuration of a standalone Flink cluster, which starts with only one available slot.
>
> cc: [~ibzib] [~goenka] [~robertwb]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
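The sketch below expresses the repro command and the two observations above in plain Python. `wordcount_argv` rebuilds the step-4 command line from the flags listed in this report and can optionally append `--environment_type=LOOPBACK`, the mode in which the report says the error does not occur. `fits_in_slots` is a rough pre-flight check for the slot note. It assumes Flink's default slot sharing, where a job needs as many task slots as its highest operator parallelism, and a freshly started standalone cluster with one TaskManager offering one slot. Both function names are hypothetical and are not part of Beam or Flink.

```python
import shlex

# Flags copied verbatim from step 4 of the repro in this report.
REPRO_FLAGS = [
    "--input=/etc/profile",
    "--output=/tmp/py-wordcount-direct",
    "--runner=PortableRunner",
    "--experiments=worker_threads=100",
    "--parallelism=1",
    "--shutdown_sources_on_final_watermark",
    "--sdk_worker_parallelism=1",
    "--environment_cache_millis=60000",
    "--job_endpoint=localhost:8099",
]


def wordcount_argv(loopback=False):
    """Hypothetical helper: argv for `python -m apache_beam.examples.wordcount`.

    With loopback=True, appends --environment_type=LOOPBACK, the setting
    under which the report says the error does not happen.
    """
    argv = ["python", "-m", "apache_beam.examples.wordcount"] + REPRO_FLAGS
    if loopback:
        argv.append("--environment_type=LOOPBACK")
    return argv


def fits_in_slots(parallelism, task_managers=1, slots_per_task_manager=1):
    """Hypothetical rough check, assuming Flink's default slot sharing:
    a job needs at least as many free slots as its highest parallelism.
    The defaults model a fresh standalone cluster with a single slot."""
    return parallelism <= task_managers * slots_per_task_manager


if __name__ == "__main__":
    print(" ".join(shlex.quote(a) for a in wordcount_argv(loopback=True)))
    print("parallelism=1 fits:", fits_in_slots(1))
    print("parallelism=2 fits:", fits_in_slots(2))
```

Under these assumptions, any pipeline parallelism above 1 would not fit on a freshly started standalone cluster. The repro therefore pins `--parallelism=1`. To actually launch the command, you could pass the list to `subprocess.run(..., check=True)` with a running job server.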