On Fri, Aug 28, 2020 at 6:52 PM Sam Bourne <samb...@gmail.com> wrote:

> Hi Eugene,
>
> Glad that helped you out and thanks for the PR tweaking it for GCP.
>
> To fetch the containers from GCR, I had to log into Docker inside the
> Flink nodes, specifically inside the taskmanager container, using something
> like “kubectl exec pod/flink-taskmanager-blahblah -c taskmanager — docker
> login -u oauth2accesstoken —password $(gcloud auth print-access-token)”
>
> Ouch that seems painful. I find this “precaching” step pretty silly and
> have considered making the DockerEnvironmentFactory a little more
> intelligent about how it deals with timeouts (e.g. no activity). It doesn’t
> seem like it would be too difficult to also add first-class support for
> pulling images from protected repositories. Extending the DockerPayload
> protobuf to pass along the additional information and tweaking the
> DockerEnvironmentFactory? I’m not a java expert but that might be worth
> exploring if this continues to be problematic.
>
Yeah it makes sense to have some first-class support for Docker
credentials. It's kind of a no-brainer that it's necessary with custom
containers, many companies probably wouldn't want to push their custom
containers to a public repo.
I was thinking of embedding the credentials JSON file into the
taskmanager container through its Dockerfile, that's workable but also
pretty silly - having to rebuild this container just for the sake of
putting in the credentials.
DockerPayload might be the right place to put credentials, but I wonder if
there's a way to do something more secure, with k8s secrets. I'm not too
well-versed in credential management.


> I found the time it takes to pull can be dramatically improved if you store
> everything in memory
> <https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/flink.yaml#L186>
> .
>
> In the end, I got my pipeline to start, create the uber jar (about 240MB
> in size), take a few minutes to transmit it to Flink
>
> You could explore spinning up the beam-flink-job-server
> <https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/beam-flink-jobserver.yaml>
> and using the PortableRunner. In theory that should reduce the amount of
> data you’re syncing to the cluster. It does require exposing at least two
> ingress points (8099 and 8098) so you can hit the job and artifact services
> respectively.
>
Right, good idea! Haven't tried spinning up the job server. Exposing the
job and artifact services seems pretty easy; but would also need to replace
the jobserver image "apache/beam_flink1.10_job_server:2.23.0" with a
custom-built one with the Beam 2.24 snapshot we're using.


