Flink k8s operator - problem with patching session cluster

2023-08-25 Thread Krzysztof Chmielewski
Hi community,
I have a use case where I would like to add an extra TaskManager (TM) to a
running Flink session cluster that has Flink jobs deployed. Session cluster
creation, job submission and cluster patching are done using the Flink k8s
operator Java API. The details of this are presented in [1].

I would like to ask what the recommended path is to add a TM to an existing
session cluster that currently runs a number of Flink jobs, using the Java
API. For simplicity, let's assume that I don't want to resume jobs from a
savepoint, just redeploy them.

When executing the steps from [1] I'm facing an issue where session jobs are
not redeployed on the patched session cluster, even though kubectl shows that
a FlinkSessionJob has been submitted to Kubernetes.

Additionally, when I try to delete the FlinkSessionJob via kubectl, the Flink
k8s operator throws the exception described in [1]. In fact, the state of
that Flink deployment requires a few steps to clean up after the patch.


[1]
https://github.com/kristoffSC/FlinkK8sControllerPoC/blob/ExceptionInOperator-clusterRestart/doc/K8sException_1.MD
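
For illustration, a minimal sketch of the kind of "add a TM" patch described
above, using the fabric8 KubernetesClient together with the operator's CRD
classes. The package name, the replicas field on the TaskManager spec, the
namespace and the deployment name are assumptions based on recent operator
versions and may differ in yours:

    import io.fabric8.kubernetes.client.KubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClientBuilder;
    import org.apache.flink.kubernetes.operator.api.FlinkDeployment;

    public class AddTaskManagerSketch {
        public static void main(String[] args) {
            try (KubernetesClient client = new KubernetesClientBuilder().build()) {
                // Assumed names: namespace "flink", FlinkDeployment "session-cluster".
                FlinkDeployment deployment = client.resources(FlinkDeployment.class)
                        .inNamespace("flink")
                        .withName("session-cluster")
                        .get();

                // Bump the TaskManager replica count by one and push the change back;
                // the operator should then reconcile the running session cluster.
                Integer replicas = deployment.getSpec().getTaskManager().getReplicas();
                deployment.getSpec().getTaskManager()
                        .setReplicas(replicas == null ? 2 : replicas + 1);

                client.resources(FlinkDeployment.class)
                        .inNamespace("flink")
                        .resource(deployment)
                        .update();
            }
        }
    }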


Stopping a Job Without Killing the Task Manager

2023-08-25 Thread Kenan Kılıçtepe
Hi,

When I try to stop a job and the job cannot be stopped after a while, the
task manager gets killed.
This is a big problem for me, as there may be other jobs running on the same
task manager.
Is there a way to stop jobs that get stuck for some reason without causing
the task manager to terminate?

Thanks
Kenan
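
For reference, a minimal sketch of cancelling a single stuck job through the
JobManager REST API (PATCH /jobs/:jobid?mode=cancel). Cancelling a job stops
that job's tasks and should not by itself shut down a session cluster's
TaskManager; the REST address and job id below are placeholders, and this is
not offered as a definitive fix for the behavior reported above:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class CancelStuckJobSketch {
        public static void main(String[] args) throws Exception {
            // Placeholders: JobManager REST endpoint and the id of the stuck job.
            String restAddress = "http://jobmanager:8081";
            String jobId = "<job-id>";

            // PATCH /jobs/:jobid?mode=cancel asks the JobManager to cancel this job only.
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(restAddress + "/jobs/" + jobId + "?mode=cancel"))
                    .method("PATCH", HttpRequest.BodyPublishers.noBody())
                    .build();

            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("Cancel request returned HTTP " + response.statusCode());
        }
    }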


Re: How to use pipeline.jobvertex-parallelism-overrides property.

2023-08-25 Thread Krzysztof Chmielewski
Hi,
thank you for replying.

Hang Ruan, regarding:
"If they have different parallelisms, we cannot chain them together." <-
that is clear to me.

What I'm still not sure about is whether the "JobVertexId#toHexString" that
should be used as the key of the pipeline.jobvertex-parallelism-overrides map
refers to a vertex of the graph before or after Flink chained the operators.
In my use case I would actually like to "scoop out" an individual operator
from a chained task.
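
For illustration, a minimal sketch of listing the post-chaining JobVertex ids
of a job and plugging one into the override map (the replies below indicate
the key is the JobVertex id, i.e. the vertex after chaining).
getStreamGraph()/getJobGraph() are internal-leaning APIs, and
PipelineOptions.PARALLELISM_OVERRIDES is assumed to be the option backing
this key in recent Flink versions, so double-check against your version:

    import java.util.Map;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.PipelineOptions;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.runtime.jobgraph.JobVertex;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class VertexOverrideSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3)
                    .map(i -> i + 1).name("Map1")
                    .map(i -> i * 2).name("Map2")
                    .print();

            // Print the JobVertex ids as Flink sees them after operator chaining;
            // each chained task shows up as a single vertex here.
            JobGraph jobGraph = env.getStreamGraph().getJobGraph();
            for (JobVertex vertex : jobGraph.getVertices()) {
                System.out.println(vertex.getID().toHexString() + " -> " + vertex.getName());
            }

            // Hypothetical override: map one of the printed hex ids to a new parallelism.
            // This configuration would be passed when creating the environment or set
            // in flink-conf.yaml.
            Configuration conf = new Configuration();
            conf.set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of("<vertex-id-hex>", "4"));
        }
    }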



On Fri, Aug 25, 2023 at 10:07 Hang Ruan wrote:

> Hi, Krzysztof.
>
> As liu ron said, the key of the map for this configuration is the value
> from JobVertexId#toHexString. Maybe we could improve the docs to provide
> more details.
> The condition that two operators have the same parallelism is a must for
> chaining them. If they have different parallelisms, we cannot chain them
> together.
>
> Best,
> Hang
>
> On Fri, Aug 25, 2023 at 09:34 liu ron wrote:
>
>> Hi, Krzysztof
>>
>> As stated in the description section, this option is used to override the
>> parallelism of a JobVertex, where the key is the JobVertex id; you can see
>> [1] to double-check. A JobVertex may contain more than one operator, so we
>> cannot override the parallelism of a given operator alone. One possible
>> solution to your problem is to leave Map1 and Map2 unchained and put them
>> into two vertices so that their parallelism can be overridden separately.
>>
>> [1]
>> https://github.com/apache/flink/blob/b3fb1421fe86129a4e0b10bf3a46704b7132e775/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1591
>>
>> Best,
>> Ron
>>
>> On Thu, Aug 24, 2023 at 20:08 Krzysztof Chmielewski wrote:
>>
>>> Hi,
>>> has anyone used the pipeline.jobvertex-parallelism-overrides [1] property?
>>>
>>> I wonder what should actually be the key here? The operator name?
>>>
>>> What if my operators are chained and I want to override only one of the
>>> chain's elements? For example: Source -> (Map1 chained with Map2) -> Sink.
>>> Can I override Map2 only, keeping Map1 as is? If not, what should be used
>>> as the key for this chained Map1/Map2 operator then?
>>>
>>> Thanks.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#pipeline-jobvertex-parallelism-overrides
>>>
>>
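
Following up on Ron's suggestion above to keep Map1 and Map2 in separate
vertices, a minimal sketch of breaking the chain with the DataStream API;
the operator names and the pipeline shape are only illustrative:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UnchainedMapsSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Integer> source = env.fromElements(1, 2, 3);

            // startNewChain() ends the previous chain before Map2, so Map1 and Map2
            // become separate JobVertices, each with its own id that can be targeted
            // by pipeline.jobvertex-parallelism-overrides.
            DataStream<Integer> mapped = source
                    .map(i -> i + 1).name("Map1")
                    .map(i -> i * 2).name("Map2").startNewChain();

            mapped.print().name("Sink");

            env.execute("unchained-maps");
        }
    }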