[DISCUSS][FLINK-26014] Document how to use the working directory for faster local recoveries

2022-07-14 Thread Haihang Jing
After reading FLIP-198 & FLIP-201, I am confused: can this feature be used in
YARN mode, and how would one configure a deterministic
`taskmanager.resource-id`? Or does it only suit Kubernetes mode?
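
For reference, a minimal flink-conf.yaml sketch of the configuration being
asked about. The option names below are taken from FLIP-198 as I read it and
should be treated as assumptions; whether YARN can supply a deterministic
resource ID this way is exactly the open question above:

    # Working directory used for faster local recovery (FLIP-198).
    process.taskmanager.working-dir: /pv/flink/working-dir
    # For the working directory to survive restarts, the resource ID must be
    # deterministic and unique per TaskManager (e.g. derived from a
    # StatefulSet pod name on Kubernetes).
    taskmanager.resource-id: taskmanager-0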


Re: standalone mode support in the kubernetes operator (FLIP-225)

2022-07-14 Thread yidan zhao
Hi all, does 'standalone mode support in the kubernetes operator'
mean using the flink-k8s-operator to manage jobs deployed in a
standalone cluster?
What is the advantage of doing so?

Yang Wang wrote on Thu, Jul 14, 2022 at 10:55:
>
> I think the standalone mode support is expected to be done in version
> 1.2.0 [1], which will be released on Oct 1 (ETA).
>
> [1]. 
> https://cwiki.apache.org/confluence/display/FLINK/Release+Schedule+and+Planning
>
>
> Best,
> Yang
>
> Javier Vegas wrote on Thu, Jul 14, 2022 at 06:25:
>>
>> Hello! The operator docs 
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/
>>  say "The Operator does not support Standalone Kubernetes deployments yet" 
>> and mention 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-225%3A+Implement+standalone+mode+support+in+the+kubernetes+operator
>>  as a "what's next" step. Is there a timeline for that to be released?
>>
>> Thanks,
>>
>> Javier Vegas


Flink application mode, multiple jobs

2022-07-14 Thread Mason Chen
Hi all,

Is there any limitation on the number of jobs you can deploy together
within the same Flink application? We are noticing some exceptions related
to task slots at job startup. It typically recovers after 10-20 minutes.

What are some of the recommended configurations that we can tune to
alleviate these issues? I've copied some of the exception messages below.
The most frequent exception is TaskSubmissionException and we have more
than enough task slots for the job, so the last exception seems to be a red
herring.

org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException: No
task slot allocated for job ID 233b239f6f7e410fa84dcb3ed9bb958f and
allocation ID 0d4cd6b91fac49aa9b8d401c58b34f46.

org.apache.flink.util.FlinkException: TaskExecutor
akka.tcp://flink@172.18.112.121:40703/user/rpc/taskmanager_0 has no more
allocated slots for job 233b239f6f7e410fa84dcb3ed9bb958f.



Best,
Mason
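
For context, multiple jobs in a single application are typically submitted
from one main() via executeAsync(), so they run concurrently and each must
acquire its own task slots while TaskManagers are still registering, which is
where startup races like the above can surface. A minimal sketch (job names
and bodies are placeholders):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public final class MultiJobApp {
        public static void main(String[] args) throws Exception {
            // Job 1: executeAsync() returns as soon as the job is submitted.
            StreamExecutionEnvironment env1 = StreamExecutionEnvironment.getExecutionEnvironment();
            env1.fromElements(1, 2, 3).print();
            env1.executeAsync("job-1");

            // Job 2: runs concurrently with job 1 and competes for slots.
            StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
            env2.fromElements("a", "b", "c").print();
            env2.executeAsync("job-2");
        }
    }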


Re: [DISCUSS] Contribution of Multi Cluster Kafka Source

2022-07-14 Thread Mason Chen
Hi all,

Circling back on this--I have created a first draft document in confluence:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-246%3A+Multi+Cluster+Kafka+Source
.

Looking forward to hearing all your feedback in this email thread!

Best,
Mason

On Thu, Jun 30, 2022 at 6:57 AM Thomas Weise  wrote:

> Hi Mason,
>
> I added mason6345 to the Flink confluence space, you should be able to
> add a FLIP now.
>
> Looking forward to the contribution!
>
> Thomas
>
> On Thu, Jun 30, 2022 at 9:25 AM Martijn Visser 
> wrote:
> >
> > Hi Mason,
> >
> > I'm sure there's a PMC (*hint*) out there who can grant you access to
> > create a FLIP. Looking forward to it, this sounds like an improvement
> that
> > users are looking forward to.
> >
> > Best regards,
> >
> > Martijn
> >
> > On Tue, Jun 28, 2022 at 09:21, Mason Chen wrote:
> >
> > > Hi all,
> > >
> > > Thanks for the feedback! I'm adding the users, who responded in the
> user
> > > mailing list, to this thread.
> > >
> > > @Qingsheng - Yes, I would prefer to reuse the existing Kafka connector
> > > module. It makes a lot of sense since the dependencies are the same
> and the
> > > implementation can also extend and improve some of the test utilities
> you
> > > have been working on for the FLIP 27 Kafka Source. I will enumerate the
> > > migration steps in the FLIP template.
> > >
> > > @Ryan - I don't have a public branch available yet, but I would
> appreciate
> > > your review on the FLIP design! When the FLIP design is approved by
> devs
> > > and the community, I can start to commit our implementation to a fork.
> > >
> > > @Andrew - Yup, one of the requirements of the connector is to read
> > > multiple clusters within a single source, so it should be able to work
> well
> > > with your use case.
> > >
> > > @Devs - what do I need to get started on the FLIP design? I see the
> FLIP
> > > template and I have an account (mason6345), but I don't have access to
> > > create a page.
> > >
> > > Best,
> > > Mason
> > >
> > >
> > >
> > >
> > > On Sun, Jun 26, 2022 at 8:08 PM Qingsheng Ren 
> wrote:
> > >
> > >> Hi Mason,
> > >>
> > >> It sounds like an exciting enhancement to the Kafka source and will
> > >> benefit a lot of users I believe.
> > >>
> > >> Would you prefer to reuse the existing flink-connector-kafka module or
> > >> create a new one for the new multi-cluster feature? Personally I
> prefer the
> > >> former one because users won’t need to introduce another dependency
> module
> > >> to their projects in order to use the feature.
> > >>
> > >> Thanks for the effort on this and looking forward to your FLIP!
> > >>
> > >> Best,
> > >> Qingsheng
> > >>
> > >> > On Jun 24, 2022, at 09:43, Mason Chen 
> wrote:
> > >> >
> > >> > Hi community,
> > >> >
> > >> > We have been working on a Multi Cluster Kafka Source and are
> looking to
> > >> > contribute it upstream. I've given a talk about the features and
> design
> > >> at
> > >> > a Flink meetup: https://youtu.be/H1SYOuLcUTI.
> > >> >
> > >> > The main features that it provides are:
> > >> > 1. Reading multiple Kafka clusters within a single source.
> > >> > 2. Adjusting the clusters and topics the source consumes from
> > >> dynamically,
> > >> > without Flink job restart.
> > >> >
> > >> > Some of the challenging use cases that these features solve are:
> > >> > 1. Transparent Kafka cluster migration without Flink job restart.
> > >> > 2. Transparent Kafka topic migration without Flink job restart.
> > >> > 3. Direct integration with Hybrid Source.
> > >> >
> > >> > In addition, this is designed with wrapping and managing the
> existing
> > >> > KafkaSource components to enable these features, so it can continue
> to
> > >> > benefit from KafkaSource improvements and bug fixes. It can be
> > >> considered
> > >> > as a form of a composite source.
> > >> >
> > >> > I think the contribution of this source could benefit a lot of
> users who
> > >> > have asked in the mailing list about Flink handling Kafka
> migrations and
> > >> > removing topics in the past. I would love to hear and address your
> > >> thoughts
> > >> > and feedback, and if possible drive a FLIP!
> > >> >
> > >> > Best,
> > >> > Mason
> > >>
> > >>
>
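
For context, a sketch of the existing FLIP-27 KafkaSource that the proposal
wraps: the bootstrap servers and topics are fixed when the source is built,
which is the restriction the multi-cluster design lifts (broker address,
topic, and group are placeholder values):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker-1:9092")   // one cluster, fixed at build time
            .setTopics("events")                    // static topic list
            .setGroupId("my-group")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();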


Re: Making Kafka source respect offset changed externally

2022-07-14 Thread Alexis Sarda-Espinosa
Hi Yaroslav,

The test I did was just using earliest, I'll test with committed offset
again, thanks.

Regards,
Alexis.

On Thu, 14 Jul 2022, 20:49 Yaroslav Tkachenko,  wrote:

> Hi Alexis,
>
> Do you use OffsetsInitializer.committedOffsets() to specify your Kafka
> consumer offsets? In this case, it should get the offsets from Kafka and
> not the state.
>
> On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa <
> sarda.espin...@gmail.com> wrote:
>
>> Hello,
>>
>> Regarding the new Kafka source (configured with a consumer group), I found
>> out that if I manually change the group's offset with Kafka's admin API
>> independently of Flink (while the job is running), the Flink source will
>> ignore that and reset it to whatever it stored internally. Is there any way
>> to prevent this?
>>
>> Regards,
>> Alexis.
>>
>>
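
A sketch of the committedOffsets() initializer mentioned above (broker,
topic, and group names are placeholders). One caveat, which is my
understanding rather than something confirmed in this thread: the initializer
only controls where the source starts when there is no restored state; a
running job keeps the position tracked in its own state, consistent with what
Alexis observed:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker-1:9092")
            .setTopics("events")
            .setGroupId("my-group")
            // Start from the group's committed offsets in Kafka, falling back
            // to EARLIEST for partitions with no committed offset.
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();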


Re: Issues with watermark alignment in Flink 1.15

2022-07-14 Thread Jun Qin
Found the reason: it does not work together with 
.withIdleness(Duration.ofSeconds(1))

Isn't this a valid scenario: one subtask has multiple input streams/channels
where some are idle and others have large watermark skews?

In addition, is the watermark update interval in
.withWatermarkAlignment("wm-group", maxDrift, updateInterval)
expected to be at the millisecond level? If so, I think the following log
message should be at the DEBUG level
2022-07-10 06:53:35,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=-9223372036854765808 to subTaskIds=[]

to avoid streaming logs filling up the disk space.

Thanks
Jun


> On Jul 10, 2022, at 9:10 AM, Jun Qin  wrote:
> 
> Hi All
> 
> I am trying watermark alignment in Flink 1.15 with:
> 
> watermarkStrategy = WatermarkStrategy.<~>forBoundedOutOfOrderness(
>             Duration.ofMillis(outOfOrderness))
>     .withWatermarkAlignment("wm-group", Duration.ofSeconds(10), Duration.ofSeconds(1))
>     .withTimestampAssigner((element, timestamp) -> element.getTimestamp())
>     .withIdleness(Duration.ofSeconds(1));
> 
> And got the following in DEBUG logs:
> 2022-07-10 06:53:35,713 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=-9223372036854765808 to subTaskIds=[]
> 2022-07-10 06:53:36,606 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 1657436016036 (2022-07-10 06:53:36.036) from 
> subTaskId=2
> 2022-07-10 06:53:36,619 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 1657435956048 (2022-07-10 06:52:36.048) from 
> subTaskId=1
> 2022-07-10 06:53:36,639 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 1657436016034 (2022-07-10 06:53:36.034) from 
> subTaskId=3
> 2022-07-10 06:53:36,702 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 1657436016053 (2022-07-10 06:53:36.053) from 
> subTaskId=0
> 2022-07-10 06:53:36,713 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=1657435966048 to subTaskIds=[0, 1, 2, 3]
> 2022-07-10 06:53:37,229 DEBUG 
> shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - Update 
> lock acquire time to keep lease
> 2022-07-10 06:53:37,237 DEBUG 
> shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - 
> TryAcquireOrRenew return success
> 2022-07-10 06:53:37,237 DEBUG 
> shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - 
> Successfully renewed lease
> 2022-07-10 06:53:37,603 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=2
> 2022-07-10 06:53:37,605 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=3
> 2022-07-10 06:53:37,616 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=1
> 2022-07-10 06:53:37,630 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=0
> 2022-07-10 06:53:37,713 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 
> 3]
> 2022-07-10 06:53:38,603 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=2
> 2022-07-10 06:53:38,604 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=3
> 2022-07-10 06:53:38,616 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=1
> 2022-07-10 06:53:38,630 DEBUG 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New 
> reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 
> 07:12:55.807) from subTaskId=0
> 2022-07-10 06:53:38,713 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - 
> Distributing maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 
> 3]
> 
> 
> Then it stays with maxAllowedWatermark=-9223372036854765809 all the time. The
> watermark looks to be correct at the beginning, then 
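
For what it's worth, the odd values in the log decode as plain long
arithmetic, assuming the coordinator computes maxAllowedWatermark as
min(reported watermarks) + maxDrift (an inference from the numbers, not from
the source):

    long maxDriftMs = 10_000L; // Duration.ofSeconds(10) from the example above
    // Before any subtask reports, the minimum watermark is Long.MIN_VALUE:
    System.out.println(Long.MIN_VALUE + maxDriftMs); // -9223372036854765808
    // Once idle subtasks report Long.MAX_VALUE, the addition overflows:
    System.out.println(Long.MAX_VALUE + maxDriftMs); // -9223372036854765809

That would explain why alignment breaks down once .withIdleness() drives the
reported watermarks to Long.MAX_VALUE.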

Re: Making Kafka source respect offset changed externally

2022-07-14 Thread Yaroslav Tkachenko
Hi Alexis,

Do you use OffsetsInitializer.committedOffsets() to specify your Kafka
consumer offsets? In this case, it should get the offsets from Kafka and
not the state.

On Thu, Jul 14, 2022 at 11:18 AM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hello,
>
> Regarding the new Kafka source (configured with a consumer group), I found
> out that if I manually change the group's offset with Kafka's admin API
> independently of Flink (while the job is running), the Flink source will
> ignore that and reset it to whatever it stored internally. Is there any way
> to prevent this?
>
> Regards,
> Alexis.
>
>


Making Kafka source respect offset changed externally

2022-07-14 Thread Alexis Sarda-Espinosa
Hello,

Regarding the new Kafka source (configured with a consumer group), I found
out that if I manually change the group's offset with Kafka's admin API
independently of Flink (while the job is running), the Flink source will
ignore that and reset it to whatever it stored internally. Is there any way
to prevent this?

Regards,
Alexis.


Re: Flink running same task on different Task Manager

2022-07-14 Thread Great Info
-> If so, I think you can set Task1 and Task2 to the same parallelism and
set them in the same slot sharing group. In this way, Task1 and Task2 will
be deployed into the same slot (that is, the same task manager).

Updating task details:
Task1 - Sources some static data over HTTPS and keeps it in memory (in a
static memory block), refreshing it every hour; since this data is huge, it
cannot be broadcast.

Task2 - Processes some real-time events from Kafka, uses the static data to
validate and transform them, then forwards them to another Kafka topic.

Task2 needs more parallelism, so deploying both Task1 and Task2 on the same
node (task manager) is becoming difficult. I am using AWS KDA, which has a
limitation of only 8 tasks per node, and I now have a requirement to run
Task2 with a parallelism of 12. My plan:

1. set a different SlotSharingGroup for Task1 and Task2
2. set parallelism to 12 for Task2 (this real-time task needs to read from
12 different Kafka partitions, hence 12)
3. set the parallelism of Task1 to 2
4. then set cluster.evenly-spread-out-slots: true

Will these methods work? Also, I did not find a way to set a different
parallelism for each slot sharing group (see the sketch below).
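
A sketch of steps 1-3 above. Parallelism and slot sharing group are both
per-operator settings in the DataStream API, so there is no single
"parallelism of a slot sharing group"; each operator in the group carries its
own. All names below (kafkaSource, staticData, the functions) are
hypothetical:

    // Steps 1 + 2: real-time operators at parallelism 12, pinned to "task2".
    DataStream<String> events = env
            .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "task2-source")
            .setParallelism(12)
            .slotSharingGroup("task2");
    events.map(new ValidateAndTransform())   // hypothetical MapFunction
            .setParallelism(12)
            .slotSharingGroup("task2");

    // Step 3: the static-data operator at parallelism 2, in its own group.
    staticData.map(new RefreshStaticData())  // hypothetical MapFunction
            .setParallelism(2)
            .slotSharingGroup("task1");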



On Thu, Jul 14, 2022 at 7:54 AM Lijie Wang  wrote:

> Hi Great,
>
> -> Is there a way to set the restart strategy so that only tasks in the
> same slot group will restart during failure?
>
> No. On task failover, all tasks in the same region will be restarted at
> the same time (to ensure data consistency).
> You can get more details about failover strategy in [1]
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/task_failure_recovery/#failover-strategies
>
> Best,
> Lijie
>
>
> Great Info wrote on Wed, Jul 13, 2022 at 23:11:
>
>> Thanks for helping with some inputs.
>> Actually, I have created task1 and task2 in separate slot groups; I
>> thought it would be good if they ran in independent slots. I am also now
>> facing some issues during restarts: whenever task1 has any exception, the
>> entire job restarts.
>>
>> Is there a way to set the restart strategy so that only tasks in the same
>> slot group will restart during failure?
>>
>> On Wed, Jun 15, 2022 at 6:13 PM Lijie Wang 
>> wrote:
>>
>>> Hi Great,
>>>
>>> Do you mean there is a Task1 and a Task2 on each task manager?
>>>
>>> If so, I think you can set Task1 and Task2 to the same parallelism and
>>> set them in the same slot sharing group. In this way, Task1 and Task2
>>> will be deployed into the same slot (that is, the same task manager).
>>>
>>> You can get more details about slot sharing group in [1], and you can
>>> get how to set slot sharing group in [2].
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/flink-architecture/#task-slots-and-resources
>>> [2]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/overview/#set-slot-sharing-group
>>>
>>> Best,
>>> Lijie
>>>
>>> Weihua Hu wrote on Wed, Jun 15, 2022 at 13:16:
>>>
 I don't really understand how task2 reads static data from task1, but I
 think you can integrate the logic of getting static data from http in task1
 into task2 and keep only one kind of task.

 Best,
 Weihua


 On Wed, Jun 15, 2022 at 10:07 AM Great Info  wrote:

 > thanks for helping with some inputs. Yes, I am using a rich function and
 > handling objects created in open(), and the network calls are made in run().
 > But currently I am stuck running this same task on *all task managers*
 > (nodes): when I submit the job, this task1 (the static data task) runs on
 > only one task manager, and I have 3 task managers in my Flink cluster.
 >
 >
 > On Tue, Jun 14, 2022 at 7:20 PM Weihua Hu 
 wrote:
 >
 >> Hi,
 >>
 >> IMO, Broadcast is a better way to do this, which can reduce the QPS of
 >> external access.
 >> If you do not want to use Broadcast, try using a RichFunction: start a
 >> thread in the open() method to refresh the data regularly, but be careful
 >> to clean up your data and threads in the close() method, otherwise it will
 >> lead to leaks.
 >>
 >> Best,
 >> Weihua
 >>
 >>
 >> On Tue, Jun 14, 2022 at 12:04 AM Great Info 
 wrote:
 >>
 >>> Hi,
 >>> I have one Flink job which has two tasks:
 >>> Task1 - Sources some static data over HTTPS and keeps it in memory; this
 >>> keeps refreshing every 1 hour.
 >>> Task2 - Processes some real-time events from Kafka and uses the static
 >>> data to validate and transform them, then forwards them to another Kafka
 >>> topic.
 >>>
 >>> so far, everything was running on the same Task Manager (same node), but
 >>> due to some recent scaling requirements I need to enable partitioning on
 >>> Task2, and that will make some partitions run on other task managers. but
 >>> other tas
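
A minimal sketch of Weihua's earlier suggestion: refresh the static data on a
background thread started in open() and stop it in close() to avoid leaks.
The refresh logic and the use of the data in map() are placeholders; only the
RichMapFunction lifecycle is the point here:

    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    public class ValidatingMapper extends RichMapFunction<String, String> {
        private transient ScheduledExecutorService refresher;
        private transient volatile Map<String, String> referenceData;

        @Override
        public void open(Configuration parameters) {
            refresher = Executors.newSingleThreadScheduledExecutor();
            // Refresh immediately, then every hour, independently per subtask.
            refresher.scheduleAtFixedRate(this::refreshReferenceData, 0, 1, TimeUnit.HOURS);
        }

        private void refreshReferenceData() {
            // Placeholder for the HTTPS fetch described in this thread.
            referenceData = Map.of();
        }

        @Override
        public String map(String event) {
            // Placeholder: validate/transform using the latest reference data.
            return referenceData == null ? event : event.trim();
        }

        @Override
        public void close() {
            if (refresher != null) {
                refresher.shutdownNow(); // clean up threads to avoid leaks
            }
        }
    }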

Re: Unit test have Error "could not find implicit value for evidence parameter"

2022-07-14 Thread Min Tu via user
Thanks for the tip!

On Wed, Jul 13, 2022 at 10:33 AM Jing Ge  wrote:

> Hi,
>
> you don't have to do that. Next time you can try "Invalidate Caches..."
> under the File menu in Intellij Idea.
>
> Best regards,
> Jing
>
> On Wed, Jul 13, 2022 at 7:21 PM Min Tu via user 
> wrote:
>
>> Thanks a lot!! I have removed the .idea folder and the unit test works.
>>
>> On Mon, Jul 11, 2022 at 2:44 PM Alexander Fedulov <
>> alexan...@ververica.com> wrote:
>>
>>> Hi Min Tu,
>>>
>>> try clean install to make sure the build starts from scratch. Refresh
>>> maven modules in IntelliJ after the build. If that doesn't work, try
>>> invalidating IntelliJ caches and/or reimporting the project (remove .idea
>>> folder).
>>>
>>> Best,
>>> Alexander Fedulov
>>>
>>> On Sun, Jul 10, 2022 at 12:59 AM Hemanga Borah 
>>> wrote:
>>>
 What version of Scala are you using?

 Depending on the build, you may be using 2.11 or 2.12. The Scala version in
 your Maven build files needs to be consistent across your system.

 On Fri, Jul 8, 2022 at 10:00 PM Min Tu via user 
 wrote:

> Hi,
>
> I have downloaded the Flink code and the build works fine
> with the following command:
>
> mvnw install -DskipTests -Dcheckstyle.skip
>
> Then I try to run the unit test code in IntelliJ, but got the following
> error:
>
>
> /Users/mintu/ApacheProjects/flink/flink-scala/src/test/scala/org/apache/flink/api/scala/DeltaIterationSanityCheckTest.scala:34:41
> *could not find implicit value for evidence parameter of type*
> org.apache.flink.api.common.typeinfo.TypeInformation[(Int, String)]
> val solutionInput = env.fromElements((1, "1"))
>
> Please advise.
>
> Thanks in advance
>



Re: Kubernetes Operator - Logging

2022-07-14 Thread Daniel Fischer
Biao and Matyas,

Thanks for the responses. Biao, I tried what you suggested, but maybe I'm
not clear on this step:

Use -Dlogback.configurationFile= to specify the logback.xml (mounted
or packaged in the image) in spec.flinkConfiguration

I added my logback-console.xml to /opt/flink/logback-console.xml in my
image, and specified

spec:
  flinkConfiguration:
    logback.configurationFile: /opt/flink/logback-console.xml

However I still get the following error on startup

ERROR in ch.qos.logback.classic.joran.JoranConfigurator@3abfe836 - Could
not open URL [file:/opt/flink/conf/logback-console.xml].
java.io.FileNotFoundException: /opt/flink/conf/logback-console.xml (No such
file or directory)

I tried adding logback-console.xml to /opt/flink/conf, but it seems that
the operator wipes that directory except for flink-conf.yaml and
log4j-console.properties, which is why I just added it to /opt/flink
instead.

Thanks! Dan
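
One thing worth checking, as an assumption on my part rather than something
confirmed in this thread: logback.configurationFile is a JVM system property,
not a Flink configuration option, so a bare key under flinkConfiguration may
never reach the JVM. Passing it through env.java.opts would look like:

    spec:
      flinkConfiguration:
        env.java.opts: "-Dlogback.configurationFile=/opt/flink/logback-console.xml"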

On Wed, Jul 13, 2022 at 7:36 AM Őrhidi Mátyás 
wrote:

> Hi Daniel,
>
> Some additional info on the current approach we have in the operator:
>
> Manipulating plain property files was relatively easy from the Helm chart,
> hence we went down this route first. We could try to document at least an
> example for logback then see if we can improve the setup further.
>
> Best,
> Matyas
>
> On Wed, Jul 13, 2022 at 1:21 PM Geng Biao  wrote:
>
>> Hi Daniel,
>>
>>
>>
>> I believe it is possible to use logback for jobs submitted by flink
>> kubernetes operator but I have not found an out-of-box solution either.
>> I tried the following steps:
>>
>>   - Create a custom Flink image following
>>     https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/advanced/logging/#configuring-logback
>>     to add the necessary jars/xmls, and use the custom image
>>   - Use -Dlogback.configurationFile= to specify the logback.xml
>>     (mounted or packaged in the image) in spec.flinkConfiguration
>>   - Submit the job
>>
>>
>>
>> Best,
>>
>> Biao Geng
>>
>>
>>
>> From: Daniel Fischer
>> Date: Wednesday, July 13, 2022 at 6:44 PM
>> To: user@flink.apache.org
>> Subject: Kubernetes Operator - Logging
>>
>> Hi,
>>
>>
>>
>> I have a job running in Kubernetes that I'm looking to migrate to the new
>> Kubernetes Operator
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/metrics-logging/.
>> My question is about logging. My job currently uses logback, but reading
>> through the documentation I'm not seeing a way to use anything other than
>> log4j. My questions are:
>>
>>
>>
>> 1. Is it possible to use a logging framework other than log4j with the
>> kubernetes operator?
>>
>> 2. Is it possible to configure logging through the FlinkDeployment? The
>> documentation seems to read that logging must be configured when installing
>> the operator.
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/metrics-logging/#logging
>>
>>
>>
>> Thanks!
>>
>