Rolling K8s without Flink disruptions

2023-11-29 Thread Salva Alcántara
Are there any special precautions that need to be taken when undergoing
regular K8s maintenance procedures such as migrating/upgrading clusters?

For the sake of concreteness, I'm running my jobs via the Flink K8s
Operator, and I'm finding that when rolling out new nodes and migrating my
jobs to them, in some cases the jobs get stuck and/or don't restart
properly, or restart multiple times, causing more downtime than expected.

As of now my migration/rollout process is as follows:

- Create new K8s nodes/instances
- Cordon old ones to be replaced (where my jobs are running)
- Take savepoints
- Drain old nodes
- Wait until all jobs show up as RUNNING and STABLE

Nothing special here, I would say. However, I wonder if there are any best
practices for Flink specifically which help minimize downtime and potential
failures during these maintenance windows. Things such as tweaking pod
disruption budgets and/or pod affinities, or maybe considering an HA setup
with multiple jobmanagers instead of just one. To be clear, all my jobs are
deployed like this:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
...
spec:
  ...
  mode: native
```

and for what it's worth, their HA setup is based on the native K8s mode (vs
Zookeeper) and a single jobmanager.
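
On the pod disruption budget point: a PDB with maxUnavailable: 0 blocks
voluntary evictions (such as kubectl drain) of the selected pods until it is
relaxed, which gives the savepoint step a chance to complete first. A minimal
sketch, assuming the pods carry the operator's usual app/component labels; the
metadata name and app value below are hypothetical:

```
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: my-flink-job-pdb          # hypothetical name
spec:
  maxUnavailable: 0               # block voluntary evictions (e.g. kubectl drain)
  selector:
    matchLabels:
      app: my-flink-job           # assumes pods are labeled with the deployment name
      component: taskmanager
```

The drain then has to be coordinated with patching or deleting the PDB once
the savepoints are confirmed.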


Re: How to call the open method of JdbcSink?

2023-11-29 Thread Sai Vishnu
Hi Feng,

Thank you for your response and the suggestion. I was able to cast the
SinkFunction to GenericJdbcSinkFunction which enabled me to override the
open and close methods.
On proceeding further, I have observed that if the batch encounters a
BatchUpdateException due to a packet inside the batch, the driver does
not proceed to update the remaining entries in the db.

Is there any way to programmatically recover from this state and let the
execution move on to the next packet in the batch or the next batch?

Some homework that I did which could be helpful -
I see that flink-connector-jdbc uses com.oracle.database.jdbc.ojdbc8
[source:
https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/pom.xml#L69-L73]
and the Oracle documentation only says this: "After a command in a batch
update fails to execute properly and a BatchUpdateException is thrown, the
driver may or may not continue to process the remaining commands in the
batch".[source:
https://docs.oracle.com/javase/8/docs/api/java/sql/BatchUpdateException.html].
Is there a way to mandate this driver to continue the processing even after
encountering a failure?


I am looking for a way to recover programmatically from the
BatchUpdateException and let the driver continue updating the db. Any
packet that fails should be added to a DLQ, or at least be logged with the
help of try-catch blocks. Any insights would be much appreciated.

One more observation: The BatchUpdateException is nested inside multiple
RuntimeExceptions and IOExceptions. Is there any consistency/pattern to
this?
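
On the nesting question: the runtime wraps sink failures as it propagates
them, so rather than matching a fixed RuntimeException/IOException chain it is
more robust to search the cause chain. A minimal sketch of such a wrapper,
using Flink's org.apache.flink.util.ExceptionUtils and a hypothetical
handleFailedBatch DLQ hook; note that GenericJdbcSinkFunction also implements
CheckpointedFunction, which a production wrapper would need to forward as well:

```
import java.sql.BatchUpdateException;
import java.util.Optional;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.GenericJdbcSinkFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.ExceptionUtils;

/** Sketch: delegates to the JDBC sink and unwraps BatchUpdateException. */
public class DlqJdbcSink<T> extends RichSinkFunction<T> {

    private final GenericJdbcSinkFunction<T> delegate;

    public DlqJdbcSink(GenericJdbcSinkFunction<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        delegate.setRuntimeContext(getRuntimeContext());
        delegate.open(parameters);
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        try {
            delegate.invoke(value, context);
        } catch (Exception e) {
            // Walk the cause chain instead of assuming a fixed nesting depth.
            Optional<BatchUpdateException> bue =
                    ExceptionUtils.findThrowable(e, BatchUpdateException.class);
            if (bue.isPresent()) {
                handleFailedBatch(value, bue.get()); // hypothetical DLQ hook
            } else {
                throw e;
            }
        }
    }

    @Override
    public void close() throws Exception {
        delegate.close();
    }

    private void handleFailedBatch(T value, BatchUpdateException e) {
        // e.getUpdateCounts() gives per-statement outcomes (EXECUTE_FAILED for
        // failed entries); whether the driver continues past a failure is
        // driver-specific, as the Oracle docs quoted above note.
    }
}
```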

Thanks,
Sai Vishnu Soudri


On Tue, 28 Nov 2023 at 20:29, Feng Jin  wrote:

> Hi Sai
>
> I think you can directly cast SinkFunction to GenericJdbcSinkFunction.
>
>
>
> https://github.com/apache/flink-connector-jdbc/blob/b477d452ba3aac38d53d1f5d4c4820bdad3ad9cd/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java#L63C41-L63C41
> ```
> public static <T> SinkFunction<T> sink(
>         String sql,
>         JdbcStatementBuilder<T> statementBuilder,
>         JdbcExecutionOptions executionOptions,
>         JdbcConnectionOptions connectionOptions) {
>     return new GenericJdbcSinkFunction<>(
>             new JdbcOutputFormat<>(
>                     new SimpleJdbcConnectionProvider(connectionOptions),
>                     executionOptions,
>                     () -> JdbcBatchStatementExecutor.simple(sql, statementBuilder)));
> }
> ```
>
>
> Best,
> Feng
>
>
> On Tue, Nov 28, 2023 at 5:49 PM Sai Vishnu 
> wrote:
>
>> Hi team,
>>
>>
>> I am using the JdbcSink from the flink-connector-jdbc artifact, version
>> 3.1.0-1.17. I am trying to write a Sink wrapper that will internally call
>> the invoke and open methods of the jdbc sink. While implementing, I see
>> that JdbcSink.sink() returns a SinkFunction, which only exposes the
>> invoke method and not the open method.
>>
>>
>> Would appreciate any suggestions on how I can implement this. To add to
>> the requirement, the use case is to enclose the invoke operation in
>> a try-catch block so that any exception during the db write process can be
>> caught and handled properly.
>>
>>
>> Thanks,
>>
>> Sai Vishnu Soudri
>>
>


[ANNOUNCE] Apache Flink 1.16.3 released

2023-11-29 Thread Rui Fan
The Apache Flink community is very happy to announce the release of
Apache Flink 1.16.3, which is the
third bugfix release for the Apache Flink 1.16 series.



Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.



The release is available for download at:

https://flink.apache.org/downloads.html



Please check out the release blog post for an overview of the
improvements for this bugfix release:

https://flink.apache.org/2023/11/29/apache-flink-1.16.3-release-announcement/



The full release notes are available in Jira:

https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353259



We would like to thank all contributors of the Apache Flink community
who made this release possible!



Feel free to reach out to the release managers (or respond to this
thread) with feedback on the release process. Our goal is to
constantly improve the release process. Feedback on what could be
improved or things that didn't go so well are appreciated.



Regards,

Release Manager


Re: [EXTERNAL] TaskManagers Crashing

2023-11-29 Thread Ivan Webber via user
Were you ever able to find a workaround for this? I also have transient 
failures due to
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException.

From: Kenan Kılıçtepe 
Sent: Saturday, August 19, 2023 5:50 PM
To: user@flink.apache.org 
Subject: [EXTERNAL] TaskManagers Crashing

Hi,

I have 4 task managers working on 4 servers.
They all crash at the same time without any useful error logs.
The only log I can see is some disconnection from Kafka, for both consumers
and producers.
Any idea or any help is appreciated.

Some logs from all taskmanagers:

I think server 4 crashes first, and this causes the crash of all taskmanagers.

JobManager:

2023-08-18 15:16:46,528 INFO  org.apache.kafka.clients.NetworkClient [] - [AdminClient clientId=47539-enumerator-admin-client] Node 2 disconnected.
2023-08-18 15:19:00,303 INFO  org.apache.kafka.clients.NetworkClient [] - [AdminClient clientId=tf_25464-enumerator-admin-client] Node 4 disconnected.
2023-08-18 15:19:16,668 INFO  org.apache.kafka.clients.NetworkClient [] - [AdminClient clientId=cpu_59942-enumerator-admin-client] Node 1 disconnected.
2023-08-18 15:19:16,764 INFO  org.apache.kafka.clients.NetworkClient [] - [AdminClient clientId=cpu_55128-enumerator-admin-client] Node 3 disconnected.
2023-08-18 15:19:27,913 WARN  akka.remote.transport.netty.NettyTransport [] - Remote connection to [/10.11.0.51:42778] failed with java.io.IOException: Connection reset by peer
2023-08-18 15:19:27,963 WARN  akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink@tef-prod-flink-04:38835] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2023-08-18 15:19:27,967 WARN  akka.remote.ReliableDeliverySupervisor [] - Association with remote system [akka.tcp://flink-metrics@tef-prod-flink-04:46491] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2023-08-18 15:19:29,225 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph [] - RouterReplacementAlgorithm -> kafkaSink_sinkFaultyRouter_windowMode: Writer -> kafkaSink_sinkFaultyRouter_windowMode: Committer (3/4) (f6fd65e3fc049bd9021093d8f532bbaf_a47f4a3b960228021159de8de51dbb1f_2_0) switched from RUNNING to FAILED on injection-assia-3-pro-cloud-tef-gcp-europe-west1:39011-b24b1d @ injection-assia-3-pro-cloud-tef-gcp-europe-west1 (dataPort=35223).
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Connection unexpectedly closed by remote task manager 'tef-prod-flink-04/10.11.0.51:37505 [ tef-prod-flink-04:38835-e3ca4d ]'. This might indicate that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:134) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)

Re: Doubts about state and table API

2023-11-29 Thread Matthias Pohl via user
Hi Oscar,
could you provide the Java code to illustrate what you were doing?
The difference between version A and B might be especially helpful. I
assume you already looked into the FAQ about operator IDs [1]?

Adding the JM and TM logs might help as well to investigate the issue, as
Yu Chen mentioned.

Best,
Matthias

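Yu Chen's suggestion below (rerunning the restore without
--allowNonRestoredState, so that the unmapped operator id is reported) can
also be expressed programmatically when building the job. A minimal sketch,
with a hypothetical savepoint path:

```
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StrictRestore {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // allowNonRestoredState = false: the restore fails loudly and names
        // the operator id that could not be mapped to the new job graph.
        SavepointRestoreSettings.toConfiguration(
                SavepointRestoreSettings.forPath("s3://bucket/savepoints/sp-1", false),
                conf);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the same pipeline as version B, then env.execute();
    }
}
```
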
On Sun, Nov 26, 2023 at 2:18 PM Yu Chen  wrote:

> Hi Oscar,
>
> The operator IDs of a SQL job are generated by
> `StreamingJobGraphGenerator`; they are tied to the topology of the
> stream graph.
> If you would like to confirm whether the problem was caused by a change of
> operator ids, please remove --allowNonRestoredState, and you will
> get an exception naming the operator id that failed to restore.
>
> However, the loss of operator state would only produce some erroneous
> results; it would not result in `not able to return any row`. It would be
> better to provide logs from after the restore to locate the specific problem.
>
> Best,
> Yu Chen
> --
> *From:* Oscar Perez via user 
> *Sent:* 2023-11-25 0:08
> *To:* Oscar Perez via user 
> *Subject:* Doubts about state and table API
>
> Hi,
>
> We are having a job in production where we use table API to join multiple
> topics. The query looks like this:
>
>
> SELECT *
> FROM topic1 AS t1
> JOIN topic2 AS t2 ON t1.userId = t2.userId
> JOIN topic3 AS t3 ON t1.userId = t3.accountUserId
>
>
> This works and produces an EnrichedActivity any time any of the topics
> receives a new event, which is what we expect. This SQL query is linked to
> a processor function and the processElement gets triggered whenever a new
> EnrichedActivity occurs
>
> We have experienced an issue a couple of times in production where we have
> deployed a new version from savepoint and then suddenly we
> stopped receiving EnrichedActivities in the process function.
>
> Our assumption is that this is related to the table API state and that
> some operators are lost from going from one savepoint to new deployment.
>
> Let me illustrate with one example:
>
> version A of the job is deployed
> version B of the job is deployed
>
> in version B, the UID of some table api operators changes, and that
> operator's state is removed when deploying version B as it is unable to be
> mapped (we have --allowNonRestoredState enabled)
>
> The state for the table api stores both the committed offsets and the
> contents of the topics, but just the contents are lost while the committed
> offsets are retained
>
> Therefore, when doing the join in the query, it is not able to return any
> row, as it is unable to get data from topic2 or topic3.
>
> Can this be the case?
> We are having a hard time trying to understand how the table api and state
> works internally so any help in this regard would be truly helpful!
>
> Thanks,
> Oscar
>
>
>


Re: Java 17 as default

2023-11-29 Thread Matthias Pohl via user
The 1.18 Docker images were pushed on Oct 31. This also included Java 17
images [1].

[1] https://hub.docker.com/_/flink/tags?page=1&name=java17

On Wed, Nov 15, 2023 at 7:56 AM Tauseef Janvekar 
wrote:

> Dear Team,
>
> I saw in the documentation for 1.18 that Java 17 is not supported and the
> image is created from Java 11. I guess there is a separate docker image for
> Java 17.
> When do we plan to release the main image with Java 17?
>
> Thanks,
> Tauseef
>


Having Problems as beginners using Flink in our Data Engineering internship

2023-11-29 Thread ALIZOUAOUI
Hey,

I hope you are all doing well. I have started a project (a 4-month internship
in Data Engineering). It is a totally new field for me, and I have to present
two months from now. I am working with Java (I have no prior experience using
it), and I started with the Flink documentation. We have to follow the
operations playground lab, and our dev environment will be the same; we will
adapt that lab to our needs, which is programming a real-time event-processing
Flink app that calculates the Net Promoter Score (NPS) of an agency from a
group of client satisfaction surveys (reviews, delivered as messages), instead
of the Clicks the operations playground lab uses. Here is a link that
describes that lab:
https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/flink-operations-playground/#starting-the-playground

Instead of generating Clicks, we should generate survey scores and then
calculate the Net Promoter Score. My question concerns the parameters I have
thought about declaring for those scores, such as:
the job consumes Scores from an input topic, each with a score from 0-10, the
id of the client's reply, the number of replies, the number of participants,
the number of detractors (score in the 0-6 range), the number of passives
(score in the 7-8 range), the number of promoters (score in the 9-10 range),
the percentage of promoters, the percentage of detractors, the client's full
name, and the date of birth.
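
For reference, since NPS is just the promoter percentage minus the detractor
percentage, the score itself is a few lines of plain Java. A minimal sketch of
the classification above, before any Flink wiring (class and method names are
illustrative):

```
import java.util.List;

/** Classifies 0-10 survey scores and computes the Net Promoter Score. */
public class NpsCalculator {

    public static double nps(List<Integer> scores) {
        long promoters = scores.stream().filter(s -> s >= 9).count();  // 9-10
        long detractors = scores.stream().filter(s -> s <= 6).count(); // 0-6
        // Passives (7-8) count toward the total but toward neither side.
        return 100.0 * (promoters - detractors) / scores.size();
    }

    public static void main(String[] args) {
        // 2 promoters, 1 passive, 1 detractor out of 4 replies -> NPS = 25.0
        System.out.println(nps(List.of(10, 9, 7, 3)));
    }
}
```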

Do I need to add more parameters? It is supposed to be a simulation, and I
still don't quite understand what "dummy data" I should generate in order to
calculate the NPS score. The plan is to use one survey just to test whether my
code is correct, and then look into handling responses to 'n' surveys and
translating survey scores that could use different NPS scales. The agency
wants to implement a dynamic type of scale: instead of a fixed 0-10 NPS scale,
clients could choose the scale they feel most comfortable responding with (for
example 0-5, -5 to 4, 0-25, or 0-100). And here is the design question that
follows from this: would it be better to ask participants (agency clients) for
a response in one survey and drop the scales from the other surveys, or should
they respond to different surveys using different scales?

The problem is that we don't have a lot of time left, and we are stuck because
we have no prior experience in Flink or in Data Engineering. We were somewhat
forced to learn the minimum needed to propose a solution and pass our
internship, and nobody is helping us, supervising our attempts, or reviewing
our work. We only have two classes from the operations playground lab to apply
our logic to, ClickEventGenerator and ClickEventCount, which we plan to turn
into new ones named ScoreGenerator and ScoreCount. One thing we understood is
that we want a real-time solution, so unlike the lab code we are not going to
use windowing or watermark operators. We feel we don't even know how to start
coding our solution, beyond creating an execution environment and the other
elements that are always present no matter what the logic is. We didn't find
any resources to look at, and we are running out of time; we are ready to work
overtime, and the most important point is to succeed and learn during our
internship. I just want to ask you all for help if someone is familiar with
this, because we still have the part of visualizing the NPS in a real-time
dashboard with Grafana, trying to use Prometheus later on, and deploying the
solution to Kubernetes. Please help us with that; we really want to learn
Flink and Data Stream Processing as a whole, as it is a great skill to have.
Thank you for reading our request; I wish you all a great day.

Best Regards,

Zakaria


Re: Re: How to implement JSON string parsing in Flink SQL?

2023-11-29 Thread casel.chen
None of the JSON functions that ship with community Flink can parse a JSON
string and return one or more ROWs.


On 2023-11-23 15:24:33, "junjie.m...@goupwith.com" wrote:
>You can take a look at the JSON functions:
>https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/functions/systemfunctions/#json-functions
>
>
>
>Junjie.M
> 
>From: casel.chen
>Sent: 2023-11-22 20:54
>To: user-zh@flink.apache.org
>Subject: How to implement JSON string parsing in Flink SQL?
>Input:
>
>{
>  "uuid": "",
>  "body_data": "[{\"fild1\":\"1231\",\"fild2\":\"2341\"},{\"fild1\":\"abc\",\"fild2\":\"cdf\"}]"
>}
>
>Output:
>
>[
>  {
>    "uuid": "",
>    "body_data": null,
>    "body_data.fild1": "123",
>    "body_data.fild2": "234"
>  },
>  {
>    "uuid": "",
>    "body_data": null,
>    "body_data.fild1": "abc",
>    "body_data.fild2": "cdf"
>  }
>]
>
>When the format is invalid:
>
>Input:
>
>{
>  "uuid": "",
>  "body_data": "abc"
>}
>
>Output:
>
>{
>  "uuid": "",
>  "body_data": "abc",
>  "body_data.fild1": null,
>  "body_data.fild2": null
>}


Re: Re: How to implement JSON string parsing in Flink SQL?

2023-11-29 Thread casel.chen



The number of fields is fixed, but the number of elements contained in the
body_data array is not, so

Insert into SinkT (result) select Array[ROW(uuid, null, body_data[1].field1 as
body_data.fild1, body_data[1].field2 as body_data.fild2), ROW(uuid,
null, body_data[2].field1, body_data[2].field2)] as result

a SQL statement like this, with body_data[X] written out by hand, should not
work.
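
Since the array length varies, one way around this is a user-defined table
function that explodes the JSON string into rows. A minimal sketch, assuming
Jackson is on the classpath; the class name is hypothetical and the field
names follow the example above:

```
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

/** Explodes a JSON array string into one row per element. */
@FunctionHint(output = @DataTypeHint("ROW<fild1 STRING, fild2 STRING>"))
public class ParseBodyData extends TableFunction<Row> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public void eval(String bodyData) {
        try {
            for (JsonNode element : MAPPER.readTree(bodyData)) {
                collect(Row.of(
                        element.path("fild1").asText(null),
                        element.path("fild2").asText(null)));
            }
        } catch (Exception e) {
            // Malformed input: emit a single row of NULLs instead of failing.
            collect(Row.of(null, null));
        }
    }
}
```

Registered with CREATE TEMPORARY FUNCTION, it could then be used along the
lines of: SELECT uuid, fild1, fild2 FROM SourceT, LATERAL
TABLE(ParseBodyData(body_data)).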








On 2023-11-23 15:10:00, "jinzhuguang" wrote:
>Flink SQL is better suited to structured data; I don't know whether the
>number of fields in your body_data is fixed. If it is fixed, you can write
>the source and sink formats as tables.
>For example:
>
>SourceT: (
>   uuid String,
>   body_data ARRAY<ROW<fild1 String, fild2 String>>
>)
>
>SinkT (
>   result ARRAY<ROW<uuid String, body_data String, body_data.fild1 String, body_data.fild2 String>>
>)
>
>Insert into SinkT (result) select Array[ROW(uuid, null, body_data[1].field1
>as body_data.fild1, body_data[1].field2 as body_data.fild2), ROW(uuid,
>null, body_data[2].field1, body_data[2].field2)] as result
>
>Hope this helps.
>
>> On 2023-11-22 20:54, casel.chen wrote:
>> 
>> Input:
>> 
>> {
>>   "uuid": "",
>>   "body_data": "[{\"fild1\":\"1231\",\"fild2\":\"2341\"},{\"fild1\":\"abc\",\"fild2\":\"cdf\"}]"
>> }
>> 
>> Output:
>> 
>> [
>>   {
>>     "uuid": "",
>>     "body_data": null,
>>     "body_data.fild1": "123",
>>     "body_data.fild2": "234"
>>   },
>>   {
>>     "uuid": "",
>>     "body_data": null,
>>     "body_data.fild1": "abc",
>>     "body_data.fild2": "cdf"
>>   }
>> ]
>> 
>> When the format is invalid:
>> 
>> Input:
>> 
>> {
>>   "uuid": "",
>>   "body_data": "abc"
>> }
>> 
>> Output:
>> 
>> {
>>   "uuid": "",
>>   "body_data": "abc",
>>   "body_data.fild1": null,
>>   "body_data.fild2": null
>> }

Flink K8s operator ignores taskmanager.host setting

2023-11-29 Thread Salva Alcántara
Hi! I'm deploying a job via the Flink K8s Operator with these settings in
the FlinkDeployment resource:

```
  spec:
flinkConfiguration:
  taskmanager.host: 0.0.0.0 <-- ignored / not applied
```

When I look at the flink-conf.yaml inside the TM pod, the setting is not
there. Is there any reason why taskmanager.host gets ignored? Is this a bug,
or expected behavior?


RE: Re: Re: inputQueueSize metric

2023-11-29 Thread Dimitris Banelas via user

Thank you for your previous response Feng. It clears things up a little.

Regarding the inputQueueSize metric: what does it express, after all?

Observing the inputQueueSize and inPoolUsage metrics in Prometheus, I 
found that when the inPoolUsage is high, the inputQueueSize is low. This 
observation does not match the description of the inputQueueSize. 
Therefore, does this metric express the size of the input buffers of 
each task, or does it express how full the input buffers of a task are?



Thank you in advance.


On 2023/11/28 16:11:59 Feng Jin wrote:
> Hi Dimitris
>
> Yes, the inputQueueLength metric can only represent the number of buffers,
> and cannot obtain the actual size of the messages.
>
> Regarding the inputQueueSize metric, it ignores the LocalChannel.
> Therefore, when tasks are connected through the Forward partition strategy,
> it will consistently display a value of 0.
> You may consider altering the partition strategy to the rebalance strategy
> for testing purposes.
>
>
> Best,
> Feng.
>
>
> On Tue, Nov 28, 2023 at 2:32 AM Dimitris Banelas via user <
> user@flink.apache.org> wrote:
>
> > As per the docs, the `inputQueueLength` metric refers to the number of
> > queued input buffers, and cannot be used on its own in order to
> > determine buffered records.
> >
> > For instance, if I know that there are 5 queued input buffers, I cannot
> > conclude anything regarding buffered records if the size of each queue
> > is not known.
> >
> >
> > On 2023/11/27 17:28:08 Feng Jin wrote:
> > > Hi Dimitris
> > >
> > > Maybe you can use the `inputQueueLength` metric.
> > >
> > >
> > > Best,
> > > Feng
> > >
> > > On Tue, Nov 28, 2023 at 12:07 AM Dimitris Mpanelas via user <
> > > user@flink.apache.org> wrote:
> > >
> > > > Hello,
> > > >
> > > > I am trying to determine the buffered records in the input buffers of a
> > > > task. I found the inputQueueSize metric. According to the docs it is "The
> > > > real size of queued input buffers in bytes". The docs also state that "The
> > > > size for local input channels is always 0 since the local channel takes
> > > > records directly from the output queue".
> > > >
> > > > What does this metric truly represent? In which cases should it be 0 and
> > > > non 0? Is there a way to determine buffered records (or at least something
> > > > similar) by using this metric?
> > > >
> > > > Dimitris.
> > > >
> > > >
> > >
> >
>