I'm unfamiliar with configuring Flink to run Beam jobs, but AFAIK it's up to 
the runner to orchestrate/set up properly configured workers with the 
containers. With Beam, there should never be any need to manually set up the 
workers that Flink, etc. run on.

Those flags/etc. are part of the "Beam container contract" and are internal 
implementation details that (ideally) an end pipeline author doesn't need to 
worry about. The original design doc is here: 
https://s.apache.org/beam-fn-api-container-contract, but it's rather out of 
date WRT the fine details (e.g. modern SDKs use the single ProvisioningService 
to get the other service URLs, rather than having them all provided by flags).
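
For illustration only (this is what the runner does on your behalf, not 
something to run by hand), bringing up a Go SDK worker is roughly a matter of 
pointing the boot binary at a provisioning service the runner has already 
started, e.g.:

docker run --net=host apache/beam_go_sdk:2.46.0 \
    --id=<worker id assigned by the runner> \
    --provision_endpoint=<host:port of the runner's ProvisioningService>

The worker then fetches the control, logging, and artifact endpoints from that 
ProvisioningService, which is why there's nothing useful to point 
--provision_endpoint at when you start the container manually.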

The official instructions are here:

https://beam.apache.org/documentation/runners/flink/  

In particular, there are two modes to be aware of for local runs:

1st. LOOPBACK mode, which will have Flink "loop back" to the submitting job 
process to execute the job.  

Start the Flink Beam Job service (e.g. for Flink 1.10):

docker run --net=host apache/beam_flink1.10_job_server:latest
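
By default that job server should come up listening for job submissions on 
localhost:8099 (with an artifact service on 8098).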

Then submit your job to the Beam Job Server (e.g. at localhost:8099) with the 
LOOPBACK environment type:
--runner=PortableRunner
--endpoint=localhost:8099
--environment_type=LOOPBACK

(Note: the doc there is very Python SDK focused, so the --job_endpoint flag is 
just --endpoint in the Go SDK.)
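
For example, assuming a wordcount-style main with an --output flag (adjust to 
whatever flags your pipeline actually defines), the submission could look 
something like:

go run wordcount.go \
    --runner=PortableRunner \
    --endpoint=localhost:8099 \
    --environment_type=LOOPBACK \
    --output=/tmp/counts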

Other than executing in the submitting process, this is still using portable Beam.

2nd. Container mode: this is closer to what you're trying to do. 

Per the linked doc, this requires you to start the Flink cluster with its REST 
port exposed (e.g. at localhost:8081), and then, with Docker, start the 
connected Beam Job service (e.g. for Flink 1.10):

docker run --net=host apache/beam_flink1.10_job_server:latest 
--flink-master=localhost:8081
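
(For the first step, if you're using the Flink binary distribution, a local 
cluster typically comes up with ./bin/start-cluster.sh and serves its REST API 
on localhost:8081 by default.)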

Note the "flink-master" flag, is how Beam ultimate sends jobs to flink, and 
then sets up the workers.

Then submit your job to *that* job server endpoint (which should still be 
localhost:8099). The submission is largely the same, but without setting the 
"environment_type" flag. 

-------

Finally, I'd be remiss not to point you to the in-development "Prism" runner, 
which will eventually replace the current Go direct runner. 

See https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism 
for current usage instructions and restrictions. 

It's currently suitable for smoke testing small pipelines, but the goal is to 
have a portable reference runner WRT all facets of Beam. Depending on what 
Beam features you're using, it may not be suitable yet.

I hope this helps!
Robert Burke
Beam Go Busybody

(note, I don't monitor this list, but Beam Go SDK questions tend to find their 
way to me)

On 2023/03/22 11:51:53 Sherif Tolba wrote:
> Hi Apache Beam team,
> 
> I hope this email finds you all well. I have been experimenting with Apache
> Beam and Flink, mainly using golang. I hit a roadblock when trying to run
> the minimal word count example on Beam and Flink locally using Go SDK
> workers. I am trying to use the "apache/beam_go_sdk:2.46.0" Docker image as
> follows:
> 
> docker run --network=host apache/beam_go_sdk:2.46.0
> --id=1-1 --provision_endpoint=localhost:50000   <-- (I set this port based
> on some research online, but I don't really know what the service should be)
> 
> However, I am unable to understand what the following options represent:
> 
> Usage of /opt/apache/beam/boot:
>   -artifact_endpoint string
>         Local artifact endpoint for FnHarness (required).
>   -control_endpoint string
>         Local control endpoint for FnHarness (required).
>   -id string
>         Local identifier (required).
>   -logging_endpoint string
>         Local logging endpoint for FnHarness (required).
>   -provision_endpoint string
>         Local provision endpoint for FnHarness (required).
>   -semi_persist_dir string
>         Local semi-persistent directory (optional). (default "/tmp")
> 
> I checked:
> https://github.com/apache/beam/blob/master/sdks/go/container/boot.go but
> still unable to tell what these endpoints are. I couldn't find any online
> documentation describing, for example, what the provision_endpoint should
> be set to.
> 
> I would greatly appreciate any pointers or explanation.
> 
> My setup is as follows: I have a Flink JobManager, two TaskManagers, and a
> Beam JobServer running locally. I can execute the pipeline that's written
> in Go and see the job submitted on Flink's UI, however, it quickly fails
> because there are no workers to execute the Go transforms.
> 
> Thanks,
> Sherif Tolba
> 
