That's fine. While it adds a kubernetes dependency to the Beam go.mod, it won't 
actually be used by clients unless we're adding it to to one of the main 
packages. That's fairly safe.

I'd recommend putting an appropriately named package under into the "cmd" 
directory, since it would be most useful as a stand alone command for someone 
to run. That can be hashed out appropriately during code review, once it's 
working for your usecase.

On 2023/03/24 20:05:06 Sherif Tolba wrote:
> Hi Robert,
> 
> Thank you for your insightful response. The setup you described makes sense
> to me and I'd like to give the grpc server implementation a try. Referring
> to your point below:
> 
> "This `workerService` would then use whatever it likes to get containers
> onto VMs when requested by the Job service/flink."
> 
> , it will be Kubernetes-specific for my use case, is this okay?
> 
> Thanks,
> Sherif
> 
> On Thu, Mar 23, 2023 at 3:52 PM Robert Burke <[email protected]> wrote:
> 
> > Oh that's very interesting! I have a few comments, but we could end up
> > with a new feature for the Go SDK.
> >
> > As you've noted, you shouldn't really be manually spinning up step 4. It's
> > up to the runner to do that, but it does look like for your usage, some
> > assistance is needed.
> >
> > The Python Boot Loader has that in order to support "sibling processes",
> > on a single VM container. Basically, it's a hack to get around the Global
> > Interperter Lock slowing things down, and multiprocessing. Starting
> > additional separate processes allows for efficient use of cores and
> > multiprocessing. Go and Java don't need this since they have robust
> > concurrency support, and will generally process each bundle sent to them in
> > parallel.
> >
> > The bootloaders don't currently share a lot of code, since they were
> > developed with the language harness they start up in mind.
> >
> > So the worker_pool flag you mention is here:
> >
> > https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/sdks/python/container/boot.go#L55
> >
> > The "External" stuff you see, is also how LOOPBACK mode operates. It just
> > treats all desired workers as internal processes. But, it's largely just a
> > service spec that anyone could implement and ensure it's pointed to.
> >
> > Eg. For Loopback in the Go SDK, the implementation is entirely here:
> >
> > https://github.com/apache/beam/blob/ba3dcd1cb983bbe92531ab7deae95438e93a1d4a/sdks/go/pkg/beam/runners/universal/extworker/extworker.go#L33
> >
> > Python's equivalent is here:
> > https://github.com/apache/beam/blob/e439f4120ef4c25aa36e5b03756dc7391bdbd211/sdks/python/apache_beam/runners/worker/worker_pool_main.py
> >
> > The service definition is pretty straight forward here, just StartWorker
> > and StopWorker requests.
> >
> >
> > https://github.com/apache/beam/blob/8cfee7d05e6bdf73431f390577c9d000eadd3b3a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto#L1104
> >
> > Basically, for distributed workers, you'd just need create a GRPC server
> > for that API to spin up the worker with the right flags so it can connect
> > and process the job. Note that this is "environment specific" so this would
> > only start "Go SDK" workers (just as you've seen the Python only starting
> > up Python SDK workers".)
> >
> > So if you're good with cluster managers, Kubernetes can be used for this
> > for example instead of whatever Flink is managing.
> >
> > The way I'm currently picturing it is a separate service binary (mostly to
> > avoid unnecessary built in deps...). If it's in the Go SDK Module, it
> > should default to the Go SDK container, but there's no reason not to
> > provide an override if desired.
> >
> > Default image for a given release is at :
> > https://github.com/apache/beam/blob/011296c14659f80c8ecbeefda79ecc3f1113bd95/sdks/go/pkg/beam/core/core.go#L33
> > (the dev versions aren't pushed by default, but this will work after
> > release).
> >
> > Then it's up to the runner to talk to that service to start and stop
> > workers as needed.
> >
> > $ workerService <external_worker_service_addr> --container=<container>
> > ...other config...
> >
> > $ myPipelineBinary --runner=portable --environment_type=external
> > --environment_config=<external_worker_service_addr> --endpoint=<beam job
> > service>
> >
> > This `workerService` would then use whatever it likes to get containers
> > onto VMs when requested by the Job service/flink.
> >
> > I'd be entirely delighted for such a thing to be contributed, and help
> > review it. @lostluck on github, if you desired to go this path.
> >
> > Robert Burke
> > Beam Go Busybody
> >
> > On 2023/03/23 15:09:44 Sherif Tolba wrote:
> > > Thank you, Robert, for your detailed response and the resources you
> > shared.
> > >
> > > One thing that I didn't mention is that my goal is to move the setup to
> > EKS
> > > after completing local experimentation. As you pointed out LOOPBACK is
> > > mainly for local setups and testing. I also started with the DOCKER mode,
> > > however, Flink's Job Manager threw an error:
> > >
> > > java.io.IOException: Cannot run program "docker": error=2, No such file
> > or
> > > directory
> > >
> > > despite using the unaltered official image:
> > >
> > > docker run --network=host --mount
> > > type=bind,source=/tmp/staged,target=/tmp/staged flink:1.14.0 jobmanager
> > >
> > > I tried to build a custom image to install docker and make sure flink
> > user
> > > has the write permissions to call it, without success.
> > >
> > > Additionally, since I'd like move it eventually to the cluster, it made
> > > more sense to me to try to use the EXTERNAL mode and have the workers
> > spun
> > > up as a separate Kubernetes deployment, then link to the associated K8s
> > > service using the envConfig := k8s-go-sdk-harness-svc-name:<port_number>
> > > pipeline option.I saw something similar done in this article
> > > <https://ndeepak.com/posts/2022-07-07-local-beam/>, however, Deepak is
> > > using a Python pipeline and it is more straightforward to start a Python
> > > SDK Harness using the -workerpool flag. I was able to create a Python SDK
> > > Harness similar to what he did but, as expected, when submitting the Go
> > > pipeline, it failed because the environment URN refers to Python and not
> > Go.
> > >
> > > Below are more details about what I am doing:
> > >
> > > 1) Run Flink's Job Manager in one terminal using:
> > >
> > > docker run --network=host --mount
> > > type=bind,source=/tmp/staged,target=/tmp/staged flink:1.14.0 jobmanager
> > >
> > > 2) Run Flink's Task Manager in another tab using:
> > >
> > > docker run --network=host --mount
> > >
> > type=bind,source=/tmp/beam-artifact-staging,target=/tmp/beam-artifact-staging
> > > flink:1.14.0 taskmanager
> > >
> > > 3) Run Beam's Job Server in a third tab:
> > >
> > > docker run --net=host --mount
> > > type=bind,source=/tmp/staged,target=/tmp/staged
> > > apache/beam_flink1.14_job_server:latest --flink-master=localhost:8081
> > >
> > > 4) Try to run Go SDK Harness in a fourth tab (fauliure):
> > >
> > > docker run --network=host --mount
> > >
> > type=bind,source=/tmp/beam-artifact-staging,target=/tmp/beam-artifact-staging
> > > apache/beam_go_sdk:2.46.0 --id=1-1 --logging_endpoint=localhost:44977
> > > --artifact_endpoint=localhost:43219 --provision_endpoint=localhost:34437
> > > --control_endpoint=localhost:42935
> > >
> > > 5) Compile the Go pipeline and run it in a fifth tab:
> > >
> > > go build minimal_wordcount.go
> > > ./minimal_wordcount
> > >
> > > *Code*
> > >
> > > func main() {
> > >
> > > setJobOptions()
> > >
> > > beam.Init()
> > > p := beam.NewPipeline()
> > > s := p.Root()
> > >
> > > lines := textio.Read(s,
> > "gs://apache-beam-samples/shakespeare/kinglear.txt")
> > > words := beam.ParDo(s, func(line string, emit func(string)) {
> > > for _, word := range wordRE.FindAllString(line, -1) {
> > > emit(word)
> > > }
> > > }, lines)
> > >
> > > counted := stats.Count(s, words)
> > > formatted := beam.ParDo(s, func(w string, c int) string {
> > > return fmt.Sprintf("%s: %v", w, c)
> > > }, counted)
> > >
> > > textio.Write(s, "wordcounts.txt", formatted)
> > > flink.Execute(context.Background(), p)
> > > }
> > >
> > > func setJobOptions() {
> > > endPoint := "localhost:8099"
> > > envType := "EXTERNAL"
> > > envConfig := "localhost:34437"
> > > jobName := "test_word_count"
> > > isAsync := true
> > > parallelism := 1
> > >
> > > jobopts.JobName = &jobName
> > > jobopts.Endpoint = &endPoint
> > > jobopts.Async = &isAsync
> > > jobopts.Parallelism = &parallelism
> > > jobopts.EnvironmentType = &envType
> > > jobopts.EnvironmentConfig = &envConfig
> > > }
> > >
> > > I am still reading up the container contract you linked but not sure if
> > > starting the harness manually is a good idea in the first place based on
> > > what you mentioned at the beginning of your response.
> > >
> > > Thank you,
> > > Sherif
> > >
> > >
> > >
> > > On Wed, Mar 22, 2023 at 7:22 PM Robert Burke <[email protected]>
> > wrote:
> > >
> > > > I'm unfamiliar with configuring Flink to run Beam jobs, but AFAIK it's
> > up
> > > > to the runner to orchestrate/set up properly configured workers with
> > the
> > > > containers.  With Beam, there should never be any need to manually set
> > up
> > > > workers for Flink, etc to run on.
> > > >
> > > > Those flags/etc are part of the "beam container contract", and are
> > > > internal implementation details, that (ideally), an end pipeline author
> > > > doesn't need to worry about. The original design doc is here:
> > > > https://s.apache.org/beam-fn-api-container-contract, but it's rather
> > out
> > > > of date WRT the fine details (eg. Modern SDKs use the single
> > > > ProvisioningService to get the other service URLs, rather than them all
> > > > being provided by flags.)
> > > >
> > > > The official instructions are here:
> > > >
> > > > https://beam.apache.org/documentation/runners/flink/
> > > >
> > > > In particular, there are two modes to be aware of for local runs:
> > > >
> > > > 1st. LOOPBACK mode, which will have Flink "loop back" to the submitting
> > > > job process to execute the job.
> > > >
> > > > Start the Flink Beam Job service: (eg. for flink1.10)
> > > >
> > > > docker run --net=host apache/beam_flink1.10_job_server:latest
> > > >
> > > > Submitting your job to the Beam Job Server (eg. at localhost:8099),
> > with
> > > > the LOOPBACK environment type.
> > > > --runner=PortableRunner
> > > > --endpoint=localhost:8099
> > > > --environment_type=LOOPBACK
> > > >
> > > > (Note, the doc there is very python SDK focused, so the --job_endpoint
> > > > flag is just --endpoint in the Go SDK).
> > > >
> > > > Other than executing in the main process, this is still using portable
> > > > beam.
> > > >
> > > > 2nd Container mode: This is closer to what you're trying to do.
> > > >
> > > > Per the linked doc, this requires you to start the Flink cluser with
> > it's
> > > > rest port (eg. localhost:8081), then with Docker, starting the
> > connected
> > > > Beam Job service:  (eg. for flink1.10)
> > > >
> > > > docker run --net=host apache/beam_flink1.10_job_server:latest
> > > > --flink-master=localhost:8081
> > > >
> > > > Note the "flink-master" flag, is how Beam ultimate sends jobs to flink,
> > > > and then sets up the workers.
> > > >
> > > > Then submit your job to *that* endpoint (which should remain at
> > > > localhost:8081) this largely should largely be the same, but without
> > > > setting the "environment_type" flag.
> > > >
> > > > -------
> > > >
> > > > Finally, I'd be remiss not to try to point you to the in development
> > > > "Prism" runner, which will eventually replace the current Go Direct
> > runner.
> > > >
> > > > See
> > > >
> > https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/runners/prism
> > > > for current usage instructions and restrictions.
> > > >
> > > > It's currently suitable for smoke testing small pipelines, but the
> > goal is
> > > > to have a portable reference runner, WRT all facets of beam. Depending
> > on
> > > > what Beam features you're using, it may not be suitable.
> > > >
> > > > I hope this helps!
> > > > Robert Burke
> > > > Beam Go Busybody
> > > >
> > > > (note, I don't monitor this list, but Beam Go SDK questions tend to
> > find
> > > > their way to me)
> > > >
> > > > On 2023/03/22 11:51:53 Sherif Tolba wrote:
> > > > > Hi Apache Beam team,
> > > > >
> > > > > I hope this email finds you all well. I have been experimenting with
> > > > Apache
> > > > > Beam and Flink, mainly using golang. I hit a roadblock when trying
> > to run
> > > > > the minimal word count example on Beam and Flink locally using Go SDK
> > > > > workers. I am trying to use the "apache/beam_go_sdk:2.46.0" Docker
> > image
> > > > as
> > > > > follows:
> > > > >
> > > > > docker run --network=host apache/beam_go_sdk:2.46.0
> > > > > --id=1-1 --provision_endpoint=localhost:50000   <-- (I set this port
> > > > based
> > > > > on some research online, but I don't really know what the service
> > should
> > > > be)
> > > > >
> > > > > However, I am unable to understand what the following options
> > represent:
> > > > >
> > > > > Usage of /opt/apache/beam/boot:
> > > > >   -artifact_endpoint string
> > > > >         Local artifact endpoint for FnHarness (required).
> > > > >   -control_endpoint string
> > > > >         Local control endpoint for FnHarness (required).
> > > > >   -id string
> > > > >         Local identifier (required).
> > > > >   -logging_endpoint string
> > > > >         Local logging endpoint for FnHarness (required).
> > > > >   -provision_endpoint string
> > > > >         Local provision endpoint for FnHarness (required).
> > > > >   -semi_persist_dir string
> > > > >         Local semi-persistent directory (optional). (default "/tmp")
> > > > >
> > > > > I checked:
> > > > > https://github.com/apache/beam/blob/master/sdks/go/container/boot.go
> > but
> > > > > still unable to tell what these endpoints are. I couldn't find any
> > online
> > > > > documentation describing, for example, what the provision_endpoint
> > should
> > > > > be set to.
> > > > >
> > > > > I would greatly appreciate any pointers or explanation.
> > > > >
> > > > > My setup is as follows: I have a Flink JobManager, two TaskManagers,
> > and
> > > > a
> > > > > Beam JobServer running locally. I can execute the pipeline that's
> > written
> > > > > in Go and see the job submitted on Flink's UI, however, it quickly
> > fails
> > > > > because there are no workers to execute the Go transforms.
> > > > >
> > > > > Thanks,
> > > > > Sherif Tolba
> > > > >
> > > >
> > >
> >
> 

Reply via email to