On Sat, Aug 29, 2020 at 10:59 AM Eugene Kirpichov <ekirpic...@gmail.com>
wrote:


>
> On Fri, Aug 28, 2020 at 6:52 PM Sam Bourne <samb...@gmail.com> wrote:
>
>> Hi Eugene,
>>
>> Glad that helped you out and thanks for the PR tweaking it for GCP.
>>
>> To fetch the containers from GCR, I had to log into Docker inside the
>> Flink nodes, specifically inside the taskmanager container, using something
>> like "kubectl exec pod/flink-taskmanager-blahblah -c taskmanager -- docker
>> login -u oauth2accesstoken --password $(gcloud auth print-access-token)"
>>
>> Ouch that seems painful. I find this “precaching” step pretty silly and
>> have considered making the DockerEnvironmentFactory a little more
>> intelligent about how it deals with timeouts (e.g. no activity). It doesn’t
>> seem like it would be too difficult to also add first-class support for
>> pulling images from protected repositories. Extending the DockerPayload
>> protobuf to pass along the additional information and tweaking the
>> DockerEnvironmentFactory? I’m not a java expert but that might be worth
>> exploring if this continues to be problematic.
>>
> Yeah it makes sense to have some first-class support for Docker
> credentials. It's kind of a no-brainer that it's necessary with custom
> containers, many companies probably wouldn't want to push their custom
> containers to a public repo.
> I was thinking of embedding the credentials JSON file into the
> taskmanager container through its Dockerfile, that's workable but also
> pretty silly - having to rebuild this container just for the sake of
> putting in the credentials.
> DockerPayload might be the right place to put credentials, but I wonder if
> there's a way to do something more secure, with k8s secrets. I'm not too
> well-versed in credential management.
>
Using k8s secrets you could mount your credentials into the container and
tweak the pull/run command
<https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java#L77>
to first login using a pattern like cat /tmp/password.txt | docker login
--username foo --password-stdin. Maybe the DockerPayload protobuf could
include the password as either raw text or an absolute filepath, and switch
the login command depending on which is provided.
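To make that concrete, here's a rough sketch of the secret-based flow; the secret name, key, mount path, and registry are all made up for illustration:

```shell
# Store the registry credential in a k8s secret (name/key are illustrative).
kubectl create secret generic registry-password \
  --from-literal=password.txt="$(gcloud auth print-access-token)"

# Mount that secret into the taskmanager container via volumes/volumeMounts
# in the deployment, e.g. at /etc/registry/password.txt. The tweaked
# pull/run command could then log in first:
cat /etc/registry/password.txt | docker login \
  --username oauth2accesstoken --password-stdin https://us.gcr.io
```

One caveat: a GCR access token expires after about an hour, so something would need to refresh the secret on a long-running cluster; a longer-lived service account key would avoid that.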

>> I found the time it takes to pull can be dramatically improved if you store
>> everything in memory
>> <https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/flink.yaml#L186>
>> .
>>
>> In the end, I got my pipeline to start, create the uber jar (about 240MB
>> in size), take a few minutes to transmit it to Flink
>>
>> You could explore spinning up the beam-flink-job-server
>> <https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/beam-flink-jobserver.yaml>
>> and using the PortableRunner. In theory that should reduce the amount of
>> data you’re syncing to the cluster. It does require exposing at least two
>> ingress points (8099 and 8098) so you can hit the job and artifact services
>> respectively.
>>
> Right, good idea! Haven't tried spinning up the job server. Exposing the
> job and artifact services seems pretty easy; but would also need to replace
> the jobserver image "apache/beam_flink1.10_job_server:2.23.0" with a
> custom-built one with the Beam 2.24 snapshot we're using.
>

This may be necessary anyway, depending on how you handle the docker login.
In any case, good luck!
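For the record, building and pushing that custom job server image from a Beam source checkout might look roughly like this (the gradle task name and image tags are my best guesses; check the Beam build for the exact target):

```shell
# Build the Flink 1.10 job server container from a Beam checkout.
cd beam
./gradlew :runners:flink:1.10:job-server-container:docker

# Retag and push it somewhere the cluster can pull from
# (registry and tag are placeholders).
docker tag apache/beam_flink1.10_job_server:latest \
  us.gcr.io/$PROJECT_ID/beam_flink1.10_job_server:2.24.0-SNAPSHOT
docker push us.gcr.io/$PROJECT_ID/beam_flink1.10_job_server:2.24.0-SNAPSHOT
```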


>
>
>> Cheers,
>> Sam
>>
>> On Fri, Aug 28, 2020 at 5:50 PM Eugene Kirpichov <ekirpic...@gmail.com>
>> wrote:
>>
>>> Woohoo thanks Kyle, adding --save_main_session made it work!!!
>>>
>>> On Fri, Aug 28, 2020 at 5:02 PM Kyle Weaver <kcwea...@google.com> wrote:
>>>
>>>> > rpc error: code = Unimplemented desc = Method not found:
>>>> org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>>>>
>>>> This is a known issue: https://issues.apache.org/jira/browse/BEAM-10762
>>>>
>>>> On Fri, Aug 28, 2020 at 4:57 PM Eugene Kirpichov <ekirpic...@gmail.com>
>>>> wrote:
>>>>
>>>>> P.S. Ironic how back in 2018 I was TL-ing the portable runners effort
>>>>> for a few months on Google side, and now I need community help to get it 
>>>>> to
>>>>> work at all.
>>>>> Still pretty miraculous how far Beam's portability has come since
>>>>> then, even if it has a steep learning curve.
>>>>>
>>>>> On Fri, Aug 28, 2020 at 4:54 PM Eugene Kirpichov <ekirpic...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Sam,
>>>>>>
>>>>>> You're a wizard - this got me *way* farther than my previous
>>>>>> attempts. Here's a PR
>>>>>> https://github.com/sambvfx/beam-flink-k8s/pull/1 with a couple of
>>>>>> changes I had to make.
>>>>>>
>>>>>> I had to make some additional changes that do not make sense to
>>>>>> share, but here they are for the record:
>>>>>> - Because I'm running on k8s engine and not minikube, I had to put
>>>>>> the docker-flink image on GCR, changing flink.yaml "image:
>>>>>> docker-flink:1.10" -> "image: us.gcr.io/$PROJECT_ID/docker-flink:1.10".
>>>>>> I of course also had to build and push the container.
>>>>>> - Because I'm running with a custom container based on an unreleased
>>>>>> version of Beam, I had to push my custom container to GCR too, and change
>>>>>> your instructions to use that image name instead of the default one
>>>>>> - To fetch the containers from GCR, I had to log into Docker inside
>>>>>> the Flink nodes, specifically inside the taskmanager container, using
>>>>>> something like "kubectl exec pod/flink-taskmanager-blahblah -c 
>>>>>> taskmanager
>>>>>> -- docker login -u oauth2accesstoken --password $(gcloud auth
>>>>>> print-access-token)"
>>>>>> - Again because I'm using an unreleased Beam SDK (due to a bug whose
>>>>>> fix will be released in 2.24), I had to also build a custom Flink job
>>>>>> server jar and point to it via --flink_job_server_jar.
>>>>>>
>>>>>> In the end, I got my pipeline to start, create the uber jar (about
>>>>>> 240MB in size), take a few minutes to transmit it to Flink (which is a 
>>>>>> long
>>>>>> time, but it'll do for a prototype); the Flink UI was displaying the
>>>>>> pipeline, and was able to *start* the worker container - however it
>>>>>> quickly failed with the following error:
>>>>>>
>>>>>> 2020/08/28 15:49:09 Initializing python harness:
>>>>>> /opt/apache/beam/boot --id=1-1 --provision_endpoint=localhost:45111
>>>>>> 2020/08/28 15:49:09 Failed to retrieve staged files: failed to get
>>>>>> manifest
>>>>>> caused by:
>>>>>> rpc error: code = Unimplemented desc = Method not found:
>>>>>> org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>>>>>> (followed by a bunch of other garbage)
>>>>>>
>>>>>> I'm assuming this might be because I got tangled in my custom images
>>>>>> related to the unreleased Beam SDK, and should be fixed if running on 
>>>>>> clean
>>>>>> Beam 2.24.
>>>>>>
>>>>>> Thank you again!
>>>>>>
>>>>>> On Fri, Aug 28, 2020 at 10:21 AM Eugene Kirpichov <
>>>>>> ekirpic...@gmail.com> wrote:
>>>>>>
>>>>>>> Holy shit, thanks Sam, this is more help than I could have asked
>>>>>>> for!!
>>>>>>> I'll give this a shot later today and report back.
>>>>>>>
>>>>>>> On Thu, Aug 27, 2020 at 10:27 PM Sam Bourne <samb...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Eugene!
>>>>>>>>
>>>>>>>> I’m struggling to find complete documentation on how to do this.
>>>>>>>> There seems to be lots of conflicting or incomplete information: 
>>>>>>>> several
>>>>>>>> ways to deploy Flink, several ways to get Beam working with it, bizarre
>>>>>>>> StackOverflow questions, and no documentation explaining a complete 
>>>>>>>> working
>>>>>>>> example.
>>>>>>>>
>>>>>>>> This *is* possible and I went through all the same frustrations of
>>>>>>>> sparse and confusing documentation. I’m glossing over a lot of 
>>>>>>>> details, but
>>>>>>>> the key thing was setting up the flink taskworker(s) to run docker. 
>>>>>>>> This
>>>>>>>> requires running docker-in-docker as the taskworker itself is a docker
>>>>>>>> container in k8s.
>>>>>>>>
>>>>>>>> First create a custom flink container with docker:
>>>>>>>>
>>>>>>>> # docker-flink Dockerfile
>>>>>>>>
>>>>>>>> FROM flink:1.10
>>>>>>>> # install docker
>>>>>>>> RUN apt-get ...
>>>>>>>>
>>>>>>>> Then setup the taskmanager deployment to use a sidecar
>>>>>>>> docker-in-docker service. This dind service is where the python sdk 
>>>>>>>> harness
>>>>>>>> container actually runs.
>>>>>>>>
>>>>>>>> kind: Deployment
>>>>>>>> ...
>>>>>>>>   containers:
>>>>>>>>   - name: docker
>>>>>>>>     image: docker:19.03.5-dind
>>>>>>>>     ...
>>>>>>>>   - name: taskmanager
>>>>>>>>     image: myregistry:5000/docker-flink:1.10
>>>>>>>>     env:
>>>>>>>>     - name: DOCKER_HOST
>>>>>>>>       value: tcp://localhost:2375
>>>>>>>> ...
>>>>>>>>
>>>>>>>> I quickly threw all these pieces together in a repo here:
>>>>>>>> https://github.com/sambvfx/beam-flink-k8s
>>>>>>>>
>>>>>>>> I added a working (via minikube) step-by-step in the README to
>>>>>>>> prove to myself that I didn’t miss anything, but feel free to submit 
>>>>>>>> any
>>>>>>>> PRs if you want to add anything useful.
>>>>>>>>
>>>>>>>> The documents you linked are very informative. It would be great to
>>>>>>>> aggregate all this into digestible documentation. Let me know if you 
>>>>>>>> have
>>>>>>>> any further questions!
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Sam
>>>>>>>>
>>>>>>>> On Thu, Aug 27, 2020 at 10:25 AM Eugene Kirpichov <
>>>>>>>> ekirpic...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Kyle,
>>>>>>>>>
>>>>>>>>> Thanks for the response!
>>>>>>>>>
>>>>>>>>> On Wed, Aug 26, 2020 at 5:28 PM Kyle Weaver <kcwea...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> > - With the Flink operator, I was able to submit a Beam job, but
>>>>>>>>>> hit the issue that I need Docker installed on my Flink nodes. I 
>>>>>>>>>> haven't yet
>>>>>>>>>> tried changing the operator's yaml files to add Docker inside them.
>>>>>>>>>>
>>>>>>>>>> Running Beam workers via Docker on the Flink nodes is not
>>>>>>>>>> recommended (and probably not even possible), since the Flink nodes 
>>>>>>>>>> are
>>>>>>>>>> themselves already running inside Docker containers. Running workers 
>>>>>>>>>> as
>>>>>>>>>> sidecars avoids that problem. For example:
>>>>>>>>>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/with_job_server/beam_flink_cluster.yaml#L17-L20
>>>>>>>>>>
>>>>>>>>>> The main problem with the sidecar approach is that I can't use
>>>>>>>>> the Flink cluster as a "service" for anybody to submit their jobs with
>>>>>>>>> custom containers - the container version is fixed.
>>>>>>>>> Do I understand it correctly?
>>>>>>>>> Seems like the Docker-in-Docker approach is viable, and is
>>>>>>>>> mentioned in the Beam Flink K8s design doc
>>>>>>>>> <https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.dtj1gnks47dq>
>>>>>>>>> .
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> > I also haven't tried this
>>>>>>>>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md>
>>>>>>>>>>  yet
>>>>>>>>>> because it implies submitting jobs using "kubectl apply"  which is 
>>>>>>>>>> weird -
>>>>>>>>>> why not just submit it through the Flink job server?
>>>>>>>>>>
>>>>>>>>>> I'm guessing it goes through k8s for monitoring purposes. I see
>>>>>>>>>> no reason it shouldn't be possible to submit to the job server 
>>>>>>>>>> directly
>>>>>>>>>> through Python, network permitting, though I haven't tried this.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Aug 26, 2020 at 4:10 PM Eugene Kirpichov <
>>>>>>>>>> ekirpic...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi folks,
>>>>>>>>>>>
>>>>>>>>>>> I'm still working with Pachama <https://pachama.com/> right
>>>>>>>>>>> now; we have a Kubernetes Engine cluster on GCP and want to run 
>>>>>>>>>>> Beam Python
>>>>>>>>>>> batch pipelines with custom containers against it.
>>>>>>>>>>> Flink and Cloud Dataflow are the two options; Cloud Dataflow
>>>>>>>>>>> doesn't support custom containers for batch pipelines yet, so we're going
>>>>>>>>>>> going
>>>>>>>>>>> with Flink.
>>>>>>>>>>>
>>>>>>>>>>> I'm struggling to find complete documentation on how to do this.
>>>>>>>>>>> There seems to be lots of conflicting or incomplete information: 
>>>>>>>>>>> several
>>>>>>>>>>> ways to deploy Flink, several ways to get Beam working with it, 
>>>>>>>>>>> bizarre
>>>>>>>>>>> StackOverflow questions, and no documentation explaining a complete 
>>>>>>>>>>> working
>>>>>>>>>>> example.
>>>>>>>>>>>
>>>>>>>>>>> == My requests ==
>>>>>>>>>>> * Could people briefly share their working setup? Would be good
>>>>>>>>>>> to know which directions are promising.
>>>>>>>>>>> * It would be particularly helpful if someone could volunteer an
>>>>>>>>>>> hour of their time to talk to me about their working Beam/Flink/k8s 
>>>>>>>>>>> setup.
>>>>>>>>>>> It's for a good cause (fixing the planet :) ) and on my side I 
>>>>>>>>>>> volunteer to
>>>>>>>>>>> write up the findings to share with the community so others suffer 
>>>>>>>>>>> less.
>>>>>>>>>>>
>>>>>>>>>>> == Appendix: My findings so far ==
>>>>>>>>>>> There are multiple ways to deploy Flink on k8s:
>>>>>>>>>>> - The GCP marketplace Flink operator
>>>>>>>>>>> <https://cloud.google.com/blog/products/data-analytics/open-source-processing-engines-for-kubernetes>
>>>>>>>>>>>  (couldn't
>>>>>>>>>>> get it to work) and the respective CLI version
>>>>>>>>>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator> 
>>>>>>>>>>> (buggy,
>>>>>>>>>>> but I got it working)
>>>>>>>>>>> - https://github.com/lyft/flinkk8soperator (haven't tried)
>>>>>>>>>>> - Flink's native k8s support
>>>>>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/native_kubernetes.html>
>>>>>>>>>>>  (super
>>>>>>>>>>> easy to get working)
>>>>>>>>>>>
>>>>>>>>>>> I confirmed that my Flink cluster was operational by running a
>>>>>>>>>>> simple Wordcount job, initiated from my machine. However I wasn't 
>>>>>>>>>>> yet able
>>>>>>>>>>> to get Beam working:
>>>>>>>>>>>
>>>>>>>>>>> - With the Flink operator, I was able to submit a Beam job, but
>>>>>>>>>>> hit the issue that I need Docker installed on my Flink nodes. I 
>>>>>>>>>>> haven't yet
>>>>>>>>>>> tried changing the operator's yaml files to add Docker inside them. 
>>>>>>>>>>> I also
>>>>>>>>>>> haven't tried this
>>>>>>>>>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md>
>>>>>>>>>>> yet because it implies submitting jobs using "kubectl apply"  which 
>>>>>>>>>>> is
>>>>>>>>>>> weird - why not just submit it through the Flink job server?
>>>>>>>>>>>
>>>>>>>>>>> - With Flink's native k8s support, I tried two things:
>>>>>>>>>>>   - Creating a fat portable jar using  --output_executable_path.
>>>>>>>>>>> The jar is huge (200+MB) and takes forever to upload to my Flink 
>>>>>>>>>>> cluster -
>>>>>>>>>>> this is a non-starter. But if I actually upload it, then I hit the 
>>>>>>>>>>> same
>>>>>>>>>>> issue with lacking Docker. Haven't tried fixing it yet.
>>>>>>>>>>>   - Simply running my pipeline --runner=FlinkRunner
>>>>>>>>>>> --environment_type=DOCKER --flink_master=$PUBLIC_IP:8081. The Java 
>>>>>>>>>>> process
>>>>>>>>>>> appears to send 1+GB of data to somewhere, but the job never even 
>>>>>>>>>>> starts.
>>>>>>>>>>>
>>>>>>>>>>> I looked at a few conference talks:
>>>>>>>>>>> -
>>>>>>>>>>> https://www.cncf.io/wp-content/uploads/2020/02/CNCF-Webinar_-Apache-Flink-on-Kubernetes-Operator-1.pdf
>>>>>>>>>>> - seems to imply that I need to add a Beam worker "sidecar" to the 
>>>>>>>>>>> Flink
>>>>>>>>>>> workers; and that I need to submit my job using "kubectl apply".
>>>>>>>>>>> - https://www.youtube.com/watch?v=8k1iezoc5Sc which also
>>>>>>>>>>> mentions the sidecar, but also mentions the fat jar option
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Eugene Kirpichov
>>>>>>>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Eugene Kirpichov
>>>>>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Eugene Kirpichov
>>>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Eugene Kirpichov
>>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Eugene Kirpichov
>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>
>>>>
>>>
>>> --
>>> Eugene Kirpichov
>>> http://www.linkedin.com/in/eugenekirpichov
>>>
>>
>
> --
> Eugene Kirpichov
> http://www.linkedin.com/in/eugenekirpichov
>
