On Fri, Aug 28, 2020 at 6:52 PM Sam Bourne <samb...@gmail.com> wrote:
> Hi Eugene,
>
> Glad that helped you out and thanks for the PR tweaking it for GCP.
>
> To fetch the containers from GCR, I had to log into Docker inside the
> Flink nodes, specifically inside the taskmanager container, using something
> like "kubectl exec pod/flink-taskmanager-blahblah -c taskmanager -- docker
> login -u oauth2accesstoken --password $(gcloud auth print-access-token)"
>
> Ouch that seems painful. I find this "precaching" step pretty silly and
> have considered making the DockerEnvironmentFactory a little more
> intelligent about how it deals with timeouts (e.g. no activity). It doesn't
> seem like it would be too difficult to also add first-class support for
> pulling images from protected repositories. Extending the DockerPayload
> protobuf to pass along the additional information and tweaking the
> DockerEnvironmentFactory? I'm not a java expert but that might be worth
> exploring if this continues to be problematic.

Yeah, it makes sense to have some first-class support for Docker
credentials. It's kind of a no-brainer that it's necessary with custom
containers; many companies probably wouldn't want to push their custom
containers to a public repo.

I was thinking of embedding the credentials JSON file into the taskmanager
container through its Dockerfile. That's workable but also pretty silly -
having to rebuild this container just for the sake of putting in the
credentials. DockerPayload might be the right place to put credentials, but
I wonder if there's a way to do something more secure, with k8s secrets.
I'm not too well-versed in credential management.

> I found the time it takes to pull can be dramatically improved if you store
> everything in memory
> <https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/flink.yaml#L186>.
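The login workaround above has to be repeated for every taskmanager pod, so it is easy to script. A minimal sketch of that idea (the pod names, helper name, and registry host are hypothetical; the token would come from `gcloud auth print-access-token` at run time):

```python
# Sketch of scripting the per-pod "docker login" workaround described above.
# Pod names and the registry host are placeholders for illustration.

def docker_login_cmd(pod, token, container="taskmanager"):
    """Build the kubectl exec argv that logs the pod's Docker daemon into GCR."""
    return [
        "kubectl", "exec", pod, "-c", container, "--",
        "docker", "login", "us.gcr.io",
        "-u", "oauth2accesstoken", "--password", token,
    ]

pods = ["flink-taskmanager-0", "flink-taskmanager-1"]  # hypothetical pod names
cmds = [docker_login_cmd(p, "TOKEN") for p in pods]
for c in cmds:
    print(" ".join(c))
```

Passing the token on the command line this way has the same security caveats as the original one-liner; k8s secrets would be the cleaner mechanism.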
> > In the end, I got my pipeline to start, create the uber jar (about 240MB
> > in size), take a few minutes to transmit it to Flink
>
> You could explore spinning up the beam-flink-job-server
> <https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/beam-flink-jobserver.yaml>
> and using the PortableRunner. In theory that should reduce the amount of
> data you're syncing to the cluster. It does require exposing at least two
> ingress points (8099 and 8098) so you can hit the job and artifact services
> respectively.

Right, good idea! I haven't tried spinning up the job server. Exposing the
job and artifact services seems pretty easy, but I would also need to
replace the jobserver image "apache/beam_flink1.10_job_server:2.23.0" with
a custom-built one with the Beam 2.24 snapshot we're using.

> Cheers,
> Sam
>
> On Fri, Aug 28, 2020 at 5:50 PM Eugene Kirpichov <ekirpic...@gmail.com>
> wrote:
>
>> Woohoo thanks Kyle, adding --save_main_session made it work!!!
>>
>> On Fri, Aug 28, 2020 at 5:02 PM Kyle Weaver <kcwea...@google.com> wrote:
>>
>>> > rpc error: code = Unimplemented desc = Method not found:
>>> org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>>>
>>> This is a known issue: https://issues.apache.org/jira/browse/BEAM-10762
>>>
>>> On Fri, Aug 28, 2020 at 4:57 PM Eugene Kirpichov <ekirpic...@gmail.com>
>>> wrote:
>>>
>>>> P.S. Ironic how back in 2018 I was TL-ing the portable runners effort
>>>> for a few months on the Google side, and now I need community help to
>>>> get it to work at all.
>>>> Still, it's pretty miraculous how far Beam's portability has come since
>>>> then, even if it has a steep learning curve.
>>>>
>>>> On Fri, Aug 28, 2020 at 4:54 PM Eugene Kirpichov <ekirpic...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Sam,
>>>>>
>>>>> You're a wizard - this got me *way* farther than my previous
>>>>> attempts. Here's a PR https://github.com/sambvfx/beam-flink-k8s/pull/1
>>>>> with a couple of changes I had to make.
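For illustration, a hedged sketch of what the PortableRunner options might look like once the two services are exposed. The host name and SDK image are placeholders, not values from the thread; 8099 and 8098 are the job and artifact service ports mentioned above:

```python
# Sketch of pipeline options for submitting through an exposed Beam job
# server with the PortableRunner. Host and image names are placeholders.

JOB_SERVER = "flink-jobserver.example.com"  # hypothetical ingress host

portable_args = [
    "--runner=PortableRunner",
    f"--job_endpoint={JOB_SERVER}:8099",       # job service ingress
    f"--artifact_endpoint={JOB_SERVER}:8098",  # artifact service ingress
    "--environment_type=DOCKER",
    # hypothetical custom SDK harness image pushed to a private registry:
    "--environment_config=us.gcr.io/my-project/beam_python_sdk:2.24.0",
    "--save_main_session",
]
print(portable_args)
```

These would be passed to the pipeline in place of the uber-jar submission, so only artifacts (not a 240MB jar) get shipped to the cluster.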
>>>>>
>>>>> I had to make some additional changes that do not make sense to share,
>>>>> but here they are for the record:
>>>>> - Because I'm running on Kubernetes Engine and not minikube, I had to
>>>>> put the docker-flink image on GCR, changing flink.yaml's "image:
>>>>> docker-flink:1.10" to "image: us.gcr.io/$PROJECT_ID/docker-flink:1.10".
>>>>> I of course also had to build and push the container.
>>>>> - Because I'm running with a custom container based on an unreleased
>>>>> version of Beam, I had to push my custom container to GCR too, and
>>>>> change your instructions to use that image name instead of the default
>>>>> one.
>>>>> - To fetch the containers from GCR, I had to log into Docker inside
>>>>> the Flink nodes, specifically inside the taskmanager container, using
>>>>> something like "kubectl exec pod/flink-taskmanager-blahblah -c
>>>>> taskmanager -- docker login -u oauth2accesstoken --password $(gcloud
>>>>> auth print-access-token)"
>>>>> - Again, because I'm using an unreleased Beam SDK (due to a bug whose
>>>>> fix will be released in 2.24), I had to also build a custom Flink job
>>>>> server jar and point to it via --flink_job_server_jar.
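The first bullet is a one-line substitution in flink.yaml. As a toy illustration (the project id is a placeholder, and in practice one would simply edit the file), it could be scripted like this:

```python
# Toy sketch of the flink.yaml image swap described in the first bullet:
# pointing the taskmanager at a GCR copy of the docker-flink image.

def retag_image(yaml_text, old, new):
    """Rewrite 'image: <old>' lines to 'image: <new>', preserving indentation."""
    out = []
    for line in yaml_text.splitlines():
        if line.strip() == f"image: {old}":
            line = line.replace(f"image: {old}", f"image: {new}")
        out.append(line)
    return "\n".join(out)

snippet = "    image: docker-flink:1.10"
retagged = retag_image(snippet, "docker-flink:1.10",
                       "us.gcr.io/my-project/docker-flink:1.10")
print(retagged)
```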
>>>>>
>>>>> In the end, I got my pipeline to start, create the uber jar (about
>>>>> 240MB in size), and take a few minutes to transmit it to Flink (which
>>>>> is a long time, but it'll do for a prototype); the Flink UI was
>>>>> displaying the pipeline, and it was able to *start* the worker
>>>>> container - however it quickly failed with the following error:
>>>>>
>>>>> 2020/08/28 15:49:09 Initializing python harness: /opt/apache/beam/boot
>>>>> --id=1-1 --provision_endpoint=localhost:45111
>>>>> 2020/08/28 15:49:09 Failed to retrieve staged files: failed to get
>>>>> manifest
>>>>> caused by:
>>>>> rpc error: code = Unimplemented desc = Method not found:
>>>>> org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>>>>> (followed by a bunch of other garbage)
>>>>>
>>>>> I'm assuming this might be because I got tangled in my custom images
>>>>> related to the unreleased Beam SDK, and it should be fixed if running
>>>>> on a clean Beam 2.24.
>>>>>
>>>>> Thank you again!
>>>>>
>>>>> On Fri, Aug 28, 2020 at 10:21 AM Eugene Kirpichov
>>>>> <ekirpic...@gmail.com> wrote:
>>>>>
>>>>>> Holy shit, thanks Sam, this is more help than I could have asked for!!
>>>>>> I'll give this a shot later today and report back.
>>>>>>
>>>>>> On Thu, Aug 27, 2020 at 10:27 PM Sam Bourne <samb...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Eugene!
>>>>>>>
>>>>>>> I'm struggling to find complete documentation on how to do this.
>>>>>>> There seems to be lots of conflicting or incomplete information:
>>>>>>> several ways to deploy Flink, several ways to get Beam working with
>>>>>>> it, bizarre StackOverflow questions, and no documentation explaining
>>>>>>> a complete working example.
>>>>>>>
>>>>>>> This *is* possible, and I went through all the same frustrations of
>>>>>>> sparse and confusing documentation. I'm glossing over a lot of
>>>>>>> details, but the key thing was setting up the flink taskworker(s) to
>>>>>>> run docker.
>>>>>>> This requires running docker-in-docker, as the taskworker itself is
>>>>>>> a docker container in k8s.
>>>>>>>
>>>>>>> First create a custom flink container with docker:
>>>>>>>
>>>>>>> # docker-flink Dockerfile
>>>>>>> FROM flink:1.10
>>>>>>> # install docker
>>>>>>> RUN apt-get ...
>>>>>>>
>>>>>>> Then set up the taskmanager deployment to use a sidecar
>>>>>>> docker-in-docker service. This dind service is where the python sdk
>>>>>>> harness container actually runs.
>>>>>>>
>>>>>>> kind: Deployment
>>>>>>> ...
>>>>>>>   containers:
>>>>>>>   - name: docker
>>>>>>>     image: docker:19.03.5-dind
>>>>>>>     ...
>>>>>>>   - name: taskmanager
>>>>>>>     image: myregistry:5000/docker-flink:1.10
>>>>>>>     env:
>>>>>>>     - name: DOCKER_HOST
>>>>>>>       value: tcp://localhost:2375
>>>>>>>     ...
>>>>>>>
>>>>>>> I quickly threw all these pieces together in a repo here:
>>>>>>> https://github.com/sambvfx/beam-flink-k8s
>>>>>>>
>>>>>>> I added a working (via minikube) step-by-step to the README to prove
>>>>>>> to myself that I didn't miss anything, but feel free to submit any
>>>>>>> PRs if you want to add anything useful.
>>>>>>>
>>>>>>> The documents you linked are very informative. It would be great to
>>>>>>> aggregate all this into digestible documentation. Let me know if you
>>>>>>> have any further questions!
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Sam
>>>>>>>
>>>>>>> On Thu, Aug 27, 2020 at 10:25 AM Eugene Kirpichov
>>>>>>> <ekirpic...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Kyle,
>>>>>>>>
>>>>>>>> Thanks for the response!
>>>>>>>>
>>>>>>>> On Wed, Aug 26, 2020 at 5:28 PM Kyle Weaver <kcwea...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> > - With the Flink operator, I was able to submit a Beam job, but
>>>>>>>>> hit the issue that I need Docker installed on my Flink nodes. I
>>>>>>>>> haven't yet tried changing the operator's yaml files to add Docker
>>>>>>>>> inside them.
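[For reference, a fuller, hypothetical expansion of the abbreviated Deployment sketched above. The labels, replica count, and the DOCKER_TLS_CERTDIR setting are assumptions for illustration, not taken verbatim from the linked repo:]

```yaml
# Hypothetical expansion of the taskmanager Deployment sketched above.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: docker
        image: docker:19.03.5-dind
        securityContext:
          privileged: true        # dind needs a privileged container
        env:
        - name: DOCKER_TLS_CERTDIR
          value: ""               # serve plain TCP on 2375 instead of TLS
      - name: taskmanager
        image: myregistry:5000/docker-flink:1.10
        args: ["taskmanager"]
        env:
        - name: DOCKER_HOST
          value: tcp://localhost:2375   # talk to the dind sidecar
```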
>>>>>>>>>
>>>>>>>>> Running Beam workers via Docker on the Flink nodes is not
>>>>>>>>> recommended (and probably not even possible), since the Flink nodes
>>>>>>>>> are themselves already running inside Docker containers. Running
>>>>>>>>> workers as sidecars avoids that problem. For example:
>>>>>>>>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/with_job_server/beam_flink_cluster.yaml#L17-L20
>>>>>>>>>
>>>>>>>> The main problem with the sidecar approach is that I can't use the
>>>>>>>> Flink cluster as a "service" for anybody to submit their jobs with
>>>>>>>> custom containers - the container version is fixed.
>>>>>>>> Do I understand it correctly?
>>>>>>>> Seems like the Docker-in-Docker approach is viable, and it is
>>>>>>>> mentioned in the Beam Flink K8s design doc
>>>>>>>> <https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.dtj1gnks47dq>.
>>>>>>>>
>>>>>>>>> > I also haven't tried this
>>>>>>>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md>
>>>>>>>>> yet because it implies submitting jobs using "kubectl apply", which
>>>>>>>>> is weird - why not just submit it through the Flink job server?
>>>>>>>>>
>>>>>>>>> I'm guessing it goes through k8s for monitoring purposes. I see no
>>>>>>>>> reason it shouldn't be possible to submit to the job server directly
>>>>>>>>> through Python, network permitting, though I haven't tried this.
>>>>>>>>>
>>>>>>>>> On Wed, Aug 26, 2020 at 4:10 PM Eugene Kirpichov
>>>>>>>>> <ekirpic...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi folks,
>>>>>>>>>>
>>>>>>>>>> I'm still working with Pachama <https://pachama.com/> right now;
>>>>>>>>>> we have a Kubernetes Engine cluster on GCP and want to run Beam
>>>>>>>>>> Python batch pipelines with custom containers against it.
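[A sketch of how a pipeline might target such a sidecar worker pool: instead of spawning Docker containers, the runner hands work to a long-running SDK harness container started with --worker_pool next to each taskmanager. The service name and port are placeholders, not values from the linked manifest:]

```python
# Sketch of pipeline options for the sidecar (worker pool) setup linked
# above. All endpoint values are placeholders for illustration.

WORKER_POOL_PORT = 50000  # hypothetical port the sidecar harness listens on

sidecar_args = [
    "--runner=FlinkRunner",
    "--flink_master=flink-jobmanager:8081",  # hypothetical k8s service name
    "--environment_type=EXTERNAL",           # use an already-running harness
    f"--environment_config=localhost:{WORKER_POOL_PORT}",
]
print(sidecar_args)
```

[This is exactly why the container version is fixed in this mode: the harness image is baked into the Deployment, not chosen per job.]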
>>>>>>>>>> Flink and Cloud Dataflow are the two options; Cloud Dataflow
>>>>>>>>>> doesn't support custom containers for batch pipelines yet, so
>>>>>>>>>> we're going with Flink.
>>>>>>>>>>
>>>>>>>>>> I'm struggling to find complete documentation on how to do this.
>>>>>>>>>> There seems to be lots of conflicting or incomplete information:
>>>>>>>>>> several ways to deploy Flink, several ways to get Beam working
>>>>>>>>>> with it, bizarre StackOverflow questions, and no documentation
>>>>>>>>>> explaining a complete working example.
>>>>>>>>>>
>>>>>>>>>> == My requests ==
>>>>>>>>>> * Could people briefly share their working setup? It would be good
>>>>>>>>>> to know which directions are promising.
>>>>>>>>>> * It would be particularly helpful if someone could volunteer an
>>>>>>>>>> hour of their time to talk to me about their working Beam/Flink/k8s
>>>>>>>>>> setup. It's for a good cause (fixing the planet :) ), and on my
>>>>>>>>>> side I volunteer to write up the findings to share with the
>>>>>>>>>> community so others suffer less.
>>>>>>>>>>
>>>>>>>>>> == Appendix: My findings so far ==
>>>>>>>>>> There are multiple ways to deploy Flink on k8s:
>>>>>>>>>> - The GCP marketplace Flink operator
>>>>>>>>>> <https://cloud.google.com/blog/products/data-analytics/open-source-processing-engines-for-kubernetes>
>>>>>>>>>> (couldn't get it to work) and the respective CLI version
>>>>>>>>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator>
>>>>>>>>>> (buggy, but I got it working)
>>>>>>>>>> - https://github.com/lyft/flinkk8soperator (haven't tried)
>>>>>>>>>> - Flink's native k8s support
>>>>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/native_kubernetes.html>
>>>>>>>>>> (super easy to get working)
>>>>>>>>>>
>>>>>>>>>> I confirmed that my Flink cluster was operational by running a
>>>>>>>>>> simple Wordcount job, initiated from my machine. However, I wasn't
>>>>>>>>>> yet able to get Beam working:
>>>>>>>>>>
>>>>>>>>>> - With the Flink operator, I was able to submit a Beam job, but
>>>>>>>>>> hit the issue that I need Docker installed on my Flink nodes. I
>>>>>>>>>> haven't yet tried changing the operator's yaml files to add Docker
>>>>>>>>>> inside them. I also haven't tried this
>>>>>>>>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md>
>>>>>>>>>> yet because it implies submitting jobs using "kubectl apply",
>>>>>>>>>> which is weird - why not just submit it through the Flink job
>>>>>>>>>> server?
>>>>>>>>>>
>>>>>>>>>> - With Flink's native k8s support, I tried two things:
>>>>>>>>>>   - Creating a fat portable jar using --output_executable_path.
>>>>>>>>>> The jar is huge (200+MB) and takes forever to upload to my Flink
>>>>>>>>>> cluster - this is a non-starter.
>>>>>>>>>> But if I actually upload it, then I hit the same issue with
>>>>>>>>>> lacking Docker. Haven't tried fixing it yet.
>>>>>>>>>>   - Simply running my pipeline with --runner=FlinkRunner
>>>>>>>>>> --environment_type=DOCKER --flink_master=$PUBLIC_IP:8081. The Java
>>>>>>>>>> process appears to send 1+GB of data somewhere, but the job never
>>>>>>>>>> even starts.
>>>>>>>>>>
>>>>>>>>>> I looked at a few conference talks:
>>>>>>>>>> - https://www.cncf.io/wp-content/uploads/2020/02/CNCF-Webinar_-Apache-Flink-on-Kubernetes-Operator-1.pdf
>>>>>>>>>> - seems to imply that I need to add a Beam worker "sidecar" to the
>>>>>>>>>> Flink workers, and that I need to submit my job using "kubectl
>>>>>>>>>> apply".
>>>>>>>>>> - https://www.youtube.com/watch?v=8k1iezoc5Sc - also mentions the
>>>>>>>>>> sidecar, but also mentions the fat jar option.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Eugene Kirpichov
>>>>>>>>>> http://www.linkedin.com/in/eugenekirpichov
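[For the record, the two native-k8s submission attempts described in the findings correspond roughly to these option sets. All endpoint and path values are placeholders:]

```python
# Recap sketch of the two submission modes tried against native-k8s Flink.
# Every value below is a placeholder for illustration.

# (a) Build a self-contained portable jar instead of submitting directly;
#     the jar can then be uploaded to the Flink cluster separately.
fat_jar_args = [
    "--runner=FlinkRunner",
    "--environment_type=DOCKER",
    "--output_executable_path=/tmp/pipeline-bundle.jar",
]

# (b) Submit straight to the cluster's exposed REST endpoint.
direct_args = [
    "--runner=FlinkRunner",
    "--environment_type=DOCKER",
    "--flink_master=203.0.113.10:8081",  # stands in for $PUBLIC_IP:8081
]
print(fat_jar_args)
print(direct_args)
```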