Thank you Robert - I really appreciate.
 
I'm not happy that reading or writing to file is actually done on Beam side. This means it's not exactly true that pipelines are executed by Flink - this is just some part of the job done by Flink (and the other by Beam).
I did imagine that Beam just compose pipeline so in this way I can run it buid in Beam and run it in Flink (once or forever). But Beam is required for each execution :-(
So to make real Flink pipeline I have to work only with Flink.
 
It's a bit silly to ask me this question, but is there another product that works similar to Flink but is easier to create pipelines?
 
Best,
 
M.
 
Sent: Tuesday, June 21, 2022 at 7:54 PM
From: "Robert Bradshaw" <rober...@google.com>
To: "user" <user@beam.apache.org>, "Kyle Weaver" <kcwea...@google.com>
Subject: Re: How to run Beam pipeline in Flink [Python]?
On Tue, Jun 21, 2022 at 7:10 AM <pod...@gmx.com> wrote:
>
> Hi Sam,
>
> thanks for your help.
> "When the Flink taskmanager executes the WriteToText transform, it spins up a beam python SDK docker container"
> Hmmm... that's weird. I though Beam create pipeline in Flink language so once sent do Flink there's no relation with Beam anymore. This is what they say at least. And why some exotic 'docker' not just a library....

The docker image is spun up to invoke user code. In this case the
WriteToText function is actually completely implemented in user space
(it's a series of Maps and DoFns that have the side effect of writing
to disk) which may explain the confusion here.

If you're running on a single machine, you can also run in "loopback"
mode (IIRC this is the default if you don't specify anything when
using FlinkRunner) which will use the local Python process (which has
access to the filesystem, etc.) rather than docker for user fns.

> I try to learn Beam/Flink but I'm stuck. They say 'it's easy' but I see this is not - especially if official examples do not work.

I agree that reading and writing local files is a major pain point.
Generally people use local runners when running locally and
distributed filesystem when running in a distributed mode (like Flink)
but we should certainly try to make this better.

> Maybe you know; is there any other way I could prepare pipeline in Beam [Python] and run in Flink with 'flink run'?

IIRC, Kyle (cc'd) added the ability to create a single jar that can be
submitted with flink run, but I don't know that that route is easier.

> Sent: Monday, June 20, 2022 at 7:38 PM
> From: "Sam Bourne" <samb...@gmail.com>
> To: user@beam.apache.org
> Subject: Re: How to run Beam pipeline in Flink [Python]?
>
> Hi Mike,
>
> I’m not an expert, but I have some experience running beam pipelines in Flink that require access to something on disk. When the Flink taskmanager executes the WriteToText transform, it spins up a beam python SDK docker container to perform the work*. At the moment there is not a way to mount a directory into the SDK docker container, but there is an open ticket [1] to provide a way to specify a directory to mount.
>
> I was running a fork for a little while with these changes [2] that allowed us to pass along some docker run options for how the SDK container was started (so we could include a mount). One thing to note is that the flink taskmanager workers need access to whatever directory you’re specifying (the E:\ drive in your example). I also created a quick sample of how to deploy Flink in Kubernetes here [3] which solved some problems we were running into dealing with the Flink job server sharing the same disk staging area.
>
> Hopefully some of that helps,
> -Sam
>
> *There’s an exception to this where some transforms are replaced with something runner-specific. For example, the apache_beam.io.gcp.pubsub.ReadFromPubSub transform. This gets “swapped out” to and executes the Java implementation of the transform directly on the Flink taskmanager worker and not within the SDK container.
>
> [1] https://github.com/apache/beam/issues/19240
> [2] https://github.com/apache/beam/pull/8982
> [3] https://github.com/sambvfx/beam-flink-k8s
>
>
>
> On Sat, Jun 18, 2022 at 11:39 AM <pod...@gmx.com> wrote:
>>
>>
>> I try again maybe someone can help me with this?
>>
>> How to run Beam on Flink?
>>
>> I have code:
>>
>> def run():
>> import apache_beam as beam
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> options = PipelineOptions([
>> "--runner=FlinkRunner",
>> "--flink_version=1.14",
>> "--flink_master=localhost:8081",
>> "--environment_config=localhost:50000"
>> ])
>> output_file = 'E:\\directory\\output.txt'
>> with beam.Pipeline(options=options) as p:
>> (p
>> | 'Create file lines' >> beam.Create([
>> 'Each element must be a string.',
>> 'It writes one element per line.',
>> 'There are no guarantees on the line order.',
>> 'The data might be written into multiple files.',
>> ])
>> | 'Write to files' >> beam.io.WriteToText(output_file)
>> )
>> if __name__ == "__main__":
>> run()
>>
>> Should work. But for some reason Flink is not able to save to file:
>>
>> CHAIN MapPartition (MapPartition at [2]Write to files/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:3320>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0]) FAILED
>>
>> Same problem if I want to open some file.
>> What is wrong here? I tried several example scripts - none is working.
>> If you could help me to take first step in Beam and Flink.
>> Regards
>> Mike
>
>
>
 
 

Reply via email to