Woohoo thanks Kyle, adding --save_main_session made it work!!!

On Fri, Aug 28, 2020 at 5:02 PM Kyle Weaver <kcwea...@google.com> wrote:
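For reference, the flag set this thread converged on can be sketched in plain Python (no Beam import needed, so it runs anywhere). The flag names are real Beam pipeline options; the master address is a placeholder:

```python
# Flags you would pass to PipelineOptions(flags=...) or on the command line.
# --save_main_session pickles the main module's globals so imports and
# definitions are available to the remote Flink workers.
flink_flags = [
    "--runner=FlinkRunner",
    "--flink_master=localhost:8081",  # placeholder address
    "--environment_type=DOCKER",
    "--save_main_session",
]

def has_flag(flags, name):
    """True if `name` appears as a bare flag or in --name=value form."""
    return any(f == name or f.startswith(name + "=") for f in flags)

print(has_flag(flink_flags, "--save_main_session"))  # True
```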
> > rpc error: code = Unimplemented desc = Method not found:
> > org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>
> This is a known issue: https://issues.apache.org/jira/browse/BEAM-10762
>
> On Fri, Aug 28, 2020 at 4:57 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:
>
>> P.S. Ironic how back in 2018 I was TL-ing the portable runners effort for a few months on Google side, and now I need community help to get it to work at all.
>> Still pretty miraculous how far Beam's portability has come since then, even if it has a steep learning curve.
>>
>> On Fri, Aug 28, 2020 at 4:54 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:
>>
>>> Hi Sam,
>>>
>>> You're a wizard - this got me *way* farther than my previous attempts. Here's a PR https://github.com/sambvfx/beam-flink-k8s/pull/1 with a couple of changes I had to make.
>>>
>>> I had to make some additional changes that do not make sense to share, but here they are for the record:
>>> - Because I'm running on k8s engine and not minikube, I had to put the docker-flink image on GCR, changing flink.yaml "image: docker-flink:1.10" -> "image: us.gcr.io/$PROJECT_ID/docker-flink:1.10". I of course also had to build and push the container.
>>> - Because I'm running with a custom container based on an unreleased version of Beam, I had to push my custom container to GCR too, and change your instructions to use that image name instead of the default one.
>>> - To fetch the containers from GCR, I had to log into Docker inside the Flink nodes, specifically inside the taskmanager container, using something like "kubectl exec pod/flink-taskmanager-blahblah -c taskmanager -- docker login -u oauth2accesstoken --password $(gcloud auth print-access-token)".
>>> - Again because I'm using an unreleased Beam SDK (due to a bug whose fix will be released in 2.24), I had to also build a custom Flink job server jar and point to it via --flink_job_server_jar.
>>>
>>> In the end, I got my pipeline to start and create the uber jar (about 240MB in size), which took a few minutes to transmit to Flink (a long time, but it'll do for a prototype); the Flink UI was displaying the pipeline and was able to *start* the worker container - however, it quickly failed with the following error:
>>>
>>> 2020/08/28 15:49:09 Initializing python harness: /opt/apache/beam/boot --id=1-1 --provision_endpoint=localhost:45111
>>> 2020/08/28 15:49:09 Failed to retrieve staged files: failed to get manifest
>>> caused by:
>>> rpc error: code = Unimplemented desc = Method not found: org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>>> (followed by a bunch of other garbage)
>>>
>>> I'm assuming this might be because I got tangled up in my custom images related to the unreleased Beam SDK, and it should be fixed when running on clean Beam 2.24.
>>>
>>> Thank you again!
>>>
>>> On Fri, Aug 28, 2020 at 10:21 AM Eugene Kirpichov <ekirpic...@gmail.com> wrote:
>>>
>>>> Holy shit, thanks Sam, this is more help than I could have asked for!! I'll give this a shot later today and report back.
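The docker-login step described above can be sketched as a command builder. The pod name and token are placeholders; the function only assembles the argument list (e.g. for subprocess.run) rather than executing anything:

```python
def docker_login_cmd(pod, token, container="taskmanager"):
    """Build the kubectl command that logs the in-pod Docker client into
    the registry, mirroring the command quoted in the thread above."""
    return [
        "kubectl", "exec", pod, "-c", container, "--",
        "docker", "login",
        "-u", "oauth2accesstoken", "--password", token,
    ]

# In practice the token comes from `gcloud auth print-access-token`;
# "<access-token>" here is a stand-in.
cmd = docker_login_cmd("pod/flink-taskmanager-blahblah", "<access-token>")
```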
>>>>
>>>> On Thu, Aug 27, 2020 at 10:27 PM Sam Bourne <samb...@gmail.com> wrote:
>>>>
>>>>> Hi Eugene!
>>>>>
>>>>> I'm struggling to find complete documentation on how to do this. There seems to be lots of conflicting or incomplete information: several ways to deploy Flink, several ways to get Beam working with it, bizarre StackOverflow questions, and no documentation explaining a complete working example.
>>>>>
>>>>> This *is* possible and I went through all the same frustrations of sparse and confusing documentation. I'm glossing over a lot of details, but the key thing was setting up the flink taskworker(s) to run docker. This requires running docker-in-docker, as the taskworker itself is a docker container in k8s.
>>>>>
>>>>> First create a custom flink container with docker:
>>>>>
>>>>> # docker-flink Dockerfile
>>>>> FROM flink:1.10
>>>>> # install docker
>>>>> RUN apt-get ...
>>>>>
>>>>> Then set up the taskmanager deployment to use a sidecar docker-in-docker service. This dind service is where the python sdk harness container actually runs.
>>>>>
>>>>> kind: Deployment
>>>>> ...
>>>>>   containers:
>>>>>   - name: docker
>>>>>     image: docker:19.03.5-dind
>>>>>     ...
>>>>>   - name: taskmanager
>>>>>     image: myregistry:5000/docker-flink:1.10
>>>>>     env:
>>>>>     - name: DOCKER_HOST
>>>>>       value: tcp://localhost:2375
>>>>> ...
>>>>>
>>>>> I quickly threw all these pieces together in a repo here: https://github.com/sambvfx/beam-flink-k8s
>>>>>
>>>>> I added a working (via minikube) step-by-step guide in the README to prove to myself that I didn't miss anything, but feel free to submit any PRs if you want to add anything useful.
>>>>>
>>>>> The documents you linked are very informative. It would be great to aggregate all this into digestible documentation. Let me know if you have any further questions!
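The sidecar wiring above can be sketched as plain Python dicts (not a real Kubernetes client; the image names are the placeholders from the example) to make the DOCKER_HOST hand-off explicit:

```python
def taskmanager_pod_containers(flink_image, dind_image="docker:19.03.5-dind"):
    """Return the pod's containers: a dind sidecar plus the Flink taskmanager.
    DOCKER_HOST points the taskmanager's docker client at the sidecar's
    daemon, so the Beam SDK harness container runs inside the sidecar."""
    return [
        {"name": "docker", "image": dind_image},
        {
            "name": "taskmanager",
            "image": flink_image,
            "env": [{"name": "DOCKER_HOST", "value": "tcp://localhost:2375"}],
        },
    ]

containers = taskmanager_pod_containers("myregistry:5000/docker-flink:1.10")
```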
>>>>>
>>>>> Cheers,
>>>>> Sam
>>>>>
>>>>> On Thu, Aug 27, 2020 at 10:25 AM Eugene Kirpichov <ekirpic...@gmail.com> wrote:
>>>>>
>>>>>> Hi Kyle,
>>>>>>
>>>>>> Thanks for the response!
>>>>>>
>>>>>> On Wed, Aug 26, 2020 at 5:28 PM Kyle Weaver <kcwea...@google.com> wrote:
>>>>>>
>>>>>>> > - With the Flink operator, I was able to submit a Beam job, but hit the issue that I need Docker installed on my Flink nodes. I haven't yet tried changing the operator's yaml files to add Docker inside them.
>>>>>>>
>>>>>>> Running Beam workers via Docker on the Flink nodes is not recommended (and probably not even possible), since the Flink nodes are themselves already running inside Docker containers. Running workers as sidecars avoids that problem. For example:
>>>>>>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/with_job_server/beam_flink_cluster.yaml#L17-L20
>>>>>>>
>>>>>> The main problem with the sidecar approach is that I can't use the Flink cluster as a "service" for anybody to submit their jobs with custom containers - the container version is fixed. Do I understand it correctly?
>>>>>> Seems like the Docker-in-Docker approach is viable, and is mentioned in the Beam Flink K8s design doc <https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.dtj1gnks47dq>.
>>>>>>
>>>>>>> > I also haven't tried this <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md> yet because it implies submitting jobs using "kubectl apply", which is weird - why not just submit it through the Flink job server?
>>>>>>>
>>>>>>> I'm guessing it goes through k8s for monitoring purposes.
>>>>>>> I see no reason it shouldn't be possible to submit to the job server directly through Python, network permitting, though I haven't tried this.
>>>>>>>
>>>>>>> On Wed, Aug 26, 2020 at 4:10 PM Eugene Kirpichov <ekirpic...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi folks,
>>>>>>>>
>>>>>>>> I'm still working with Pachama <https://pachama.com/> right now; we have a Kubernetes Engine cluster on GCP and want to run Beam Python batch pipelines with custom containers against it.
>>>>>>>> Flink and Cloud Dataflow are the two options; Cloud Dataflow doesn't support custom containers for batch pipelines yet, so we're going with Flink.
>>>>>>>>
>>>>>>>> I'm struggling to find complete documentation on how to do this. There seems to be lots of conflicting or incomplete information: several ways to deploy Flink, several ways to get Beam working with it, bizarre StackOverflow questions, and no documentation explaining a complete working example.
>>>>>>>>
>>>>>>>> == My requests ==
>>>>>>>> * Could people briefly share their working setup? It would be good to know which directions are promising.
>>>>>>>> * It would be particularly helpful if someone could volunteer an hour of their time to talk to me about their working Beam/Flink/k8s setup. It's for a good cause (fixing the planet :) ) and on my side I volunteer to write up the findings to share with the community so others suffer less.
>>>>>>>>
>>>>>>>> == Appendix: My findings so far ==
>>>>>>>> There are multiple ways to deploy Flink on k8s:
>>>>>>>> - The GCP marketplace Flink operator <https://cloud.google.com/blog/products/data-analytics/open-source-processing-engines-for-kubernetes> (couldn't get it to work) and the respective CLI version <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator> (buggy, but I got it working)
>>>>>>>> - https://github.com/lyft/flinkk8soperator (haven't tried)
>>>>>>>> - Flink's native k8s support <https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/native_kubernetes.html> (super easy to get working)
>>>>>>>>
>>>>>>>> I confirmed that my Flink cluster was operational by running a simple Wordcount job, initiated from my machine. However, I wasn't yet able to get Beam working:
>>>>>>>>
>>>>>>>> - With the Flink operator, I was able to submit a Beam job, but hit the issue that I need Docker installed on my Flink nodes. I haven't yet tried changing the operator's yaml files to add Docker inside them. I also haven't tried this <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md> yet because it implies submitting jobs using "kubectl apply", which is weird - why not just submit it through the Flink job server?
>>>>>>>>
>>>>>>>> - With Flink's native k8s support, I tried two things:
>>>>>>>>   - Creating a fat portable jar using --output_executable_path. The jar is huge (200+MB) and takes forever to upload to my Flink cluster - this is a non-starter. But if I actually upload it, then I hit the same issue with lacking Docker. Haven't tried fixing it yet.
>>>>>>>>   - Simply running my pipeline with --runner=FlinkRunner --environment_type=DOCKER --flink_master=$PUBLIC_IP:8081. The Java process appears to send 1+GB of data to somewhere, but the job never even starts.
>>>>>>>>
>>>>>>>> I looked at a few conference talks:
>>>>>>>> - https://www.cncf.io/wp-content/uploads/2020/02/CNCF-Webinar_-Apache-Flink-on-Kubernetes-Operator-1.pdf - seems to imply that I need to add a Beam worker "sidecar" to the Flink workers, and that I need to submit my job using "kubectl apply".
>>>>>>>> - https://www.youtube.com/watch?v=8k1iezoc5Sc which also mentions the sidecar, but also mentions the fat jar option.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Eugene Kirpichov
>>>>>>>> http://www.linkedin.com/in/eugenekirpichov

--
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov