Hi Mike,

I'm not an expert, but I have some experience running Beam pipelines on Flink that need access to something on disk. When the Flink taskmanager executes the WriteToText transform, it spins up a Beam Python SDK Docker container to perform the work*. At the moment there is no way to mount a directory into the SDK container, but there is an open ticket <https://github.com/apache/beam/issues/19240> [1] to provide a way to specify a directory to mount.
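Just to illustrate what that ticket is about: the missing piece is the ability to add a bind-mount flag when the worker launches the SDK harness container. This is only a hypothetical sketch of the shape of that invocation; the host path is made up, and the image tag is just an example, not what Beam actually generates:

```shell
# Hypothetical illustration only: what launching the SDK harness container
# with a bind mount might look like. HOST_DIR and the image tag are examples.
HOST_DIR="/mnt/beam-staging"                 # would need to exist on every taskmanager host
SDK_IMAGE="apache/beam_python3.7_sdk:2.35.0" # example SDK container image

# Build the command rather than running it, so the shape is easy to inspect.
CMD="docker run --rm -v ${HOST_DIR}:${HOST_DIR} ${SDK_IMAGE}"
echo "$CMD"
```

The important part is that the same path appears on both sides of `-v`, so file paths your pipeline writes resolve identically inside and outside the container.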
I was running a fork for a little while with these changes <https://github.com/apache/beam/pull/8982> [2] that allowed us to pass along some docker run options for how the SDK container was started (so we could include a mount). One thing to note is that the Flink taskmanager workers need access to whatever directory you're specifying (the E:\ drive in your example).

I also put together a quick sample of how to deploy Flink on Kubernetes <https://github.com/sambvfx/beam-flink-k8s> [3], which solved some problems we were running into with the Flink job server sharing the same disk staging area.

Hopefully some of that helps,
-Sam

*There's an exception to this: some transforms are replaced with something runner-specific. For example, the apache_beam.io.gcp.pubsub.ReadFromPubSub transform gets "swapped out" and the Java implementation of the transform executes directly on the Flink taskmanager worker, not within the SDK container.

[1] https://github.com/apache/beam/issues/19240
[2] https://github.com/apache/beam/pull/8982
[3] https://github.com/sambvfx/beam-flink-k8s

On Sat, Jun 18, 2022 at 11:39 AM <pod...@gmx.com> wrote:

> I'll try again; maybe someone can help me with this?
>
> How do I run Beam on Flink? I have this code:
>
>     def run():
>         import apache_beam as beam
>         from apache_beam.options.pipeline_options import PipelineOptions
>
>         options = PipelineOptions([
>             "--runner=FlinkRunner",
>             "--flink_version=1.14",
>             "--flink_master=localhost:8081",
>             "--environment_config=localhost:50000"
>         ])
>         output_file = 'E:\\directory\\output.txt'
>         with beam.Pipeline(options=options) as p:
>             (p
>              | 'Create file lines' >> beam.Create([
>                  'Each element must be a string.',
>                  'It writes one element per line.',
>                  'There are no guarantees on the line order.',
>                  'The data might be written into multiple files.',
>              ])
>              | 'Write to files' >> beam.io.WriteToText(output_file)
>             )
>
>     if __name__ == "__main__":
>         run()
>
> This should work.
> But for some reason Flink is not able to save to the file:
>
>     CHAIN MapPartition (MapPartition at [2]Write to
>     files/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:3320>),
>     Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) FAILED
>
> The same problem occurs if I try to open a file.
> What is wrong here? I tried several example scripts and none of them works.
> Could you help me take my first step with Beam and Flink?
>
> Regards,
> Mike