Hi Mike,

I’m not an expert, but I have some experience running Beam pipelines on
Flink that need access to something on disk. When the Flink taskmanager
executes the WriteToText transform, it spins up a Beam Python SDK Docker
container to perform the work*, so a write to a local path lands inside
that container rather than on your host disk. At the moment there is no
way to mount a directory into the SDK Docker container, but there is an
open ticket <https://github.com/apache/beam/issues/19240> [1] to provide a
way to specify a directory to mount.

I was running a fork for a little while with these changes
<https://github.com/apache/beam/pull/8982> [2], which allowed us to pass
along some docker run options controlling how the SDK container was started
(so we could include a mount). One thing to note is that the Flink
taskmanager workers need access to whatever directory you’re specifying
(the E:\ drive in your example). I also created a quick sample of how to
deploy Flink in Kubernetes here <https://github.com/sambvfx/beam-flink-k8s>
[3], which solved some problems we were running into with the Flink job
server sharing the same disk staging area.
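
To make the mount idea a bit more concrete, here is a minimal sketch of the
kind of bind-mount arguments I mean. This is not what Beam itself runs and
not the exact option format from the fork; it only shows standard
`docker run --mount` syntax. The helper name, image tag, and paths are all
hypothetical.

```python
import shlex


def docker_run_args(image, host_dir, container_dir):
    """Build a `docker run` argument list that bind-mounts host_dir.

    Illustration only: this helper is hypothetical and is not part of Beam.
    """
    mount = f"type=bind,source={host_dir},target={container_dir}"
    return ["docker", "run", "--mount", mount, image]


# Hypothetical image tag and paths, chosen only for the example.
args = docker_run_args(
    image="apache/beam_python3.8_sdk:2.39.0",
    host_dir="/mnt/beam-output",
    container_dir="/mnt/beam-output",
)

# Print the command line as a single shell-quoted string.
print(shlex.join(args))
```

Mounting the directory at the same path inside and outside the container
means the output path given to WriteToText resolves to the same place in
both, which is the simplest setup to reason about.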

Hopefully some of that helps,

*There’s an exception to this: some transforms are replaced with
something runner-specific. One example is the
apache_beam.io.gcp.pubsub.ReadFromPubSub transform. It gets “swapped out”
for the Java implementation of the transform, which executes directly on
the Flink taskmanager worker and not within the SDK container.

On Sat, Jun 18, 2022 at 11:39 AM <pod...@gmx.com> wrote:

> I’ll try again; maybe someone can help me with this?
> How to run Beam on Flink?
> I have code:
> def run():
>     import apache_beam as beam
>     from apache_beam.options.pipeline_options import PipelineOptions
>
>     options = PipelineOptions([
>         "--runner=FlinkRunner",
>         "--flink_version=1.14",
>         "--flink_master=localhost:8081",
>         "--environment_config=localhost:50000"
>     ])
>     output_file = 'E:\\directory\\output.txt'
>     with beam.Pipeline(options=options) as p:
>         (p
>          | 'Create file lines' >> beam.Create([
>              'Each element must be a string.',
>              'It writes one element per line.',
>              'There are no guarantees on the line order.',
>              'The data might be written into multiple files.',
>          ])
>          | 'Write to files' >> beam.io.WriteToText(output_file)
>         )
>
> if __name__ == "__main__":
>     run()
> Should work. But for some reason Flink is not able to save to file:
> *CHAIN MapPartition (MapPartition at [2]Write to
> files/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:3320>),
> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) FAILED*
> I get the same problem if I want to open some file.
> What is wrong here? I tried several example scripts and none of them work.
> I would appreciate help taking my first step with Beam and Flink.
> Regards
> Mike

