Woohoo, thanks Kyle - adding --save_main_session made it work!!!
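
For anyone who hits this later: --save_main_session pickles the __main__
session so that module-level imports and globals are available on the
workers. It's a regular pipeline option, e.g. (my_pipeline.py standing in
for the real script):

python my_pipeline.py --runner=FlinkRunner --save_main_session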

On Fri, Aug 28, 2020 at 5:02 PM Kyle Weaver <kcwea...@google.com> wrote:

> > rpc error: code = Unimplemented desc = Method not found:
> org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>
> This is a known issue: https://issues.apache.org/jira/browse/BEAM-10762
>
> On Fri, Aug 28, 2020 at 4:57 PM Eugene Kirpichov <ekirpic...@gmail.com>
> wrote:
>
>> P.S. Ironic how back in 2018 I was TL-ing the portable runners effort for
>> a few months on the Google side, and now I need community help to get it
>> to work at all.
>> Still, it's pretty miraculous how far Beam's portability has come since
>> then, even if it has a steep learning curve.
>>
>> On Fri, Aug 28, 2020 at 4:54 PM Eugene Kirpichov <ekirpic...@gmail.com>
>> wrote:
>>
>>> Hi Sam,
>>>
>>> You're a wizard - this got me *way* farther than my previous attempts.
>>> Here's a PR https://github.com/sambvfx/beam-flink-k8s/pull/1 with a
>>> couple of changes I had to make.
>>>
>>> I had to make some additional changes that are too specific to my setup
>>> to be worth upstreaming, but here they are for the record:
>>> - Because I'm running on Google Kubernetes Engine and not minikube, I had
>>> to put the docker-flink image on GCR, changing flink.yaml "image:
>>> docker-flink:1.10" -> "image: us.gcr.io/$PROJECT_ID/docker-flink:1.10". I
>>> of course also had to build and push the container (commands sketched
>>> after this list).
>>> - Because I'm running with a custom container based on an unreleased
>>> version of Beam, I had to push my custom container to GCR too, and change
>>> your instructions to use that image name instead of the default one.
>>> - To fetch the containers from GCR, I had to log into Docker inside the
>>> Flink nodes, specifically inside the taskmanager container, using something
>>> like "kubectl exec pod/flink-taskmanager-blahblah -c taskmanager -- docker
>>> login -u oauth2accesstoken --password $(gcloud auth print-access-token)"
>>> - Again because I'm using an unreleased Beam SDK (due to a bug whose fix
>>> will be released in 2.24), I had to also build a custom Flink job server
>>> jar and point to it via --flink_job_server_jar.
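>>>
>>> For the record, the GCR steps were roughly the following (the pod name is
>>> elided as above, and I believe docker login also needs the registry host
>>> argument so it targets GCR rather than Docker Hub):
>>>
>>> docker build -t us.gcr.io/$PROJECT_ID/docker-flink:1.10 .
>>> docker push us.gcr.io/$PROJECT_ID/docker-flink:1.10
>>> kubectl exec pod/flink-taskmanager-blahblah -c taskmanager -- \
>>>   docker login -u oauth2accesstoken \
>>>   --password "$(gcloud auth print-access-token)" us.gcr.io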
>>>
>>> In the end, I got my pipeline to start: it created the uber jar (about
>>> 240 MB) and took a few minutes to transmit it to Flink (a long time, but
>>> it'll do for a prototype); the Flink UI displayed the pipeline, and the
>>> worker container did *start* - however, it quickly failed with the
>>> following error:
>>>
>>> 2020/08/28 15:49:09 Initializing python harness: /opt/apache/beam/boot
>>> --id=1-1 --provision_endpoint=localhost:45111
>>> 2020/08/28 15:49:09 Failed to retrieve staged files: failed to get
>>> manifest
>>> caused by:
>>> rpc error: code = Unimplemented desc = Method not found:
>>> org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>>> (followed by a bunch of other garbage)
>>>
>>> I'm assuming this might be because I got tangled up in my custom images
>>> related to the unreleased Beam SDK, and that it would be fixed by running
>>> on a clean Beam 2.24.
>>>
>>> Thank you again!
>>>
>>> On Fri, Aug 28, 2020 at 10:21 AM Eugene Kirpichov <ekirpic...@gmail.com>
>>> wrote:
>>>
>>>> Holy shit, thanks Sam, this is more help than I could have asked for!!
>>>> I'll give this a shot later today and report back.
>>>>
>>>> On Thu, Aug 27, 2020 at 10:27 PM Sam Bourne <samb...@gmail.com> wrote:
>>>>
>>>>> Hi Eugene!
>>>>>
>>>>> I’m struggling to find complete documentation on how to do this. There
>>>>> seems to be lots of conflicting or incomplete information: several ways
>>>>> to deploy Flink, several ways to get Beam working with it, bizarre
>>>>> StackOverflow questions, and no documentation explaining a complete
>>>>> working example.
>>>>>
>>>>> This *is* possible and I went through all the same frustrations of
>>>>> sparse and confusing documentation. I’m glossing over a lot of details,
>>>>> but the key thing was setting up the flink taskworker(s) to run docker.
>>>>> This requires running docker-in-docker, as the taskworker itself is a
>>>>> docker container in k8s.
>>>>>
>>>>> First create a custom flink container with docker:
>>>>>
>>>>> # docker-flink Dockerfile
>>>>>
>>>>> FROM flink:1.10
>>>>> # install the docker CLI so the taskmanager can reach the dind sidecar
>>>>> # (docker.io is one package choice; any docker client will do)
>>>>> RUN apt-get update && \
>>>>>     apt-get install -y docker.io && \
>>>>>     rm -rf /var/lib/apt/lists/*
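>>>>>
>>>>> Build and push it somewhere your cluster can pull from, e.g. (registry
>>>>> name matching the deployment below):
>>>>>
>>>>> docker build -t myregistry:5000/docker-flink:1.10 .
>>>>> docker push myregistry:5000/docker-flink:1.10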
>>>>>
>>>>> Then set up the taskmanager deployment to use a sidecar docker-in-docker
>>>>> service. This dind service is where the python sdk harness container
>>>>> actually runs.
>>>>>
>>>>> kind: Deployment
>>>>> ...
>>>>>   containers:
>>>>>   - name: docker
>>>>>     image: docker:19.03.5-dind
>>>>>     ...
>>>>>   - name: taskmanager
>>>>>     image: myregistry:5000/docker-flink:1.10
>>>>>     env:
>>>>>     - name: DOCKER_HOST
>>>>>       value: tcp://localhost:2375
>>>>> ...
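>>>>>
>>>>> One detail that's easy to miss (check the repo for the exact settings;
>>>>> this is just the shape of it): the dind sidecar has to run privileged,
>>>>> and newer dind images enable TLS by default, which breaks plain
>>>>> tcp://localhost:2375 unless you opt out:
>>>>>
>>>>>   - name: docker
>>>>>     image: docker:19.03.5-dind
>>>>>     securityContext:
>>>>>       privileged: true  # dockerd won't start unprivileged
>>>>>     env:
>>>>>     - name: DOCKER_TLS_CERTDIR
>>>>>       value: ""  # serve plain tcp on 2375 instead of TLS on 2376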
>>>>>
>>>>> I quickly threw all these pieces together in a repo here:
>>>>> https://github.com/sambvfx/beam-flink-k8s
>>>>>
>>>>> I added a step-by-step walkthrough (verified via minikube) to the README
>>>>> to prove to myself that I didn’t miss anything, but feel free to submit
>>>>> PRs if you want to add anything useful.
>>>>>
>>>>> The documents you linked are very informative. It would be great to
>>>>> aggregate all this into digestible documentation. Let me know if you have
>>>>> any further questions!
>>>>>
>>>>> Cheers,
>>>>> Sam
>>>>>
>>>>> On Thu, Aug 27, 2020 at 10:25 AM Eugene Kirpichov <
>>>>> ekirpic...@gmail.com> wrote:
>>>>>
>>>>>> Hi Kyle,
>>>>>>
>>>>>> Thanks for the response!
>>>>>>
>>>>>> On Wed, Aug 26, 2020 at 5:28 PM Kyle Weaver <kcwea...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> > - With the Flink operator, I was able to submit a Beam job, but
>>>>>>> hit the issue that I need Docker installed on my Flink nodes. I
>>>>>>> haven't yet tried changing the operator's yaml files to add Docker
>>>>>>> inside them.
>>>>>>>
>>>>>>> Running Beam workers via Docker on the Flink nodes is not
>>>>>>> recommended (and probably not even possible), since the Flink nodes are
>>>>>>> themselves already running inside Docker containers. Running workers as
>>>>>>> sidecars avoids that problem. For example:
>>>>>>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/with_job_server/beam_flink_cluster.yaml#L17-L20
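>>>>>>>
>>>>>>> The gist of that example (quoting from memory, and the SDK image tag
>>>>>>> here is just illustrative) is a worker pool sidecar on each Flink
>>>>>>> taskmanager pod:
>>>>>>>
>>>>>>>   - name: beam-worker-pool
>>>>>>>     image: apache/beam_python3.7_sdk:2.23.0
>>>>>>>     args: ["--worker_pool"]
>>>>>>>     ports:
>>>>>>>     - containerPort: 50000
>>>>>>>
>>>>>>> with the pipeline then submitted using --environment_type=EXTERNAL
>>>>>>> --environment_config=localhost:50000.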
>>>>>>>
>>>>>> The main problem with the sidecar approach is that I can't use the
>>>>>> Flink cluster as a "service" for anybody to submit their jobs with
>>>>>> custom containers - the container version is fixed.
>>>>>> Do I understand it correctly?
>>>>>> Seems like the Docker-in-Docker approach is viable, and is mentioned
>>>>>> in the Beam Flink K8s design doc
>>>>>> <https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.dtj1gnks47dq>
>>>>>> .
>>>>>>
>>>>>>
>>>>>>> > I also haven't tried this
>>>>>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md>
>>>>>>> yet, because it implies submitting jobs using "kubectl apply", which
>>>>>>> is weird - why not just submit it through the Flink job server?
>>>>>>>
>>>>>>> I'm guessing it goes through k8s for monitoring purposes. I see no
>>>>>>> reason it shouldn't be possible to submit to the job server directly
>>>>>>> through Python, network permitting, though I haven't tried this.
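>>>>>>>
>>>>>>> Presumably something along these lines (my_pipeline.py standing in
>>>>>>> for the real script, and the job server address being whatever the
>>>>>>> cluster exposes):
>>>>>>>
>>>>>>> python my_pipeline.py \
>>>>>>>   --runner=PortableRunner \
>>>>>>>   --job_endpoint=<job-server-host>:8099 \
>>>>>>>   --environment_type=DOCKER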
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 26, 2020 at 4:10 PM Eugene Kirpichov <
>>>>>>> ekirpic...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi folks,
>>>>>>>>
>>>>>>>> I'm still working with Pachama <https://pachama.com/> right now;
>>>>>>>> we have a Kubernetes Engine cluster on GCP and want to run Beam Python
>>>>>>>> batch pipelines with custom containers against it.
>>>>>>>> Flink and Cloud Dataflow are the two options; Cloud Dataflow
>>>>>>>> doesn't support custom containers for batch pipelines yet, so we're
>>>>>>>> going with Flink.
>>>>>>>>
>>>>>>>> I'm struggling to find complete documentation on how to do this.
>>>>>>>> There seems to be lots of conflicting or incomplete information:
>>>>>>>> several ways to deploy Flink, several ways to get Beam working with
>>>>>>>> it, bizarre StackOverflow questions, and no documentation explaining
>>>>>>>> a complete working example.
>>>>>>>>
>>>>>>>> == My requests ==
>>>>>>>> * Could people briefly share their working setup? Would be good to
>>>>>>>> know which directions are promising.
>>>>>>>> * It would be particularly helpful if someone could volunteer an
>>>>>>>> hour of their time to talk to me about their working Beam/Flink/k8s 
>>>>>>>> setup.
>>>>>>>> It's for a good cause (fixing the planet :) ) and on my side I 
>>>>>>>> volunteer to
>>>>>>>> write up the findings to share with the community so others suffer 
>>>>>>>> less.
>>>>>>>>
>>>>>>>> == Appendix: My findings so far ==
>>>>>>>> There are multiple ways to deploy Flink on k8s:
>>>>>>>> - The GCP marketplace Flink operator
>>>>>>>> <https://cloud.google.com/blog/products/data-analytics/open-source-processing-engines-for-kubernetes>
>>>>>>>> (couldn't get it to work) and the respective CLI version
>>>>>>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator>
>>>>>>>> (buggy, but I got it working)
>>>>>>>> - https://github.com/lyft/flinkk8soperator (haven't tried)
>>>>>>>> - Flink's native k8s support
>>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/native_kubernetes.html>
>>>>>>>> (super easy to get working)
>>>>>>>>
>>>>>>>> I confirmed that my Flink cluster was operational by running a
>>>>>>>> simple Wordcount job, initiated from my machine. However, I wasn't
>>>>>>>> yet able to get Beam working:
>>>>>>>>
>>>>>>>> - With the Flink operator, I was able to submit a Beam job, but hit
>>>>>>>> the issue that I need Docker installed on my Flink nodes. I haven't
>>>>>>>> yet tried changing the operator's yaml files to add Docker inside
>>>>>>>> them. I also haven't tried this
>>>>>>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md>
>>>>>>>> yet, because it implies submitting jobs using "kubectl apply", which
>>>>>>>> is weird - why not just submit it through the Flink job server?
>>>>>>>>
>>>>>>>> - With Flink's native k8s support, I tried two things (both
>>>>>>>> invocations are sketched after this list):
>>>>>>>>   - Creating a fat portable jar using --output_executable_path. The
>>>>>>>> jar is huge (200+ MB) and takes forever to upload to my Flink cluster
>>>>>>>> - this is a non-starter. But even when I do upload it, I hit the same
>>>>>>>> issue with missing Docker. Haven't tried fixing that yet.
>>>>>>>>   - Simply running my pipeline with --runner=FlinkRunner
>>>>>>>> --environment_type=DOCKER --flink_master=$PUBLIC_IP:8081. The Java
>>>>>>>> process appears to send 1+ GB of data somewhere, but the job never
>>>>>>>> even starts.
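>>>>>>>>
>>>>>>>> For reference, roughly what those two invocations look like
>>>>>>>> (my_pipeline.py stands in for the real script):
>>>>>>>>
>>>>>>>> # 1. build a self-contained jar, then submit it to Flink separately
>>>>>>>> python my_pipeline.py --runner=FlinkRunner \
>>>>>>>>   --environment_type=DOCKER \
>>>>>>>>   --output_executable_path=/tmp/my_pipeline.jar
>>>>>>>> flink run -m $PUBLIC_IP:8081 /tmp/my_pipeline.jar
>>>>>>>>
>>>>>>>> # 2. submit directly to the Flink master
>>>>>>>> python my_pipeline.py --runner=FlinkRunner \
>>>>>>>>   --environment_type=DOCKER \
>>>>>>>>   --flink_master=$PUBLIC_IP:8081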
>>>>>>>>
>>>>>>>> I looked at a few conference talks:
>>>>>>>> -
>>>>>>>> https://www.cncf.io/wp-content/uploads/2020/02/CNCF-Webinar_-Apache-Flink-on-Kubernetes-Operator-1.pdf
>>>>>>>> seems to imply that I need to add a Beam worker "sidecar" to the
>>>>>>>> Flink workers, and that I need to submit my job using "kubectl apply".
>>>>>>>> - https://www.youtube.com/watch?v=8k1iezoc5Sc also mentions the
>>>>>>>> sidecar, but mentions the fat jar option as well
>>>>>>>>
>>>>>>>> --
>>>>>>>> Eugene Kirpichov
>>>>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Eugene Kirpichov
>>>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Eugene Kirpichov
>>>> http://www.linkedin.com/in/eugenekirpichov
>>>>
>>>
>>>
>>> --
>>> Eugene Kirpichov
>>> http://www.linkedin.com/in/eugenekirpichov
>>>
>>
>>
>> --
>> Eugene Kirpichov
>> http://www.linkedin.com/in/eugenekirpichov
>>
>

-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov
