Re: [Discuss] CRD for flink sql gateway in the flink k8s operator

2023-09-19 Thread Dongwoo Kim
> On Sun, Sep 17, 2023 at 2:02 PM Gyula Fóra wrote:
>
> > Hi!
> > It sounds pretty easy to deploy the gateway automatically with session
> > cluster deployments from the operator, but there is a major limitation
> > currently. The SQL gateway itself doesn't really support any operator
> > integration, so jobs submitted through the SQL gateway would not be
> > manageable by the operator (they won't show up as session jobs).
> >
> > Without that, this is a very strange feature. We would make something
> > much easier for users that is not well supported by the operator in
> > the first place. The operator is designed to manage clusters and jobs
> > (FlinkDeployment / FlinkSessionJob). It would be good to understand
> > if we could make the SQL Gateway create a FlinkSessionJob / Deployment
> > (that would require application cluster support) and basically submit
> > the job through the operator.
> >
> > Cheers,
> > Gyula
> >
> > On Sun, Sep 17, 2023 at 1:26 AM Yangze Guo wrote:
> >
> > > > There would be many different ways of doing this. One gateway per
> > > > session cluster, one gateway shared across different clusters...
> > >
> > > Currently, the sql gateway cannot be shared across multiple clusters.
> > >
> > > > understand the tradeoff and the simplest way of accomplishing this.
> > >
> > > I'm not familiar with the Flink operator codebase; it would be
> > > appreciated if you could elaborate more on the cost of adding this
> > > feature. I agree that deploying a gateway using a native Kubernetes
> > > Deployment can be simple and straightforward for users. However,
> > > integrating it into the operator can provide additional benefits and
> > > be more user-friendly, especially for users who are less familiar
> > > with Kubernetes. By using the operator, users can benefit from
> > > consistent version management with the session cluster and upgrade
> > > capabilities.
> > >
> > > Best,
> > > Yangze Guo
> > >
> > > On Fri, Sep 15, 2023 at 5:38 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
> > > >
> > > > There would be many different ways of doing this. One gateway per
> > > > session cluster, one gateway shared across different clusters...
> > > > I would not rush to add anything anywhere until we understand the
> > > > tradeoff and the simplest way of accomplishing this.
> > > >
> > > > The operator already supports ingresses for session clusters, so we
> > > > could have a gateway sitting somewhere else simply using it.
> > > >
> > > > Gyula
> > > >
> > > > On Fri, Sep 15, 2023 at 10:18 AM Yangze Guo <karma...@gmail.com> wrote:
> > > >
> > > > > Thanks for bringing this up, Dongwoo. Flink SQL Gateway is also
> > > > > a key component for OLAP scenarios.
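For context, the session jobs Gyula mentions are represented to the
operator as FlinkSessionJob resources, roughly like the sketch below
(the names and jarURI are illustrative); jobs submitted through the SQL
gateway create no such resource today, which is why the operator cannot
manage them.

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: sql-job-example
spec:
  # must reference an existing FlinkDeployment running a session cluster
  deploymentName: flink-session-cluster-example
  job:
    jarURI: https://example.com/path/to/job.jar  # illustrative location
    parallelism: 2
    upgradeMode: stateless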

Re: [Discuss] CRD for flink sql gateway in the flink k8s operator

2023-09-14 Thread Dongwoo Kim
Hi all,

*@Gyula*
Thanks for the consideration, Gyula. My initial idea for the CR was roughly
like below.
I focused on simplifying the setup in a k8s environment, but I agree with
your opinion that the sql gateway doesn't need custom operator logic and
that most of the requirements can be met by existing k8s resources.
So maybe a helm chart that bundles all the needed resources should be
enough.

apiVersion: flink.apache.org/v1beta1
kind: FlinkSqlGateway
metadata:
  name: flink-sql-gateway-example
  namespace: default
spec:
  clusterName: flink-session-cluster-example
  exposeServiceType: LoadBalancer
  flinkSqlGatewayConfiguration:
    sql-gateway.endpoint.type: "hiveserver2"
    sql-gateway.endpoint.hiveserver2.catalog.name: "hive"
  hiveConf:
    configMap:
      name: hive-config
      items:
        - key: hive-site.xml
          path: hive-site.xml


*@xiaolong, @Shammon*
Hi xiaolong and Shammon.
Thanks for taking the time to share.
I'd also like to add my experience with setting up the flink sql gateway on
k8s.
Without building a new Docker image, I've added a separate container to the
existing JobManager pod and started the sql gateway with the
"sql-gateway.sh start-foreground" command (see the sketch below).
I haven't explored deploying the sql gateway as an independent Deployment
yet, but that's something I'm considering after pointing the JM address at
the desired session cluster.
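
A minimal sketch of that sidecar setup through the operator's pod template
support (field names follow the FlinkDeployment CRD; the image, resources,
and port are illustrative, and 8083 is the gateway's default REST port):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session-cluster-example
spec:
  image: flink:1.17.1-scala_2.12-java11
  flinkVersion: v1_17
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
    podTemplate:
      spec:
        containers:
          # Sidecar next to the operator-managed flink-main-container.
          # It shares the pod network, so the gateway can reach the
          # JobManager's REST endpoint on localhost.
          - name: sql-gateway
            image: flink:1.17.1-scala_2.12-java11
            command: ["/opt/flink/bin/sql-gateway.sh", "start-foreground"]
            ports:
              - containerPort: 8083  # default sql-gateway REST port
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1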

Thanks all

Best
Dongwoo

On Fri, Sep 15, 2023 at 11:55 AM, Xiaolong Wang wrote:

> Hi, Shammon,
>
> Yes, I want to create a Flink SQL-gateway in a job-manager.
>
> Currently, the above script is generally a work-around that allows me to
> start a Flink session job manager with a SQL gateway running on top.
>
> I agree that it'd be more elegant if we created a new job type and wrote
> a script, which would be much easier for users (since they would not need
> to build a separate Flink image any more).
>
> On Fri, Sep 15, 2023 at 10:29 AM Shammon FY wrote:
>
> > Hi,
> >
> > Currently `sql-gateway` can be started with the script `sql-gateway.sh`
> > on an existing node; it is more like a simple "standalone" node. I
> > think it's valuable if we can do more work to start it in k8s.
> >
> > For xiaolong:
> > Do you want to start a sql-gateway instance in the jobmanager pod? I
> > think maybe we need a script like `kubernetes-sql-gateway.sh` to start
> > `sql-gateway` pods with a flink image, what do you think?
> >
> > Best,
> > Shammon FY
> >
> > On Fri, Sep 15, 2023 at 10:02 AM Xiaolong Wang wrote:
> >
> > > Hi, I've experimented with this feature on K8S recently; here is
> > > some of my trial:
> > >
> > > 1. Create a new kubernetes-jobmanager.sh script with the following
> > > content:
> > >
> > > #!/usr/bin/env bash
> > > $FLINK_HOME/bin/sql-gateway.sh start
> > > $FLINK_HOME/bin/kubernetes-jobmanager1.sh kubernetes-session
> > >
> > > 2. Build your own Flink docker image, something like this:
> > >
> > > FROM flink:1.17.1-scala_2.12-java11
> > >
> > > RUN mv $FLINK_HOME/bin/kubernetes-jobmanager.sh $FLINK_HOME/bin/kubernetes-jobmanager1.sh
> > > COPY ./kubernetes-jobmanager.sh $FLINK_HOME/bin/kubernetes-jobmanager.sh
> > >
> > > RUN chmod +x $FLINK_HOME/bin/*.sh
> > > USER flink
> > >
> > > 3. Create a Flink session job with the operator using the above image.
> > >
> > > On Thu, Sep 14, 2023 at 9:49 PM Gyula Fóra wrote:
> > >
> > > > Hi!
> > > >
> > > > I don't completely understand what the content of such a CRD would
> > > > be; could you give a minimal example of how the Flink SQL Gateway
> > > > CR yaml would look?
> > > >
> > > > Adding a CRD would mean you need to add some operator/controller
> > > > logic as well. Why not simply use a Deployment / StatefulSet in
> > > > Kubernetes?
> > > >
> > > > Or a Helm chart if you want to make it more user friendly?
> > > >
> > > > Cheers,
> > > > Gyula
> > > >
> > > > On Thu, Sep 14, 2023 at 12:57 PM Dongwoo Kim wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I've been working on setting up a flink SQL gateway in a k8s
> > > > > environment and it got me thinking: what if we had a CRD for
> > > > > this?
> > > > >
> > > > > So I have quick questions below.
> > > > > 1. Is there ongoing work to create a CRD for the Flink SQL
> > > > > Gateway?
> > > > > 2. If not, would the community be open to considering a CRD for
> > > > > this?
> > > > >
> > > > > I've noticed a growing demand for simplified setup of the flink
> > > > > sql gateway in flink's slack channel.
> > > > > Implementing a CRD could make deployments easier and offer
> > > > > better integration with k8s.
> > > > >
> > > > > If this idea is accepted, I'm open to drafting a FLIP for
> > > > > further discussion.
> > > > >
> > > > > Thanks for your time and looking forward to your thoughts!
> > > > >
> > > > > Best regards,
> > > > > Dongwoo


[Discuss] CRD for flink sql gateway in the flink k8s operator

2023-09-14 Thread Dongwoo Kim
Hi all,

I've been working on setting up a flink SQL gateway in a k8s environment
and it got me thinking: what if we had a CRD for this?

So I have quick questions below.
1. Is there ongoing work to create a CRD for the Flink SQL Gateway?
2. If not, would the community be open to considering a CRD for this?

I've noticed a growing demand for simplified setup of the flink sql gateway
in flink's slack channel.
Implementing a CRD could make deployments easier and offer better
integration with k8s.

If this idea is accepted, I'm open to drafting a FLIP for further
discussion.

Thanks for your time and looking forward to your thoughts!

Best regards,
Dongwoo


Re: [DISCUSS] Add config to enable job stop with savepoint on exceeding tolerable checkpoint Failures

2023-09-06 Thread Dongwoo Kim
Hi Yanfei, Hangxiang.

Thank you for taking the time to review my suggestions.
I agree with Hangxiang that simply triggering a savepoint based on repeated
checkpoint failures isn't a reasonable approach.
Adding a specific condition, like *CHECKPOINT_ASYNC_EXCEPTION* as the
reason for the last checkpoint failure, could make it more practical,
although it's not a perfect solution.
Regarding the restart policy, my initial thought was to stop the job after
creating a savepoint.
I was open to further discussion about refining the restart policy,
especially if the community was interested in the idea of a primary/standby
checkpoint storage setup.
However, Hangxiang's suggestion to utilize Flink's REST API hadn't crossed
my mind, and it seems to address my needs well.

I'll try leveraging the REST API to implement a failover strategy for
checkpoint storage failures.
Thank you again for your insights, they've been extremely helpful.
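
A rough sketch of the watchdog I have in mind, assuming curl and jq are
available and using Flink's documented REST endpoints (the checkpoints
endpoint from [2] below plus /jobs/:jobid/stop); the threshold, polling
interval, and failure counting are illustrative and would need refinement:

#!/usr/bin/env bash
# Polls checkpoint stats for a job and triggers stop-with-savepoint once
# failures pile up. FLINK_REST is the JobManager REST endpoint.
FLINK_REST=${FLINK_REST:-http://localhost:8081}
JOB_ID="$1"
SAVEPOINT_DIR="$2"   # e.g. hdfs:///flink/savepoints

while sleep 60; do
  # Note: counts.failed is cumulative over the job's lifetime; a real tool
  # would track deltas or inspect latest.failed and its failure message.
  failed=$(curl -s "$FLINK_REST/jobs/$JOB_ID/checkpoints" | jq '.counts.failed')
  if [ "$failed" -ge 5 ]; then
    # Stop with savepoint; the response contains a trigger id that can be
    # polled for completion.
    curl -s -X POST "$FLINK_REST/jobs/$JOB_ID/stop" \
      -H 'Content-Type: application/json' \
      -d "{\"targetDirectory\": \"$SAVEPOINT_DIR\", \"drain\": false}"
    break
  fi
done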

Best,
Dongwoo

On Wed, Sep 6, 2023 at 4:57 PM, Hangxiang Yu wrote:

> Hi, Dongwoo.
> IIUC, you mean using a savepoint to store a snapshot to other storage if
> checkpoints fail multiple times due to some long-lasting exceptions of
> the external storage, right?
> I think it's better to achieve this with an external tool instead of
> introducing a config like that:
> 1. It's not always easy to judge whether an exception occurs due to
> external storage or not, and it's not so reasonable to just trigger a
> savepoint if checkpoints fail multiple times.
> 2. It's better to let the logic around triggering savepoints, e.g.
> periodic savepoints or triggering stop-with-savepoint, be done by
> external tools or platforms. As you can see from [1], we intend to keep
> their scopes clear.
>
> Maybe you could check the status and failure message via [2] periodically
> in your external tool or platform and then trigger a savepoint or
> stop-with-savepoint through the REST API or CLI.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/checkpoints_vs_savepoints/
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/rest_api/#jobs-jobid-checkpoints
>
> On Wed, Sep 6, 2023 at 11:05 AM Yanfei Lei wrote:
>
> > Hi Dongwoo,
> >
> > If the checkpoint has failed
> > `execution.checkpointing.tolerable-failed-checkpoints` times, then
> > stopWithSavepoint is likely to fail as well.
> > If stopWithSavepoint succeeds or fails, will the job just stop? I am
> > more curious about how this option works with the restart strategy.
> >
> > Best,
> > Yanfei
> >
> > On Mon, Sep 4, 2023 at 10:17 PM, Dongwoo Kim wrote:
> > >
> > > Hi all,
> > > I have a proposal that aims to enhance a flink application's
> > > resilience in cases of unexpected failures in checkpoint storages
> > > like S3 or HDFS.
> > >
> > > [Background]
> > > When using self-managed S3-compatible object storage, we faced
> > > checkpoint async failures lasting for an extended period (more than
> > > 30 minutes), leading to multiple job restarts and causing lags in
> > > our streaming application.
> > >
> > > [Current Behavior]
> > > Currently, when the number of checkpoint failures exceeds a
> > > predefined tolerable limit, flink will either restart or fail the
> > > job based on how it's configured.
> > > In my opinion this does not handle scenarios where the checkpoint
> > > storage itself may be unreliable or experiencing downtime.
> > >
> > > [Proposed Feature]
> > > I propose a config that allows for a graceful job stop with a
> > > savepoint when the tolerable checkpoint failure limit is reached.
> > > Instead of restarting/failing the job when the tolerable checkpoint
> > > failure count is exceeded, this new config, when set to true, would
> > > just trigger stopWithSavepoint.
> > >
> > > This could offer the following benefits.
> > > - Indication of checkpoint storage state: exceeding tolerable
> > > checkpoint failures could indicate unstable checkpoint storage.
> > > - Automated fallback strategy: when combined with a monitoring cron
> > > job, this feature could act as an automated fallback strategy for
> > > handling unstable checkpoint storage.
> > >   The job would stop safely, take a savepoint, and then you could
> > > automatically restart with different checkpoint storage configured,
> > > like switching from S3 to HDFS.
> > >
> > > For example, let's say the checkpoint path is configured to S3 and
> > > the savepoint path is configured to HDFS.
> > > When the new config is set to true, the job stops with a savepoint
> > > once the tolerable checkpoint failure count is exceeded.
> > > And we can restart the job from that savepoint with the checkpoint
> > > storage configured as HDFS.
> > >
> > > Looking forward to hearing the community's thoughts on this
> > > proposal.
> > > I'd also like to ask how the community handles long-lasting unstable
> > > checkpoint storage issues.
> > >
> > > Thanks in advance.
> > >
> > > Best, Dongwoo
>
> --
> Best,
> Hangxiang.


[DISCUSS] Add config to enable job stop with savepoint on exceeding tolerable checkpoint Failures

2023-09-04 Thread Dongwoo Kim
Hi all,
I have a proposal that aims to enhance a flink application's resilience
in cases of unexpected failures in checkpoint storages like S3 or HDFS.

*[Background]*
When using self-managed S3-compatible object storage, we faced checkpoint
async failures lasting for an extended period (more than 30 minutes),
leading to multiple job restarts and causing lags in our streaming
application.


*[Current Behavior]*
Currently, when the number of checkpoint failures
exceeds a predefined tolerable limit, flink will either restart or fail the
job based on how it's configured.
In my opinion this does not handle scenarios where the checkpoint storage
itself may be unreliable or experiencing downtime.

*[Proposed Feature]*
I propose a config that allows for a graceful job stop with a savepoint
when the tolerable checkpoint failure limit is reached.
Instead of restarting/failing the job when the tolerable checkpoint failure
count is exceeded, this new config, when set to true, would just trigger
stopWithSavepoint.

This could offer the following benefits.
- Indication of Checkpoint Storage State: Exceeding tolerable checkpoint
failures could indicate unstable checkpoint storage.
- Automated Fallback Strategy: When combined with a monitoring cron job,
this feature could act as an automated fallback strategy for handling
unstable checkpoint storage.
  The job would stop safely, take a savepoint, and then you could
automatically restart with different checkpoint storage configured, like
switching from S3 to HDFS.

For example, let's say the checkpoint path is configured to S3 and the
savepoint path is configured to HDFS, as sketched below.
When the new config is set to true, the job stops with a savepoint once
the tolerable checkpoint failure count is exceeded, and we can restart the
job from that savepoint with the checkpoint storage switched to HDFS.
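A sketch of the setup this would enable in flink-conf.yaml; the first
three keys are existing Flink options, while the last one is the proposed
config under a purely hypothetical name:

# flink-conf.yaml (sketch)
state.checkpoints.dir: s3://my-bucket/flink/checkpoints  # primary checkpoint storage
state.savepoints.dir: hdfs:///flink/savepoints           # fallback storage for the savepoint
execution.checkpointing.tolerable-failed-checkpoints: 5
# Hypothetical name for the proposed option (does not exist in Flink today):
execution.checkpointing.stop-with-savepoint-on-tolerable-failures: true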


Looking forward to hearing the community's thoughts on this proposal.
I'd also like to ask how the community handles long-lasting unstable
checkpoint storage issues.

Thanks in advance.

Best,
Dongwoo