> Cheers,
> Sam
>
> On Fri, Aug 28, 2020 at 5:50 PM Eugene Kirpichov <ekirpic...@gmail.com>
> wrote:
>
>> Woohoo thanks Kyle, adding --save_main_session made it work!!!
>>
>> On Fri, Aug 28, 2020 at 5:02 PM Kyle Weaver <kcwea...@google.com> wrote:
>>
>>> > rpc error: code = Unimplemented desc = Method not found:
>>> org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>>>
>>> This is a known issue: https://issues.apache.org/jira/browse/BEAM-10762
>>>
>>> On Fri, Aug 28, 2020 at 4:57 PM Eugene Kirpichov <ekirpic...@gmail.com>
>>> wrote:
>>>
>>>> P.S. Ironic how back in 2018 I was TL-ing the portable runners effort
>>>> for a few months on Google side, and now I need community help to get it to
>>>> work at all.
>>>> Still pretty miraculous how far Beam's portability has come since
>>>> then, even if it has a steep learning curve.
>>>>
>>>> On Fri, Aug 28, 2020 at 4:54 PM Eugene Kirpichov <ekirpic...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Sam,
>>>>>
>>>>> You're a wizard - this got me *way* farther than my previous
>>>>> attempts. Here's a PR https://github.com/sambvfx/beam-flink-k8s/pull/1 
>>>>> with
>>>>> a couple of changes I had to make.
>>>>>
>>>>> I had to make some additional changes that do not make sense to share,
>>>>> but here they are for the record:
>>>>> - Because I'm running on k8s engine and not minikube, I had to put the
>>>>> docker-flink image on GCR, changing flink.yaml "image: docker-flink:1.10"
>>>>> -> "image: us.gcr.io/$PROJECT_ID/docker-flink:1.10". I of course also
>>>>> had to build and push the container.
>>>>> - Because I'm running with a custom container based on an unreleased
>>>>> version of Beam, I had to push my custom container to GCR too, and change
>>>>> your instructions to use that image name instead of the default one
>>>>> - To fetch the containers from GCR, I had to log into Docker inside
>>>>> the Flink nodes, specifically inside the taskmanager container, using
>>>>> something like "kubectl exec pod/flink-taskmanager-blahblah -c taskmanager
>>>>> -- docker login -u oauth2accesstoken --password $(gcloud auth
>>>>> print-access-token)"
>>>>> - Again because I'm using an unreleased Beam SDK (due to a bug whose
>>>>> fix will be released in 2.24), I had to also build a custom Flink job
>>>>> server jar and point to it via --flink_job_server_jar.
>>>>>
>>>>> In the end, I got my pipeline to start, create the uber jar (about
>>>>> 240MB in size), take a few minutes to transmit it to Flink (which is a 
>>>>> long
>>>>> time, but it'll do for a prototype); the Flink UI was displaying the
>>>>> pipeline, and was able to *start* the worker container - however it
>>>>> quickly failed with the following error:
>>>>>
>>>>> 2020/08/28 15:49:09 Initializing python harness: /opt/apache/beam/boot
>>>>> --id=1-1 --provision_endpoint=localhost:45111
>>>>> 2020/08/28 15:49:09 Failed to retrieve staged files: failed to get
>>>>> manifest
>>>>> caused by:
>>>>> rpc error: code = Unimplemented desc = Method not found:
>>>>> org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>>>>> (followed by a bunch of other garbage)
>>>>>
>>>>> I'm assuming this might be because I got tangled in my custom images
>>>>> related to the unreleased Beam SDK, and should be fixed if running on 
>>>>> clean
>>>>> Beam 2.24.
>>>>>
>>>>> Thank you again!
>>>>>
>>>>> On Fri, Aug 28, 2020 at 10:21 AM Eugene Kirpichov <
>>>>> ekirpic...@gmail.com> wrote:
>>>>>
>>>>>> Holy shit, thanks Sam, this is more help than I could have asked for!!
>>>>>> I'll give this a shot later today and report back.
>>>>>>
>>>>>> On Thu, Aug 27, 2020 at 10:27 PM Sam Bourne <samb...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Eugene!
>>>>>>>
>>>>>>> I’m struggling to find complete documentation on how to do this.
>>>>>>> There seems to be lots of conflicting or incomplete information: several
>>>>>>> ways to deploy Flink, several ways to get Beam working with it, bizarre
>>>>>>> StackOverflow questions, and no documentation explaining a complete 
>>>>>>> working
>>>>>>> example.
>>>>>>>
>>>>>>> This *is* possible and I went through all the same frustrations of
>>>>>>> sparse and confusing documentation. I’m glossing over a lot of details, 
>>>>>>> but
>>>>>>> the key thing was setting up the flink taskworker(s) to run docker. This
>>>>>>> requires running docker-in-docker as the taskworker itself is a docker
>>>>>>> container in k8s.
>>>>>>>
>>>>>>> First create a custom flink container with docker:
>>>>>>>
>>>>>>> # docker-flink Dockerfile
>>>>>>>
>>>>>>> FROM flink:1.10
>>>>>>> # install docker
>>>>>>> RUN apt-get ...
>>>>>>>
>>>>>>> Then setup the taskmanager deployment to use a sidecar
>>>>>>> docker-in-docker service. This dind service is where the python sdk 
>>>>>>> harness
>>>>>>> container actually runs.
>>>>>>>
>>>>>>> kind: Deployment
>>>>>>> ...
>>>>>>>   containers:
>>>>>>>   - name: docker
>>>>>>>     image: docker:19.03.5-dind
>>>>>>>     ...
>>>>>>>   - name: taskmanger
>>>>>>>     image: myregistry:5000/docker-flink:1.10
>>>>>>>     env:
>>>>>>>     - name: DOCKER_HOST
>>>>>>>       value: tcp://localhost:2375
>>>>>>> ...
>>>>>>>
>>>>>>> I quickly threw all these pieces together in a repo here:
>>>>>>> https://github.com/sambvfx/beam-flink-k8s
>>>>>>>
>>>>>>> I added a working (via minikube) step-by-step in the README to prove
>>>>>>> to myself that I didn’t miss anything, but feel free to submit any PRs 
>>>>>>> if
>>>>>>> you want to add anything useful.
>>>>>>>
>>>>>>> The documents you linked are very informative. It would be great to
>>>>>>> aggregate all this into digestible documentation. Let me know if you 
>>>>>>> have
>>>>>>> any further questions!
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Sam
>>>>>>>
>>>>>>> On Thu, Aug 27, 2020 at 10:25 AM Eugene Kirpichov <
>>>>>>> ekirpic...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Kyle,
>>>>>>>>
>>>>>>>> Thanks for the response!
>>>>>>>>
>>>>>>>> On Wed, Aug 26, 2020 at 5:28 PM Kyle Weaver <kcwea...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> > - With the Flink operator, I was able to submit a Beam job, but
>>>>>>>>> hit the issue that I need Docker installed on my Flink nodes. I 
>>>>>>>>> haven't yet
>>>>>>>>> tried changing the operator's yaml files to add Docker inside them.
>>>>>>>>>
>>>>>>>>> Running Beam workers via Docker on the Flink nodes is not
>>>>>>>>> recommended (and probably not even possible), since the Flink nodes 
>>>>>>>>> are
>>>>>>>>> themselves already running inside Docker containers. Running workers 
>>>>>>>>> as
>>>>>>>>> sidecars avoids that problem. For example:
>>>>>>>>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/with_job_server/beam_flink_cluster.yaml#L17-L20
>>>>>>>>>
>>>>>>>>> The main problem with the sidecar approach is that I can't use the
>>>>>>>> Flink cluster as a "service" for anybody to submit their jobs with 
>>>>>>>> custom
>>>>>>>> containers - the container version is fixed.
>>>>>>>> Do I understand it correctly?
>>>>>>>> Seems like the Docker-in-Docker approach is viable, and is
>>>>>>>> mentioned in the Beam Flink K8s design doc
>>>>>>>> <https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.dtj1gnks47dq>
>>>>>>>> .
>>>>>>>>
>>>>>>>>
>>>>>>>>> > I also haven't tried this
>>>>>>>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md>
>>>>>>>>>  yet
>>>>>>>>> because it implies submitting jobs using "kubectl apply"  which is 
>>>>>>>>> weird -
>>>>>>>>> why not just submit it through the Flink job server?
>>>>>>>>>
>>>>>>>>> I'm guessing it goes through k8s for monitoring purposes. I see no
>>>>>>>>> reason it shouldn't be possible to submit to the job server directly
>>>>>>>>> through Python, network permitting, though I haven't tried this.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Aug 26, 2020 at 4:10 PM Eugene Kirpichov <
>>>>>>>>> ekirpic...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi folks,
>>>>>>>>>>
>>>>>>>>>> I'm still working with Pachama <https://pachama.com/> right now;
>>>>>>>>>> we have a Kubernetes Engine cluster on GCP and want to run Beam 
>>>>>>>>>> Python
>>>>>>>>>> batch pipelines with custom containers against it.
>>>>>>>>>> Flink and Cloud Dataflow are the two options; Cloud Dataflow
>>>>>>>>>> doesn't support custom containers for batch pipelines yes so we're 
>>>>>>>>>> going
>>>>>>>>>> with Flink.
>>>>>>>>>>
>>>>>>>>>> I'm struggling to find complete documentation on how to do this.
>>>>>>>>>> There seems to be lots of conflicting or incomplete information: 
>>>>>>>>>> several
>>>>>>>>>> ways to deploy Flink, several ways to get Beam working with it, 
>>>>>>>>>> bizarre
>>>>>>>>>> StackOverflow questions, and no documentation explaining a complete 
>>>>>>>>>> working
>>>>>>>>>> example.
>>>>>>>>>>
>>>>>>>>>> == My requests ==
>>>>>>>>>> * Could people briefly share their working setup? Would be good
>>>>>>>>>> to know which directions are promising.
>>>>>>>>>> * It would be particularly helpful if someone could volunteer an
>>>>>>>>>> hour of their time to talk to me about their working Beam/Flink/k8s 
>>>>>>>>>> setup.
>>>>>>>>>> It's for a good cause (fixing the planet :) ) and on my side I 
>>>>>>>>>> volunteer to
>>>>>>>>>> write up the findings to share with the community so others suffer 
>>>>>>>>>> less.
>>>>>>>>>>
>>>>>>>>>> == Appendix: My findings so far ==
>>>>>>>>>> There are multiple ways to deploy Flink on k8s:
>>>>>>>>>> - The GCP marketplace Flink operator
>>>>>>>>>> <https://cloud.google.com/blog/products/data-analytics/open-source-processing-engines-for-kubernetes>
>>>>>>>>>>  (couldn't
>>>>>>>>>> get it to work) and the respective CLI version
>>>>>>>>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator> 
>>>>>>>>>> (buggy,
>>>>>>>>>> but I got it working)
>>>>>>>>>> - https://github.com/lyft/flinkk8soperator (haven't tried)
>>>>>>>>>> - Flink's native k8s support
>>>>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/native_kubernetes.html>
>>>>>>>>>>  (super
>>>>>>>>>> easy to get working)
>>>>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/native_kubernetes.html>
>>>>>>>>>>
>>>>>>>>>> I confirmed that my Flink cluster was operational by running a
>>>>>>>>>> simple Wordcount job, initiated from my machine. However I wasn't 
>>>>>>>>>> yet able
>>>>>>>>>> to get Beam working:
>>>>>>>>>>
>>>>>>>>>> - With the Flink operator, I was able to submit a Beam job, but
>>>>>>>>>> hit the issue that I need Docker installed on my Flink nodes. I 
>>>>>>>>>> haven't yet
>>>>>>>>>> tried changing the operator's yaml files to add Docker inside them. 
>>>>>>>>>> I also
>>>>>>>>>> haven't tried this
>>>>>>>>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md>
>>>>>>>>>> yet because it implies submitting jobs using "kubectl apply"  which 
>>>>>>>>>> is
>>>>>>>>>> weird - why not just submit it through the Flink job server?
>>>>>>>>>>
>>>>>>>>>> - With Flink's native k8s support, I tried two things:
>>>>>>>>>>   - Creating a fat portable jar using  --output_executable_path.
>>>>>>>>>> The jar is huge (200+MB) and takes forever to upload to my Flink 
>>>>>>>>>> cluster -
>>>>>>>>>> this is a non-starter. But if I actually upload it, then I hit the 
>>>>>>>>>> same
>>>>>>>>>> issue with lacking Docker. Haven't tried fixing it yet.
>>>>>>>>>>   - Simply running my pipeline --runner=FlinkRunner
>>>>>>>>>> --environment_type=DOCKER --flink_master=$PUBLIC_IP:8081. The Java 
>>>>>>>>>> process
>>>>>>>>>> appears to send 1+GB of data to somewhere, but the job never even 
>>>>>>>>>> starts.
>>>>>>>>>>
>>>>>>>>>> I looked at a few conference talks:
>>>>>>>>>> -
>>>>>>>>>> https://www.cncf.io/wp-content/uploads/2020/02/CNCF-Webinar_-Apache-Flink-on-Kubernetes-Operator-1.pdf
>>>>>>>>>> - seems to imply that I need to add a Beam worker "sidecar" to the 
>>>>>>>>>> Flink
>>>>>>>>>> workers; and that I need to submit my job using "kubectl apply".
>>>>>>>>>> - https://www.youtube.com/watch?v=8k1iezoc5Sc which also
>>>>>>>>>> mentions the sidecar, but also mentions the fat jar option
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Eugene Kirpichov
>>>>>>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Eugene Kirpichov
>>>>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Eugene Kirpichov
>>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Eugene Kirpichov
>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>
>>>>
>>>>
>>>> --
>>>> Eugene Kirpichov
>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>
>>>
>>
>> --
>> Eugene Kirpichov
>> http://www.linkedin.com/in/eugenekirpichov
>>
>

-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Reply via email to