Kafka consumer times out as Kafka server responds to Fetch requests too late

2023-09-29 Thread Neha Rawat
Hello,
I am using Kafka server 3.4.0 along with Flink. Kafka server and Flink are 
installed on a 48-core, 252 GB box. My use case is as follows -

8 Kafka producers writing events at 200K per second to Kafka topic "Event" 
with 20 partitions (source for Flink) --> Flink processing rules that read from 
the Event topic and write to the Alert topic --> Kafka topic "Alert" with 20 
partitions (sink for Flink).

It was all good until we started seeing the Flink Kafka consumer for the Event 
topic time out frequently because Kafka responds quite late to the fetch 
requests. I am not able to figure out why Kafka randomly takes a long time to 
process these FETCH requests. Is there a configuration that I must look at, or 
any other log that I must check, to figure out what's going on? When everything 
is fine, Kafka takes only a few milliseconds to process the fetch requests.
The timeout of the Kafka consumer on the Flink side is 30 seconds, and the 
consumer thread blocks until it gets a response or times out.
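
For reference, here is a minimal Java sketch (not part of the original post) of the 
standard consumer settings that bound how long a fetch can sit on the broker and how 
long the client waits for the reply. The values mirror those visible in the trace 
logs below; the bootstrap address is an assumption, and in a real job these would be 
supplied to the Flink Kafka connector as consumer properties.

import java.util.Properties;

public class FetchTimeoutConfigSketch {
    public static Properties consumerProps() {
        Properties p = new Properties();
        p.put("bootstrap.servers", "127.0.0.1:9092");   // assumed broker address (as seen in the trace logs)
        p.put("fetch.max.wait.ms", "500");               // broker holds a fetch at most this long if fetch.min.bytes is not yet met
        p.put("fetch.min.bytes", "1");                   // respond as soon as any data is available
        p.put("fetch.max.bytes", "52428800");            // 50 MB per fetch response (maxBytes in the request log)
        p.put("max.partition.fetch.bytes", "1048576");   // 1 MB per partition (partitionMaxBytes in the request log)
        p.put("request.timeout.ms", "30000");            // client waits this long for the broker's reply -- the 30 s timeout being hit
        return p;
    }
}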


Here are some Kafka trace logs -

server.log
[2023-09-13 17:37:05,463] TRACE [Kafka Request Handler 1 on Broker 1], Kafka 
request handler 1 on broker 1 handling request Request(processor=0, 
connectionId=127.0.0.1:9092-127.0.0.1:39268-2028, 
session=Session(User:ANONYMOUS,/127.0.0.1), 
listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, buffer=null, 
envelope=None) (kafka.server.KafkaRequestHandler)
[2023-09-13 17:37:05,463] TRACE Sending FETCH response to client Event-8 of 
1048654 bytes. (kafka.network.RequestChannel)
[2023-09-13 17:37:05,463] TRACE Socket server received response to send to 
127.0.0.1:9092-127.0.0.1:39268-2028, registering for write and sending data: 
Response(type=Send, request=Request(processor=0, 
connectionId=127.0.0.1:9092-127.0.0.1:39268-2028, 
session=Session(User:ANONYMOUS,/127.0.0.1), 
listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, buffer=null, 
envelope=None), send=MultiRecordsSend(size=1048654, totalWritten=0), 
asString=Some({"throttleTimeMs":0,"errorCode":0,"sessionId":1670662638,"responses":[{"topicId":"MtsZtV-bRoCAwPiHqRIzvA","partitions":[{"partitionIndex":14,"errorCode":0,"highWatermark":37934921072,"lastStableOffset":37934921072,"logStartOffset":31032998864,"abortedTransactions":null,"preferredReadReplica":-1,"recordsSizeInBytes":1048576}]}]}))
 (kafka.network.Processor)

request-logs
[2023-09-13 17:37:05,463] TRACE Processor 0 received request: 
RequestHeader(apiKey=FETCH, apiVersion=13, clientId=Event-8, 
correlationId=39532, headerVersion=2) -- FetchRequestData(clusterId=null, 
replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, 
sessionId=1670662638, sessionEpoch=10, topics=[FetchTopic(topic='', 
topicId=MtsZtV-bRoCAwPiHqRIzvA, partitions=[FetchPartition(partition=14, 
currentLeaderEpoch=66, fetchOffset=37656832058, lastFetchedEpoch=-1, 
logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], 
rackId='') (kafka.network.RequestChannel$)
[2023-09-13 17:37:05,463] TRACE [KafkaApi-1] Handling 
request:RequestHeader(apiKey=FETCH, apiVersion=13, clientId=Event-8, 
correlationId=39532, headerVersion=2) -- FetchRequestData(clusterId=null, 
replicaId=-1, maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, 
sessionId=1670662638, sessionEpoch=10, topics=[FetchTopic(topic='', 
topicId=MtsZtV-bRoCAwPiHqRIzvA, partitions=[FetchPartition(partition=14, 
currentLeaderEpoch=66, fetchOffset=37656832058, lastFetchedEpoch=-1, 
logStartOffset=-1, partitionMaxBytes=1048576)])], forgottenTopicsData=[], 
rackId='') from connection 
127.0.0.1:9092-127.0.0.1:39268-2028;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS
 (kafka.server.KafkaApis)
[2023-09-13 17:37:05,463] TRACE [KafkaApi-1] Sending Fetch response with 
partitions.size=1, metadata=1670662638 (kafka.server.KafkaApis)
[2023-09-13 17:39:30,223] DEBUG Completed 

[VOTE] 3.6.0 RC2

2023-09-29 Thread Satish Duggana
Hello Kafka users, developers and client-developers,

This is the third candidate for the release of Apache Kafka 3.6.0.
Some of the major features include:

* KIP-405: Kafka Tiered Storage
* KIP-868: KRaft Metadata Transactions
* KIP-875: First-class offsets support in Kafka Connect
* KIP-898: Modernize Connect plugin discovery
* KIP-938: Add more metrics for measuring KRaft performance
* KIP-902: Upgrade Zookeeper to 3.8.1
* KIP-917: Additional custom metadata for remote log segment

Release notes for the 3.6.0 release:
https://home.apache.org/~satishd/kafka-3.6.0-rc2/RELEASE_NOTES.html

*** Please download, test and vote by Tuesday, October 3, 12pm PT

Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~satishd/kafka-3.6.0-rc2/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~satishd/kafka-3.6.0-rc2/javadoc/

* Tag to be voted upon (off 3.6 branch) is the 3.6.0-rc2 tag:
https://github.com/apache/kafka/releases/tag/3.6.0-rc2

* Documentation:
https://kafka.apache.org/36/documentation.html

* Protocol:
https://kafka.apache.org/36/protocol.html

* Successful Jenkins builds for the 3.6 branch:
There are a few runs of unit/integration tests. You can see the latest
at https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.6/. We will
continue running a few more iterations.
System tests:
We will send an update once we have the results.

Thanks,
Satish.


Re: Kafka Streams reaching ERROR state during rolling upgrade / restart of brokers

2023-09-29 Thread Matthias J. Sax

In general, Kafka Streams should keep running.

Can you inspect the logs to figure out why it's going into ERROR state 
to begin with? Maybe you need to increase/change some timeouts/retries 
configs.


The stack trace you shared is a symptom, not the root cause.

-Matthias

On 9/21/23 12:56 AM, Debraj Manna wrote:

I am using Kafka broker 2.8.1 (from AWS MSK) with Kafka clients and Kafka
Streams 3.5.1.

I am observing that whenever a rolling upgrade is done on AWS MSK, our
Streams application reaches an error state. I get the below exception when
trying to query the state store:

caused by: java.lang.IllegalStateException: KafkaStreams is not running.
State is ERROR.
 at
org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
 at
org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
 at
org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)

Can someone let me know the recommended way to keep the Streams application
running whenever a rolling upgrade/restart of brokers is done in the
background?
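
As a rough illustration of the timeout/retry and handler knobs referred to in the 
reply above, here is a minimal Kafka Streams sketch. The application id, bootstrap 
address, topology, serdes and the choice of REPLACE_THREAD are assumptions for 
illustration only, not a recommendation for this specific deployment.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

import java.util.Properties;

public class ResilientStreamsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app");      // hypothetical application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // hypothetical MSK bootstrap address
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // KIP-572: keep retrying transient broker errors (e.g. during a rolling restart)
        // inside the task for up to this long before surfacing them; value is illustrative.
        props.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 300000L);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");                   // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        // Replace a failed stream thread instead of letting the whole instance go to ERROR.
        streams.setUncaughtExceptionHandler(ex -> StreamThreadExceptionResponse.REPLACE_THREAD);

        // Log state transitions so the reason for REBALANCING/ERROR shows up in the logs.
        streams.setStateListener((newState, oldState) ->
                System.out.println("Streams state: " + oldState + " -> " + newState));

        streams.start();

        // Interactive queries (queryMetadataForKey etc.) should check streams.state()
        // first, since the stack trace above shows they throw when the instance is
        // not RUNNING or REBALANCING.
    }
}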



Re: Can message loss be avoided in Kafka

2023-09-29 Thread Matthias J. Sax
For the config you provided, data loss should not happen (as long as you 
don't allow unclean leader election, which is disabled by default).


But you might be subject to unavailability for some partitions if a 
broker fails.



-Matthias

On 9/17/23 7:49 AM, 陈近南 wrote:

Hello,
Can message loss be avoided in Kafka? For example, my config is:


Producer
retries = Integer.MAX_VALUE
request.required.acks=-1


Broker
replication.factor >= 2
min.insync.replicas > 1
log.flush.interval.messages=1


Consumer
enable.auto.commit = false

Can this configuration avoid message loss in Kafka? If not, why? And is there any other MQ that can avoid it?



Best regards,
Chen
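
A minimal sketch of the client-side half of such a no-loss setup, using the current 
Java client config names (acks instead of the older request.required.acks). All 
values, and the broker-side notes in the comments, are illustrative rather than a 
prescription for this particular cluster.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

public class NoLossConfigSketch {
    // Broker/topic side (not client configs): replication.factor >= 2 (3 is common),
    // min.insync.replicas >= 2, and unclean.leader.election.enable=false (the default),
    // as noted in the reply above.

    // Producer: wait for all in-sync replicas and retry indefinitely.
    public static Properties producerProps() {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // illustrative address
        p.put(ProducerConfig.ACKS_CONFIG, "all");                        // equivalent to request.required.acks=-1
        p.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);           // avoid duplicates caused by the retries
        return p;
    }

    // Consumer: commit offsets manually, only after the message has been processed.
    public static Properties consumerProps() {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // illustrative address
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "no-loss-group");          // hypothetical group id
        p.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return p;
    }
}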



Re: Kafka protocol ApiVersions request/response

2023-09-29 Thread Neeraj Vaidya
I think I have found the answer to my question about how many ApiKey elements 
are in the response.
The array type for ApiKey is probably a COMPACT_ARRAY, but the BNF grammar does 
not say so in the documentation.

Is this a documentation defect/gap?

ApiVersions Response (Version: 3) => error_code [api_keys] throttle_time_ms 
TAG_BUFFER 
 error_code => INT16
 api_keys => api_key min_version max_version TAG_BUFFER 
 api_key => INT16
 min_version => INT16
 max_version => INT16
 throttle_time_ms => INT32

Regards,
Neeraj

On Friday, 29 September, 2023 at 01:04:07 pm GMT+10, Neeraj Vaidya wrote:
 
 Hi,

I am trying to create a library in Go to use the Kafka protocol binary format. 
As a first step, I am trying the ApiVersions request.
The request is processed successfully by the Kafka broker, and it responds with 
an ApiVersions response as well, which conforms to the following spec as per 
the protocol:

ApiVersions Response (Version: 0) => error_code [api_keys] 
  error_code => INT16
  api_keys => api_key min_version max_version 
    api_key => INT16
    min_version => INT16
    max_version => INT16

When I look at the response, how do I know how many api_keys list items I need 
to process?

I did manage to arrive at a way of doing this, as below, but I am not sure if 
that is the right way to do it:
If I look at the Wireshark capture, I can see that just before the first 
api_keys item in the response, there is a single byte/octet which contains a 
value of, let's say, 0x61. This evaluates to 97, whereas I get 96 api_keys 
items in the response.
This is the case every time I send an ApiVersions request to different versions 
of Kafka, meaning that the value in this byte is always 1 more than the number 
of items returned.

Is it a fair understanding that this byte will always indicate a number one 
more than the number of ApiKey items returned in the response?

Regards,
Neeraj
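
If the field is indeed a COMPACT_ARRAY, as suggested in the follow-up at the top of 
this thread, then in flexible protocol versions its length prefix is an unsigned 
varint holding N + 1, with 0 encoding a null array, which matches the observed 
0x61 = 97 for 96 entries. Below is a small Java sketch of that decoding, written 
from scratch for illustration rather than taken from Kafka's own utilities; the same 
logic carries over to Go.

import java.nio.ByteBuffer;

public class CompactArraySketch {
    // Reads an unsigned varint: 7 bits per byte, least-significant group first,
    // high bit set on every byte except the last.
    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        while (true) {
            byte b = buf.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
            shift += 7;
        }
    }

    // COMPACT_ARRAY length prefix: the varint holds N + 1; 0 encodes a null array.
    static int readCompactArrayLength(ByteBuffer buf) {
        return readUnsignedVarint(buf) - 1;  // e.g. 0x61 = 97 on the wire means 96 api_keys entries
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0x61 });
        System.out.println(readCompactArrayLength(buf));   // prints 96
    }
}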
  

Re: Apache Kafka Installation and Env Set up

2023-09-29 Thread sunil chaudhari
Hi Venkat,
are you planning to use Open source Apache Kafka Or Confluent?
what is your use case apart from streaming?

regards,
Sunil.

On Fri, 29 Sep 2023 at 12:27 PM, ANANTHA VENKATA GANJAM
 wrote:

> TCS Confidential
>
> Hi Team,
>
> We are planning to set up Lab environment for Kafka in TCS. Please guide
> us on the next steps.
>
>
> Thanks & Regards,
> Vijaya Sri
>
>
>


Apache Kafka Installation and Env Set up

2023-09-29 Thread ANANTHA VENKATA GANJAM
TCS Confidential

Hi Team,

We are planning to set up Lab environment for Kafka in TCS. Please guide us on 
the next steps.


Thanks & Regards,
Vijaya Sri


