Flink k8s Operator on AWS?

2022-06-21 Thread Matt Casters
Hi Flink team!

I'm interested in getting the new Flink Kubernetes Operator to work on AWS
EKS.  Following the documentation I got pretty far.  However, when trying
to run a job I got the following error:

Only "local" is supported as schema for application mode. This assumes t
> hat the jar is located in the image, not the Flink client. An example of
> such path is: local:///opt/flink/examples/streaming/WindowJoin.jar


I have an Apache Hop/Beam fat jar capable of running the Flink pipeline,
and I reference it in my yml file:

jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar
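
For context, that jarURI sits in the job section of my FlinkDeployment,
roughly like this (simplified; entryClass is the Hop class I use to run
Beam pipelines):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: hop-pipeline
spec:
  image: flink:1.15
  flinkVersion: v1_15
  serviceAccount: flink
  job:
    # the line below is what the operator rejects:
    jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar
    entryClass: org.apache.hop.beam.run.MainBeam
    parallelism: 1
    upgradeMode: stateless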

So how could I go about getting the fat jar into a location the operator
will accept?

Getting this to work would be really cool for both short and long-lived
pipelines in the service of all sorts of data integration work.  It would
do away with the complexity of setting up and maintaining your own Flink
cluster.

Thanks in advance!

All the best,

Matt (mcasters, Apache Hop PMC)


Re: Flink k8s Operator on AWS?

2022-06-21 Thread Matt Casters
Thank you very much for the help, Matyas and Gyula!

I just saw a video today where you were presenting the FKO.  Really nice
stuff!

So I'm guessing we're executing "flink run" at some point on the master,
and that this is when the jar file needs to be local?
Am I right in assuming this happens after the Flink cluster in question
has started, as part of the job execution?

On the one hand I agree with the underlying idea that authentication and
security should not be a responsibility of the operator. On the other
hand, I could add a flink-s3 driver, but then I'd also have to configure
it and so on, and it's hard to get that configuration really clean.
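
For reference, if I read the docs right, wiring in the S3 filesystem would
look something like this in the pod template (the plugin jar version is a
guess on my part, and credentials would still have to come from somewhere,
e.g. an IAM role on the service account):

podTemplate:
  apiVersion: v1
  kind: Pod
  spec:
    containers:
      # the Flink main container must keep this exact name
      - name: flink-main-container
        env:
          # tells the official Flink image to activate its bundled S3 plugin
          - name: ENABLE_BUILT_IN_PLUGINS
            value: flink-s3-fs-presto-1.15.0.jar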

Do we have some service running on the flink cluster which would allow us
to post/copy files from the client (running kubectl) to the master?  If so,
could we add an option to the job specification to that effect?  Just
brainstorming ;-) (and forking apache/flink-kubernetes-operator)

All the best,
Matt

On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás wrote:

> Hi Matt,
>
> - In FlinkDeployments you can utilize an init container to download your
> artifact onto a shared volume, then refer to it as local:/.. from the
> main container. FlinkDeployment comes with pod template support:
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template
>
> - FlinkSessionJob comes with an artifact fetcher, but it may need some
> tweaking to make it work in your environment:
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview
>
> I hope it helps, let us know if you have further questions.
>
> Cheers,
> Matyas


Re: Flink k8s Operator on AWS?

2022-06-22 Thread Matt Casters
Hi Matyas,

Again thank you very much for the information. I'm a beginner and all the
help is really appreciated. After some diving into the script behind
s3-artifact-fetcher I kind of figured it out: it syncs a folder into the
task manager's pod container, and from there we should be able to find the
files locally.
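
In case it helps anyone else reading along, the shape of what I ended up
with is roughly this (simplified, and it assumes the pod's service account
is allowed to read the S3 bucket):

podTemplate:
  apiVersion: v1
  kind: Pod
  spec:
    initContainers:
      # fetch the fat jar onto a volume shared with the Flink containers
      - name: fetch-artifacts
        image: amazon/aws-cli
        args: ["s3", "cp", "s3://hop-eks/hop/hop-2.1.0-fat.jar", "/opt/hop/"]
        volumeMounts:
          - name: hop-artifacts
            mountPath: /opt/hop
    containers:
      # the Flink main container must keep this exact name
      - name: flink-main-container
        volumeMounts:
          - name: hop-artifacts
            mountPath: /opt/hop
    volumes:
      - name: hop-artifacts
        emptyDir: {}

and then the job spec can point at the local copy:

job:
  jarURI: local:///opt/hop/hop-2.1.0-fat.jar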

At its core, what we're trying to do with a project like Apache Hop is sit
on the side of the organizations that use the software: we want to lower
complexity, maintenance costs, learning curves and so on. Every time I see
a cryptic, scarcely documented YAML file or a complicated k8s setup, I have
to ask myself whether I'm sending our users on a week-long mission.

In a way it makes me appreciate the work Google did with Dataflow a bit
more, because they looked at this problem holistically and considered the
platform (GCP), the engine (Dataflow cluster on GCP k8s) and the executing
pipeline (Beam API jar files) to be different facets of the same problem.
Jar files get uploaded automatically, the cluster is instantiated
automatically, the pipeline is run, monitored and scaled automatically,
and at the end everything is shut down properly.

I want to figure out a way to do this with Flink as well since I believe,
especially on AWS (even with Spark-centric options like EMR and EMR
Serverless), that running a pipeline is just too complicated. Your work
really helps!

All the best,
Matt

On Tue, Jun 21, 2022 at 4:53 PM Őrhidi Mátyás wrote:

> Hi Matt,
>
> I believe an artifact fetcher (e.g.
> https://hub.docker.com/r/agiledigital/s3-artifact-fetcher ) + the pod
> template (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template)
> is an elegant way to solve your problem.
>
> The operator uses K8s native integration under the hood:
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode
> In application mode, the main() method of the application is executed on
> the JobManager, hence we need the jar locally.
>
> You can launch a session cluster (without a job spec) on the operator
> that allows submitting jars if you would like to avoid dealing with
> authentication, but the recommended and safe approach is to use session
> jobs for this purpose.
>
>
> Cheers,
> Matyas

Re: Flink k8s Operator on AWS?

2022-06-22 Thread Matt Casters
Hi Yang,

Thanks for the suggestion! I looked into volume sharing on EKS yesterday
but I couldn't figure it out right away.
People often come into the Apache Hop project with very little technical
knowledge, since that's sort of the goal of the project: make things easy.
Following page after page of complicated instructions just to get a few
files into a pod container... I feel it's just a bit much.
But again, this is my frustration with k8s, not with Flink ;-)

Cheers,
Matt

On Wed, Jun 22, 2022 at 5:32 AM Yang Wang  wrote:

> Matyas and Gyula have shared a lot of great information about how to make
> the Flink Kubernetes Operator work on EKS.
>
> One more input about how to prepare the user jars: if you are more
> familiar with K8s, you could use a persistent volume to provide the user
> jars and then mount the volume into the JobManager and TaskManager.
> I think EKS supports EBS, NFS and other PV types.
>
> Best,
> Yang

Re: Flink k8s Operator on AWS?

2022-06-24 Thread Matt Casters
Hi Mátyás & all,

Thanks again for the advice so far. On a related note, I noticed Java 8
being used, as indicated in the log:

org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - JAVA_HOME: /usr/local/openjdk-8

Is there a way to start Flink with Java 11 instead?
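
I'm assuming I could point the deployment at one of the java11 variants of
the official image, something like this, but I haven't tried it yet:

spec:
  # guessing at the tag here; the official images publish java11 variants
  image: flink:1.15.0-scala_2.12-java11
  flinkVersion: v1_15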

Kind regards,

Matt



Re: Flink k8s Operator on AWS?

2022-06-24 Thread Matt Casters
Yes, of course. I already feel a bit less intelligent for having asked the
question ;-)

The status now is that I've managed to puzzle it all together. Copying the
files from s3 to an ephemeral volume takes all of 2 seconds, so it's really
not an issue. The cluster starts, and our fat jar and the Apache Hop
MainBeam class are found and started.

The only thing that remains is figuring out how to configure the Flink
cluster itself. I have a couple of m5.large EC2 instances in a node group
on EKS and I set taskmanager.numberOfTaskSlots to "4". However, the tasks
in the pipeline can't seem to find resources to start.

Caused by:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Slot request bulk is not fulfillable! Could not allocate the required slot
within slot request timeout

Parallelism was set to 1 for the runner and there are only 2 tasks in my
first Beam pipeline, so it should be simple enough, but it just times out.
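
For reference, the relevant bits of my spec look roughly like this
(paraphrased from memory):

spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    parallelism: 1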

Next step for me is to document the result, which will end up on
hop.apache.org. I'll probably also want to demo this in Austin at the
upcoming Beam Summit.

Thanks a lot for your time and help so far!

Cheers,
Matt


Re: Flink k8s Operator on AWS?

2022-06-27 Thread Matt Casters
The problem was a misconfiguration of the initContainer that copies my
artifacts from s3 to an ephemeral volume. This caused the task manager to
start for a bit and then get shut down. It was hard to diagnose since the
pods were gone before I could pull any logs from them. I chalk all that up
to me just lacking a bit of experience with k8s.

That being said... It's all working now and I documented the deployment
over here:

https://hop.apache.org/manual/next/pipeline/beam/flink-k8s-operator-running-hop-pipeline.html

A big thank you to everyone that helped me out!

Cheers,
Matt

On Mon, Jun 27, 2022 at 4:59 AM Yang Wang  wrote:

> Could you please share the JobManager logs of the failed deployment? It
> will also help a lot if you could show the pending pod status via "kubectl
> describe ".
>
> Given that the current Flink Kubernetes Operator is built on top of the
> native K8s integration[1], the Flink ResourceManager should allocate
> enough TaskManager pods automatically.
> We need to find out what is wrong via the logs. Maybe it's the service
> account, a taint, or something else.
>
>
> [1]. https://flink.apache.org/2021/02/10/native-k8s-with-ha.html
>
>
> Best,
> Yang