Thank you, Robert, for your detailed response and the resources you shared.

One thing that I didn't mention is that my goal is to move the setup to EKS
after completing local experimentation. As you pointed out, LOOPBACK is
mainly for local setups and testing. I also started with the DOCKER mode;
however, Flink's Job Manager threw an error:

java.io.IOException: Cannot run program "docker": error=2, No such file or directory

despite using the unaltered official image:

docker run --network=host \
  --mount type=bind,source=/tmp/staged,target=/tmp/staged \
  flink:1.14.0 jobmanager

I tried to build a custom image that installs Docker and makes sure the
flink user has the permissions needed to call it, but without success.

Additionally, since I'd like to move it to the cluster eventually, it made
more sense to me to try the EXTERNAL mode: have the workers spun up as a
separate Kubernetes deployment, then point to the associated K8s service
using the envConfig := k8s-go-sdk-harness-svc-name:<port_number> pipeline
option. I saw something similar done in this article
<https://ndeepak.com/posts/2022-07-07-local-beam/>; however, Deepak is
using a Python pipeline, and it is more straightforward to start a Python
SDK Harness using the -workerpool flag. I was able to create a Python SDK
Harness similar to what he did, but, as expected, submitting the Go
pipeline failed because the environment URN refers to Python and not Go.
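For the EXTERNAL setup, the environment config has to be an address the Flink TaskManagers can reach; Beam's docs describe EXTERNAL as dispatching user code to an external worker-pool service. A minimal sketch that builds such an address from a Kubernetes Service, assuming the default "cluster.local" cluster domain; the service name, namespace and port are hypothetical placeholders:

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// externalEnvConfig builds a host:port string for the EXTERNAL
// environment_config option from a Kubernetes Service. The
// "<service>.<namespace>.svc.cluster.local" form is the usual in-cluster DNS
// name of a Service (assuming the default cluster domain).
func externalEnvConfig(service, namespace string, port int) string {
	host := fmt.Sprintf("%s.%s.svc.cluster.local", service, namespace)
	return net.JoinHostPort(host, strconv.Itoa(port))
}

func main() {
	// Hypothetical values; substitute your Service's name, namespace and port.
	fmt.Println(externalEnvConfig("k8s-go-sdk-harness-svc-name", "default", 50000))
}
```

Within the same namespace the short Service name alone usually resolves too; the fully qualified form just avoids depending on which namespace the TaskManagers run in.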

Below are more details about what I am doing:

1) Run Flink's Job Manager in one terminal using:

docker run --network=host \
  --mount type=bind,source=/tmp/staged,target=/tmp/staged \
  flink:1.14.0 jobmanager

2) Run Flink's Task Manager in another tab using:

docker run --network=host \
  --mount type=bind,source=/tmp/beam-artifact-staging,target=/tmp/beam-artifact-staging \
  flink:1.14.0 taskmanager

3) Run Beam's Job Server in a third tab:

docker run --net=host \
  --mount type=bind,source=/tmp/staged,target=/tmp/staged \
  apache/beam_flink1.14_job_server:latest --flink-master=localhost:8081

4) Try to run the Go SDK Harness in a fourth tab (failure):

docker run --network=host \
  --mount type=bind,source=/tmp/beam-artifact-staging,target=/tmp/beam-artifact-staging \
  apache/beam_go_sdk:2.46.0 --id=1-1 --logging_endpoint=localhost:44977 \
  --artifact_endpoint=localhost:43219 --provision_endpoint=localhost:34437 \
  --control_endpoint=localhost:42935

5) Compile the Go pipeline and run it in a fifth tab:

go build minimal_wordcount.go
./minimal_wordcount

*Code*

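package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"regexp"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"   // gs:// input
	_ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/local" // local output file
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/options/jobopts"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/flink"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)

// wordRE matches words, as in Beam's minimal_wordcount example.
var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

func main() {
	// Parse flags before beam.Init, as the Beam Go examples do.
	flag.Parse()
	setJobOptions()

	beam.Init()
	p := beam.NewPipeline()
	s := p.Root()

	lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")
	words := beam.ParDo(s, func(line string, emit func(string)) {
		for _, word := range wordRE.FindAllString(line, -1) {
			emit(word)
		}
	}, lines)

	counted := stats.Count(s, words)
	formatted := beam.ParDo(s, func(w string, c int) string {
		return fmt.Sprintf("%s: %v", w, c)
	}, counted)

	textio.Write(s, "wordcounts.txt", formatted)

	// Submit to the Beam job server given by jobopts.Endpoint.
	if _, err := flink.Execute(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}

// setJobOptions overrides the SDK's job options in code.
func setJobOptions() {
	endPoint := "localhost:8099" // Beam job server
	envType := "EXTERNAL"
	envConfig := "localhost:34437"
	jobName := "test_word_count"
	isAsync := true
	parallelism := 1

	jobopts.JobName = &jobName
	jobopts.Endpoint = &endPoint
	jobopts.Async = &isAsync
	jobopts.Parallelism = &parallelism
	jobopts.EnvironmentType = &envType
	jobopts.EnvironmentConfig = &envConfig
}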
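setJobOptions replaces the jobopts pointers instead of setting the flags. Assuming these jobopts fields are created with the standard flag package (the code above confirms they are pointers; that they are flag-backed is our assumption), reassigning a pointer means any --environment_type etc. given on the command line is silently ignored. A stdlib-only sketch of the difference, using a local FlagSet whose flag name merely mirrors the SDK's:

```go
package main

import (
	"flag"
	"fmt"
)

// newOptions registers a stand-in for a flag-backed option such as
// jobopts.EnvironmentType. Nothing here touches Beam.
func newOptions() (*flag.FlagSet, *string) {
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	envType := fs.String("environment_type", "DOCKER", "environment type")
	return fs, envType
}

func main() {
	fs, envType := newOptions()

	// Setting through the flag package updates the value the flag owns, so
	// the pointer and the registered flag agree.
	if err := fs.Set("environment_type", "EXTERNAL"); err != nil {
		panic(err)
	}
	fmt.Println(*envType, fs.Lookup("environment_type").Value.String())

	// Reassigning the pointer (as setJobOptions does) leaves the registered
	// flag untouched; the two now disagree, and a later fs.Parse would only
	// update the old value.
	loopback := "LOOPBACK"
	envType = &loopback
	fmt.Println(*envType, fs.Lookup("environment_type").Value.String())
}
```

With the real SDK the equivalent would be something like flag.Set("environment_type", "EXTERNAL"), assuming that is the registered flag name; hard-coding via pointers is fine for experiments but makes the values impossible to override per run.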

I am still reading up on the container contract you linked, but I'm not
sure starting the harness manually is a good idea in the first place, given
what you mentioned at the beginning of your response.

Thank you,
Sherif



On Wed, Mar 22, 2023 at 7:22 PM Robert Burke <lostl...@apache.org> wrote:

> I'm unfamiliar with configuring Flink to run Beam jobs, but AFAIK it's up
> to the runner to orchestrate/set up properly configured workers with the
> containers. With Beam, there should never be any need to manually set up
> workers for Flink, etc. to run on.
>
> Those flags/etc are part of the "beam container contract", and are
> internal implementation details, that (ideally), an end pipeline author
> doesn't need to worry about. The original design doc is here:
> https://s.apache.org/beam-fn-api-container-contract, but it's rather out
> of date WRT the fine details (e.g. modern SDKs use the single
> ProvisioningService to get the other service URLs, rather than them all
> being provided by flags.)
>
> The official instructions are here:
>
> https://beam.apache.org/documentation/runners/flink/
>
> In particular, there are two modes to be aware of for local runs:
>
> 1st. LOOPBACK mode, which will have Flink "loop back" to the submitting
> job process to execute the job.
>
> Start the Flink Beam Job service: (eg. for flink1.10)
>
> docker run --net=host apache/beam_flink1.10_job_server:latest
>
> Submit your job to the Beam Job Server (e.g. at localhost:8099) with
> the LOOPBACK environment type:
> --runner=PortableRunner
> --endpoint=localhost:8099
> --environment_type=LOOPBACK
>
> (Note: the doc there is very Python-SDK-focused, so the --job_endpoint
> flag is just --endpoint in the Go SDK.)
>
> Other than executing in the main process, this is still using portable
> beam.
>
> 2nd Container mode: This is closer to what you're trying to do.
>
> Per the linked doc, this requires you to start the Flink cluster with its
> REST port (e.g. localhost:8081), then, with Docker, start the connected
> Beam Job service (e.g. for flink1.10):
>
> docker run --net=host apache/beam_flink1.10_job_server:latest \
>   --flink-master=localhost:8081
>
> Note that the "flink-master" flag is how Beam ultimately sends jobs to
> Flink, which then sets up the workers.
>
> Then submit your job to *that* endpoint (which should remain at
> localhost:8099). This should largely be the same, but without setting the
> "environment_type" flag.
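Leaving --environment_type unset means the job falls back to a default environment. A sketch of how an environment type maps to a portable environment URN, assuming the Go SDK defaults to DOCKER and uses the standard Beam URNs; this is our approximation of jobopts.GetEnvironmentUrn, so verify it against your SDK version. If the default is DOCKER, that would likely explain the earlier "Cannot run program docker" error: the runner is asked to start SDK containers itself.

```go
package main

import (
	"fmt"
	"strings"
)

// environmentURN maps an --environment_type value to a portable environment
// URN. LOOPBACK and EXTERNAL both use the external environment; an empty or
// unrecognized value falls back to DOCKER (an assumption about the SDK).
func environmentURN(envType string) string {
	switch strings.ToLower(envType) {
	case "process":
		return "beam:env:process:v1"
	case "loopback", "external":
		return "beam:env:external:v1"
	default: // "docker", "", or anything unrecognized
		return "beam:env:docker:v1"
	}
}

func main() {
	for _, t := range []string{"", "DOCKER", "LOOPBACK", "EXTERNAL", "PROCESS"} {
		fmt.Printf("%-10q -> %s\n", t, environmentURN(t))
	}
}
```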
>
> -------
>
> Finally, I'd be remiss not to point you to the in-development
> "Prism" runner, which will eventually replace the current Go Direct runner.
>
> See
> https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism
> for current usage instructions and restrictions.
>
> It's currently suitable for smoke testing small pipelines, but the goal is
> to have a portable reference runner, WRT all facets of beam. Depending on
> what Beam features you're using, it may not be suitable.
>
> I hope this helps!
> Robert Burke
> Beam Go Busybody
>
> (note, I don't monitor this list, but Beam Go SDK questions tend to find
> their way to me)
>
> On 2023/03/22 11:51:53 Sherif Tolba wrote:
> > Hi Apache Beam team,
> >
> > I hope this email finds you all well. I have been experimenting with Apache
> > Beam and Flink, mainly using golang. I hit a roadblock when trying to run
> > the minimal word count example on Beam and Flink locally using Go SDK
> > workers. I am trying to use the "apache/beam_go_sdk:2.46.0" Docker image as
> > follows:
> >
> > docker run --network=host apache/beam_go_sdk:2.46.0 \
> >   --id=1-1 --provision_endpoint=localhost:50000
> >
> > (I set this port based on some research online, but I don't really know
> > what the service should be.)
> >
> > However, I am unable to understand what the following options represent:
> >
> > Usage of /opt/apache/beam/boot:
> >   -artifact_endpoint string
> >         Local artifact endpoint for FnHarness (required).
> >   -control_endpoint string
> >         Local control endpoint for FnHarness (required).
> >   -id string
> >         Local identifier (required).
> >   -logging_endpoint string
> >         Local logging endpoint for FnHarness (required).
> >   -provision_endpoint string
> >         Local provision endpoint for FnHarness (required).
> >   -semi_persist_dir string
> >         Local semi-persistent directory (optional). (default "/tmp")
> >
> > I checked:
> > https://github.com/apache/beam/blob/master/sdks/go/container/boot.go but
> > still unable to tell what these endpoints are. I couldn't find any online
> > documentation describing, for example, what the provision_endpoint should
> > be set to.
> >
> > I would greatly appreciate any pointers or explanation.
> >
> > My setup is as follows: I have a Flink JobManager, two TaskManagers, and a
> > Beam JobServer running locally. I can execute the pipeline that's written
> > in Go and see the job submitted on Flink's UI; however, it quickly fails
> > because there are no workers to execute the Go transforms.
> >
> > Thanks,
> > Sherif Tolba
> >
>
