Re: Native Kubernetes Task Managers

2024-06-02 Thread Xintong Song
I may not have understood what you mean by the naming scheme. I think the
limitation "pods in a StatefulSet are always terminated in the reverse
order as they are created" comes from Kubernetes and has nothing to do with
the naming scheme.

Best,

Xintong



On Mon, Jun 3, 2024 at 1:13 PM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi Xintong,
>
> After experimenting a bit, I came to roughly the same conclusion: cleanup
> is what's more or less incompatible if Kubernetes manages the pods. Then it
> might be better to just allow using a more stable pod naming scheme that
> doesn't depend on the attempt number and thus produces more stable task
> manager metrics. I'll explore that.
>
> Regards,
> Alexis.
>
> On Mon, 3 Jun 2024, 03:35 Xintong Song,  wrote:
>
> > I think the reason we didn't choose StatefulSet when introducing the
> > Native K8s Deployment is that, IIRC, we want Flink's ResourceManager to
> > have full control of the individual pod lifecycles.
> >
> > E.g.,
> > - Pods in a StatefulSet are always terminated in the reverse order of
> > their creation. This prevents us from releasing a specific idle TM that
> > is not necessarily the last one created.
> > - If a pod is unexpectedly terminated, Flink's ResourceManager should
> > decide whether to restart it or not according to the job status.
> > (Technically, the same issue as above, that we may want pods to be
> > terminated / deleted in a different order.)
> >
> > There might be some other reasons. I just cannot recall all the details.
> >
> > As for determining whether a pod is OOM killed, I think Flink does print
> > diagnostics for terminated pods in JM logs, i.e. the `exitCode`, `reason`
> > and `message` of the `Terminated` container state. In our production, it
> > shows "(exitCode=137, reason=OOMKilled, message=null)". However, since
> the
> > diagnostics are from K8s, I'm not 100% sure whether this behavior is same
> > for all K8s versions,.
> >
> > Best,
> >
> > Xintong
> >
> >
> >
> > On Sun, Jun 2, 2024 at 7:35 PM Alexis Sarda-Espinosa <
> > sarda.espin...@gmail.com> wrote:
> >
> > > Hi devs,
> > >
> > > Some time ago I asked about the way Task Manager pods are handled by
> > > the native Kubernetes driver [1]. I have now looked a bit through the
> > > source code and I think it could be possible to deploy TMs with a
> > > StatefulSet, which could allow tracking OOM kills as I mentioned in my
> > > original email, and could also make it easier to track metrics and
> > > create alerts, since the labels wouldn't change as much.
> > >
> > > One challenge is probably the new elastic scaling features [2], since
> > > the driver would have to differentiate between new pod requests due to
> > > a TM terminating and requests due to scaling. I'm also not sure where
> > > downscaling requests are currently handled.
> > >
> > > I would be interested in taking a look at this and seeing if I can get
> > > something working. I think it would be possible to make it configurable
> > > in a way that maintains backwards compatibility. Would it be OK if I
> > > file a Jira ticket and try it out?
> > >
> > > Regards,
> > > Alexis.
> > >
> > > [1] https://lists.apache.org/thread/jysgdldv8swgf4fhqwqochgf6hq0qs52
> > > [2]
> > > https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/elastic_scaling/


Re: Native Kubernetes Task Managers

2024-06-02 Thread Alexis Sarda-Espinosa
Hi Xintong,

After experimenting a bit, I came to roughly the same conclusion: cleanup
is what's more or less incompatible if Kubernetes manages the pods. Then it
might be better to just allow using a more stable pod naming scheme that
doesn't depend on the attempt number and thus produces more stable task
manager metrics. I'll explore that.
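
To illustrate the naming idea with a quick sketch (illustrative Java only,
not actual Flink code; the "current" pattern shown is my understanding of
the native integration's default):

public class PodNames {
    public static void main(String[] args) {
        String clusterId = "my-cluster";
        int attempt = 2; // bumps after a JobManager failover
        int index = 1;

        // Current scheme (as I understand it): the attempt number is part of
        // the pod name, so metric labels change whenever the attempt changes.
        String current = clusterId + "-taskmanager-" + attempt + "-" + index;
        // A stable scheme would drop the attempt number.
        String stable = clusterId + "-taskmanager-" + index;

        System.out.println(current); // my-cluster-taskmanager-2-1
        System.out.println(stable);  // my-cluster-taskmanager-1
    }
}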

Regards,
Alexis.

On Mon, 3 Jun 2024, 03:35 Xintong Song,  wrote:

> I think the reason we didn't choose StatefulSet when introducing the Native
> K8s Deployment is that, IIRC, we want Flink's ResourceManager to have full
> control of the individual pod lifecycles.
>
> E.g.,
> - Pods in a StatefulSet are always terminated in the reverse order of
> their creation. This prevents us from releasing a specific idle TM that is
> not necessarily the last one created.
> - If a pod is unexpectedly terminated, Flink's ResourceManager should
> decide whether to restart it or not according to the job status.
> (Technically, the same issue as above, that we may want pods to be
> terminated / deleted in a different order.)
>
> There might be some other reasons. I just cannot recall all the details.
>
> As for determining whether a pod is OOM killed, I think Flink does print
> diagnostics for terminated pods in JM logs, i.e. the `exitCode`, `reason`
> and `message` of the `Terminated` container state. In our production, it
> shows "(exitCode=137, reason=OOMKilled, message=null)". However, since the
> diagnostics are from K8s, I'm not 100% sure whether this behavior is the
> same for all K8s versions.
>
> Best,
>
> Xintong
>
>
>
> On Sun, Jun 2, 2024 at 7:35 PM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
> > Hi devs,
> >
> > Some time ago I asked about the way Task Manager pods are handled by the
> > native Kubernetes driver [1]. I have now looked a bit through the source
> > code and I think it could be possible to deploy TMs with a StatefulSet,
> > which could allow tracking OOM kills as I mentioned in my original email,
> > and could also make it easier to track metrics and create alerts, since
> > the labels wouldn't change as much.
> >
> > One challenge is probably the new elastic scaling features [2], since the
> > driver would have to differentiate between new pod requests due to a TM
> > terminating and requests due to scaling. I'm also not sure where
> > downscaling requests are currently handled.
> >
> > I would be interested in taking a look at this and seeing if I can get
> > something working. I think it would be possible to make it configurable
> > in a way that maintains backwards compatibility. Would it be OK if I file
> > a Jira ticket and try it out?
> >
> > Regards,
> > Alexis.
> >
> > [1] https://lists.apache.org/thread/jysgdldv8swgf4fhqwqochgf6hq0qs52
> > [2]
> > https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/elastic_scaling/


[jira] [Created] (FLINK-35507) Support For Individual Job Level Resource Allocation in Session Cluster in k8s

2024-06-02 Thread Amarjeet (Jira)
Amarjeet created FLINK-35507:


 Summary: Support For Individual Job Level Resource Allocation in 
Session Cluster in k8s
 Key: FLINK-35507
 URL: https://issues.apache.org/jira/browse/FLINK-35507
 Project: Flink
  Issue Type: New Feature
  Components: Deployment / Kubernetes
Reporter: Amarjeet


We could have a setup like Spark, where individual job-level settings in a 
session cluster control the resources each job can access, from memory to 
cores. Dynamic memory allocation should also be supported.





[jira] [Created] (FLINK-35506) Disable Kafka auto-commit and rely on Flink's checkpointing if both are enabled

2024-06-02 Thread elon_X (Jira)
elon_X created FLINK-35506:
--

 Summary: Disable Kafka auto-commit and rely on Flink's 
checkpointing if both are enabled
 Key: FLINK-35506
 URL: https://issues.apache.org/jira/browse/FLINK-35506
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka
Affects Versions: 1.16.1
Reporter: elon_X


When I use KafkaSource for consuming topics and set the Kafka parameter 
{{enable.auto.commit=true}} while also enabling checkpointing for the task, 
I notice that both will commit offsets. Should Kafka's auto-commit be 
disabled when enabling Flink checkpointing, similar to how it's done with 
FlinkKafkaConsumer?
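
For reference, a minimal sketch of the workaround available today (broker
address, topic, and group id below are made-up examples): explicitly disable
the Kafka client's auto-commit and rely on the source's checkpoint-based
commit, which is controlled by the commit.offsets.on.checkpoint property.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaCommitExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // commit offsets once per checkpoint

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092") // example address
                .setTopics("input-topic")           // example topic
                .setGroupId("my-group")             // example group id
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Disable the client's own auto-commit so Flink's
                // checkpoint-driven commit is the only offset writer.
                .setProperty("enable.auto.commit", "false")
                // Default value, shown here for clarity.
                .setProperty("commit.offsets.on.checkpoint", "true")
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source").print();
        env.execute("kafka-commit-example");
    }
}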





Re: [DISCUSS] Add a JDBC Sink Plugin to Flink-CDC-Pipeline

2024-06-02 Thread Jerry
As far as I know, when a table structure change happens in Flink CDC, a
broadcast is sent that causes the other operators to suspend their data
synchronization tasks and wait for the structure change to succeed before
continuing. Data synchronization therefore stalls for the duration of the
change and a large amount of data can accumulate, which may eventually cause
an avalanche and serious performance problems. We currently have two
optimization suggestions:
1. Incremental data buffer
Use an incremental data buffer to temporarily store CDC data change events
until the table structure change is completed.
Implementation method:
Implement a buffer to temporarily store CDC data changes; see the sketch
after this list.
After the table structure change is completed, the data in the buffer is
processed sequentially and submitted to the target database (it can be
written in one batch).
2. Optimize table structure change operations
Try to reduce the time and frequency of table structure changes.
Implementation method:
Merge multiple table structure change operations to reduce the frequency of
operations.
Use database-specific optimization techniques (such as MySQL's Online DDL)
to speed up the table structure change process.
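
To make suggestion 1 concrete, here is a minimal sketch of the buffering
pattern (the event and sink types are hypothetical; this is not Flink CDC's
actual API):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

/** Buffers change events while a schema change is in flight, then flushes. */
public class SchemaChangeBuffer<E> {
    private final Queue<E> buffer = new ArrayDeque<>();
    private boolean schemaChangeInFlight = false;

    public synchronized void onSchemaChangeStarted() {
        schemaChangeInFlight = true;
    }

    /** Called for every incoming change event. */
    public synchronized void offer(E event, Consumer<E> sink) {
        if (schemaChangeInFlight) {
            buffer.add(event);  // park the event until the DDL completes
        } else {
            sink.accept(event); // normal path: write through immediately
        }
    }

    /** Called once the target table's structure change has completed. */
    public synchronized void onSchemaChangeFinished(Consumer<E> sink) {
        schemaChangeInFlight = false;
        E event;
        while ((event = buffer.poll()) != null) {
            sink.accept(event); // drain in arrival order, e.g. as one batch
        }
    }
}

In a real implementation the buffer would also need to spill to disk or apply
backpressure once it grows too large, since schema changes can take minutes.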

Yanquan Lv  wrote on Wed, 22 May 2024 at 21:11:

> Thanks Jerry for driving this; a JDBC sink for the CDC pipeline is indeed
> in high demand in the community.
>
> I have one concern:
> For some databases that use JDBC, such as MySQL, table structure changes
> may be time-consuming, and Flink CDC will not send DataChangeEvents during
> this period, which can cause significant latency in delivering CDC data
> changes. You may need to consider and explain how to improve this
> situation.
>
> Jerry  wrote on Wed, 15 May 2024 at 15:07:
>
> > Hi all
> > My name is ZhengjunZhou, a user and developer of Flink CDC. In my recent
> > projects, I realized that we could enhance the capabilities of
> > Flink-CDC-Pipeline by introducing a JDBC Sink plugin, enabling Flink CDC
> > to directly output change data capture (CDC) records to various
> > JDBC-supported database systems.
> >
> > Currently, while Flink CDC offers support for a wide range of data
> > sources, there is no direct solution for sinks, especially for common
> > relational databases. I believe that adding a JDBC Sink plugin will
> > significantly boost its applicability in data integration scenarios.
> >
> > Specifically, this plugin would allow users to configure database
> > connections and stream data directly to SQL databases via the standard
> > JDBC interface. This could be used for data migration tasks as well as
> > real-time data synchronization.
> >
> > To further discuss this proposal and gather feedback from the community,
> > I have prepared a preliminary design draft and hope to discuss it in
> > detail in the upcoming community meeting. Please consider the potential
> > value of this feature and provide your insights and guidance.
> >
> > Thank you for your time and consideration. I look forward to your active
> > feedback and further discussion.
> >
> > [1] https://github.com/apache/flink-connector-jdbc
> >
>


[jira] [Created] (FLINK-35505) RegionFailoverITCase.testMultiRegionFailover has never restored state

2024-06-02 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-35505:
--

 Summary: RegionFailoverITCase.testMultiRegionFailover has never 
restored state
 Key: FLINK-35505
 URL: https://issues.apache.org/jira/browse/FLINK-35505
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 1.20.0
Reporter: Weijie Guo








Re: Native Kubernetes Task Managers

2024-06-02 Thread Xintong Song
I think the reason we didn't choose StatefulSet when introducing the Native
K8s Deployment is that, IIRC, we want Flink's ResourceManager to have full
control of the individual pod lifecycles.

E.g.,
- Pods in a StatefulSet are always terminated in the reverse order of their
creation. This prevents us from releasing a specific idle TM that is not
necessarily the last one created.
- If a pod is unexpectedly terminated, Flink's ResourceManager should
decide whether to restart it or not according to the job status.
(Technically, the same issue as above, that we may want pods to be
terminated / deleted in a different order.)

There might be some other reasons. I just cannot recall all the details.

As for determining whether a pod is OOM killed, I think Flink does print
diagnostics for terminated pods in JM logs, i.e. the `exitCode`, `reason`
and `message` of the `Terminated` container state. In our production, it
shows "(exitCode=137, reason=OOMKilled, message=null)". However, since the
diagnostics are from K8s, I'm not 100% sure whether this behavior is the
same for all K8s versions.
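
For reference, a minimal standalone sketch of reading those diagnostics from
the container status with the fabric8 client (the namespace and pod name are
made-up examples, and this is not Flink's actual code path):

import io.fabric8.kubernetes.api.model.ContainerStateTerminated;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;

public class OomCheck {
    public static void main(String[] args) {
        try (KubernetesClient client = new KubernetesClientBuilder().build()) {
            Pod pod = client.pods()
                    .inNamespace("flink")                   // example namespace
                    .withName("my-cluster-taskmanager-1-1") // example pod name
                    .get();
            for (ContainerStatus cs : pod.getStatus().getContainerStatuses()) {
                // lastState keeps the previous termination after a restart;
                // state.terminated is set while the container is terminated.
                ContainerStateTerminated t = cs.getLastState().getTerminated() != null
                        ? cs.getLastState().getTerminated()
                        : cs.getState().getTerminated();
                if (t != null) {
                    System.out.printf("container %s: exitCode=%d, reason=%s, message=%s%n",
                            cs.getName(), t.getExitCode(), t.getReason(), t.getMessage());
                }
            }
        }
    }
}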

Best,

Xintong



On Sun, Jun 2, 2024 at 7:35 PM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi devs,
>
> Some time ago I asked about the way Task Manager pods are handled by the
> native Kubernetes driver [1]. I have now looked a bit through the source
> code and I think it could be possible to deploy TMs with a StatefulSet,
> which could allow tracking OOM kills as I mentioned in my original email,
> and could also make it easier to track metrics and create alerts, since the
> labels wouldn't change as much.
>
> One challenge is probably the new elastic scaling features [2], since the
> driver would have to differentiate between new pod requests due to a TM
> terminating and requests due to scaling. I'm also not sure where
> downscaling requests are currently handled.
>
> I would be interested in taking a look at this and seeing if I can get
> something working. I think it would be possible to make it configurable in
> a way that maintains backwards compatibility. Would it be OK if I file a
> Jira ticket and try it out?
>
> Regards,
> Alexis.
>
> [1] https://lists.apache.org/thread/jysgdldv8swgf4fhqwqochgf6hq0qs52
> [2]
>
> https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/elastic_scaling/
>


Re: [DISCUSS] FLIP-XXX Add K8S conditions to Flink CRD

2024-06-02 Thread Thomas Weise
Thanks for the proposal. As I understand it, the idea is to make the status
of a Flink deployment more accessible to standard k8s tooling, which would
be a nice improvement and further strengthen the k8s native experience!

Regarding the FLIP document's overall structure: Before diving into the
implementation details, can we please expand the intro with the
motivation/rationale for this change? It would help to list a few examples
of the audience that would benefit from it, and examples of tools that would
pick up the condition and how that would look (a link or screenshot if you
have one).

Regarding multiple conditions: +1 for not commingling reconciliation status
and job status. It would make the resulting condition confusing. I believe
what the user would expect under "Ready" is the representation of the job
status. We can then add another separate condition as suggested; however,
can the FLIP document also outline if/how conditions other than "Ready"
would appear in the generic k8s tooling?

Thanks,
Thomas



On Fri, May 31, 2024 at 10:37 AM David Radley 
wrote:

> Hi Mate and Gyula,
> Thank you very much for your clarifications; it is clearer for me now. I
> agree that a reconciliation condition would be useful – maybe "Reconciled"
> instead of "Ready" for the boolean, so it is very explicit.
>
> Your suggestion of a job-related readiness condition tied to its health
> would be useful; you suggest it be user-configurable – this seems closer
> to a liveness/readiness probe.
>
> Kind regards, David.
>
> From: Mate Czagany 
> Date: Thursday, 30 May 2024 at 10:39
> To: dev@flink.apache.org 
> Cc: morh...@apache.org 
> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX Add K8S conditions to Flink CRD
> Hi,
>
> I would definitely keep this as a FLIP. Not all FLIPs have to be big
> changes, and this format makes it easier for others to chime in and follow.
>
> I am not a Kubernetes expert, but my understanding is that we don't have to
> follow any strict convention for the type names in the conditions, e.g.
> "Ready" or "Error". And as Gyula said it doesn't add too much value in the
> currently proposed way, it might even be confusing for users who have not
> read this email thread or FLIP because "Ready" might suggest that the job
> is running and is healthy. So my suggestion is the same as Gyulas, to have
> more explicit type names instead of just "Ready" and "Error". However
> "ClusterReady" sounds weird in case of FlinkSessionJobs.
>
> Regarding appending to the conditions field: if I understand the FLIP
> correctly, we would allow multiple elements of the same type to exist in
> the conditions list if the message and reason fields are different. From
> the Kubernetes documentation it seems like the correct way would be to use
> the "type" field as the map key and merge the fields [1].
>
>
> [1]
>
> https://github.com/kubernetes/kubernetes/blob/bce55b94cdc3a4592749aa919c591fa7df7453eb/staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/types.go#L1528
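
A minimal sketch of that merge-by-type convention (a hypothetical helper
using fabric8's generic Condition model, not the operator's actual code):

import io.fabric8.kubernetes.api.model.Condition;
import io.fabric8.kubernetes.api.model.ConditionBuilder;
import java.time.Instant;
import java.util.List;

public final class ConditionUtils {
    /** Keeps at most one condition per "type", as [1] recommends. */
    public static void upsert(List<Condition> conditions, String type,
                              String status, String reason, String message) {
        Condition existing = conditions.stream()
                .filter(c -> type.equals(c.getType()))
                .findFirst()
                .orElse(null);

        // Only bump lastTransitionTime when the status value actually flips.
        String transitionTime = (existing != null && status.equals(existing.getStatus()))
                ? existing.getLastTransitionTime()
                : Instant.now().toString();

        conditions.removeIf(c -> type.equals(c.getType())); // "type" is the map key
        conditions.add(new ConditionBuilder()
                .withType(type)
                .withStatus(status)
                .withReason(reason)
                .withMessage(message)
                .withLastTransitionTime(transitionTime)
                .build());
    }
}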
>
> Best regards,
> Mate
>
> Gyula Fóra  wrote on Thu, 30 May 2024 at 10:53:
>
> > David,
> >
> > The problem is exactly that ResourceLifecycleStates do not correspond to
> > specific Job statuses (a JobReady condition) in most cases. Let me give
> > you a concrete example:
> >
> > ResourceLifecycleState.STABLE means that the app/job defined in the spec
> > has been successfully deployed and was observed running, and this spec is
> > now considered to be stable (won't be rolled back). Once a resource
> > (FlinkDeployment) has reached the STABLE state, it won't change unless
> > the user changes the spec. At the same time, this doesn't really say
> > anything about job health/readiness at any given future time. Ten minutes
> > later the job can go into an unrecoverable failure loop and never reach a
> > running status, yet the ResourceLifecycleState will remain STABLE.
> >
> > This is actually not a problem with the ResourceLifecycleState but more
> > with the understanding of it. It's called ResourceLifecycleState and not
> > JobState exactly because it refers to the upgrade/rollback/suspend etc.
> > lifecycle of the FlinkDeployment/FlinkSessionJob resource and not the
> > underlying Flink job itself.
> >
> > But this is a crucial detail here that we need to consider, otherwise the
> > "Ready" condition that we may create will be practically useless.
> >
> > This is the reason why @morh...@apache.org  and I suggest separating this
> > into at least 2 independent conditions. One could be
> > UpgradeCompleted/ReconciliationCompleted or something along these lines,
> > computed based on the LifecycleState (as described in your proposal but
> > with a different name). The other should be JobReady, which could
> > initially work based on the JobStatus.state field but ideally would be a
> > user-configurable ready condition (e.g. the job has been running for at
> > least 10 minutes, is running and has taken checkpoints, etc.).
> >
> > These 2 conditions should be enough to start with and would actually
> > 

Native Kubernetes Task Managers

2024-06-02 Thread Alexis Sarda-Espinosa
Hi devs,

Some time ago I asked about the way Task Manager pods are handled by the
native Kubernetes driver [1]. I have now looked a bit through the source
code and I think it could be possible to deploy TMs with a StatefulSet,
which could allow tracking OOM kills as I mentioned in my original email,
and could also make it easier to track metrics and create alerts, since the
labels wouldn't change as much.
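
As a rough illustration of the shape this could take (all names, labels and
sizes below are made-up, built with the fabric8 client that the native
integration already uses), the TMs would come from a single StatefulSet and
get stable ordinal names such as my-cluster-taskmanager-0:

import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder;

public class TaskManagerStatefulSet {
    public static StatefulSet build() {
        return new StatefulSetBuilder()
            .withNewMetadata()
                .withName("my-cluster-taskmanager")
                .addToLabels("app", "my-cluster")
                .addToLabels("component", "taskmanager")
            .endMetadata()
            .withNewSpec()
                .withServiceName("my-cluster-taskmanager")
                .withReplicas(3) // pods my-cluster-taskmanager-0..2
                .withNewSelector()
                    .addToMatchLabels("app", "my-cluster")
                    .addToMatchLabels("component", "taskmanager")
                .endSelector()
                .withNewTemplate()
                    .withNewMetadata()
                        .addToLabels("app", "my-cluster")
                        .addToLabels("component", "taskmanager")
                    .endMetadata()
                    .withNewSpec()
                        .addNewContainer()
                            .withName("flink-taskmanager")
                            .withImage("flink:1.19") // example image tag
                            .withArgs("taskmanager")
                        .endContainer()
                    .endSpec()
                .endTemplate()
            .endSpec()
            .build();
    }
}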

One challenge is probably the new elastic scaling features [2], since the
driver would have to differentiate between new pod requests due to a TM
terminating and requests due to scaling. I'm also not sure where
downscaling requests are currently handled.

I would be interested in taking a look at this and seeing if I can get
something working. I think it would be possible to make it configurable in
a way that maintains backwards compatibility. Would it be OK if I file a
Jira ticket and try it out?

Regards,
Alexis.

[1] https://lists.apache.org/thread/jysgdldv8swgf4fhqwqochgf6hq0qs52
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/elastic_scaling/