Hi Mike,

I’m not an expert, but I have some experience running beam pipelines in
Flink that require access to something on disk. When the Flink taskmanager
executes the WriteToText transform, it spins up a beam python SDK docker
container to perform the work*. At the moment there is not a way to mount a
directory into the SDK docker container, but there is an open ticket
<https://github.com/apache/beam/issues/19240> [1] to provide a way to
specify a directory to mount.

I was running a fork for a little while with these changes
<https://github.com/apache/beam/pull/8982> [2] that allowed us to pass
along some docker run options for how the SDK container was started (so we
could include a mount). One thing to note is that the flink taskmanager
workers need access to whatever directory you’re specifying (the E:\ drive
in your example). I also created a quick sample of how to deploy Flink in
Kubernetes here <https://github.com/sambvfx/beam-flink-k8s> [3] which
solved some problems we were running into dealing with the Flink job server
sharing the same disk staging area.

Hopefully some of that helps,
-Sam

*There’s an exception to this where some transforms are replaced with
something runner-specific. For example, the
apache_beam.io.gcp.pubsub.ReadFromPubSub transform. This gets “swapped out”
to and executes the Java implementation of the transform directly on the
Flink taskmanager worker and not within the SDK container.

[1] https://github.com/apache/beam/issues/19240
[2] https://github.com/apache/beam/pull/8982
[3] https://github.com/sambvfx/beam-flink-k8s

On Sat, Jun 18, 2022 at 11:39 AM <pod...@gmx.com> wrote:

>
> I try again maybe someone can help me with this?
>
> How to run Beam on Flink?
>
> I have code:
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *def run():  import apache_beam as beam  from
> apache_beam.options.pipeline_options import PipelineOptions    options =
> PipelineOptions([         "--runner=FlinkRunner",
> "--flink_version=1.14",         "--flink_master=localhost:8081",
> "--environment_config=localhost:50000"  ])  output_file =
> 'E:\\directory\\output.txt'  with beam.Pipeline(options=options) as p:
> (p         | 'Create file lines' >> beam.Create([           'Each element
> must be a string.',           'It writes one element per line.',
> 'There are no guarantees on the line order.',           'The data might be
> written into multiple files.',         ])         | 'Write to files' >>
> beam.io.WriteToText(output_file)     ) if __name__ == "__main__":     run()*
>
> Should work. But for some reason Flink is not able to save to file:
>
> *CHAIN MapPartition (MapPartition at [2]Write to
> files/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:3320>),
> Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) FAILED*
>
> Same problem if I want to open some file.
> What is wrong here? I tried several example scripts - none is working.
> If you could help me to take first step in Beam and Flink.
> Regards
> Mike
>

Reply via email to