Re: Multiple consumers for a Single Partition in Kafka

2023-10-03 Thread Neeraj Vaidya
 Hi Sree Sanjeev,
Other than the info from Colt, why can't you just increase partitions and scale 
the number of consumers accordingly ?
Or build your application as a KStreams application and take advantage of 
horizontal and vertical scalability ?

Regards,
Neeraj


 On Wednesday, 4 October, 2023 at 01:19:37 am GMT+11, Colt McNealy 
 wrote:  
 
 Hello,

Currently, you cannot have more than one consumer within the same consumer
group consuming from a partition in Kafka. However, there is an active
discussion about a proposal to enable queueing semantics for Kafka, which is
a superset of the feature you are talking about. Feel free to check KIP-932:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka

Hope this helps,
Colt McNealy

*Founder, LittleHorse.dev*


On Tue, Oct 3, 2023 at 9:11 AM Sree Sanjeev Kandasamy Gokulrajan <
sreesanjee...@gmail.com> wrote:

> I'm interested in knowing if we can achieve parallelism by allowing
> multiple consumers to subscribe to a single partition of a topic.
>
> To explore potential solutions, I'm considering the following approach.
>
> 1. Implementing a locking mechanism to control access to the offsets in a
> partition to avoid concurrent processing issues.
>
> My primary concern is maintaining message ordering within the partition, as
> Kafka guarantees ordering within a partition. However, message ordering is
> not the primary concern for many applications, so could we have a flag to
> enable multiple consumers on a single partition?
>
>  I appreciate any insights, advice, or experiences that the Kafka community
> can share on this topic. Thank you!
>
> Regards,
> Sree Sanjeev.
>
  

Re: Kafka Protocol : Compact Array or Array ?

2023-10-02 Thread Neeraj Vaidya
Thanks Luke. That helps.
Where can I find information about schemas of all requests ?

Regards,
Neeraj

Sent from my iPhone

> On 2 Oct 2023, at 6:03 pm, Luke Chen  wrote:
> 
> Hi Neeraj,
> 
> Yes, for MetadataRequest, version 0 ~ 8, the topic is ARRAY type. After
> version 9, it'll be COMPACT_ARRAY.
> It's because of this definition: "flexibleVersions": "9+".
> You can check KIP-482 for more information:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-482
> 
> Thanks.
> Luke
> 
>> On Mon, Oct 2, 2023 at 5:09 AM Neeraj Vaidya
>>  wrote:
>> 
>> Hi All,
>> I've raised this on StackOverflow as well :
>> https://stackoverflow.com/questions/77208870/kafka-binary-protocol-array-or-compact-array
>> In case someone can help in answering that question.
>> 
>> Regards,
>> Neeraj
>> On Sunday, 1 October, 2023 at 11:32:49 am GMT+11, Neeraj Vaidya <
>> neeraj.vai...@yahoo.co.in> wrote:
>> 
>> Hi All,
>> There are 2 types of arrays specified in the Kafka protocol documentation
>> : ARRAY and COMPACT_ARRAY.
>> But the protocol details for the different messages do not explicitly
>> specify which one of the above array types applies.
>> 
>> For example, the BNF grammar for the section for MetadataRequest API is as
>> below :
>> 
>> Metadata Request (Version: 0) => [topics]
>>  topics => name
>>name => STRING
>> 
>> What is the type of [topics] ? Is it ARRAY or COMPACT_ARRAY ?
>> 
>> After playing around with the protocol using some tests, I think for
>> Version:0 of this API request, the broker expects this to be of type ARRAY.
>> 
>> But for higher versions, say v9, COMPACT_ARRAY is expected.
>> 
>> I think the protocol really needs to be explicit and is lacking in this
>> respect.
>> 
>> Regards,
>> Neeraj



Re: Kafka Protocol : Compact Array or Array ?

2023-10-01 Thread Neeraj Vaidya
 Hi All,
I've raised this on StackOverflow as well : 
https://stackoverflow.com/questions/77208870/kafka-binary-protocol-array-or-compact-array
In case someone can help in answering that question.

Regards,
Neeraj
 On Sunday, 1 October, 2023 at 11:32:49 am GMT+11, Neeraj Vaidya 
 wrote:  
 
 Hi All,
There are 2 types of arrays specified in the Kafka protocol documentation : 
ARRAY and COMPACT_ARRAY.
But the protocol details for the different messages do not explicitly 
specify which one of the above array types applies.

For example, the BNF grammar for the section for MetadataRequest API is as 
below :

Metadata Request (Version: 0) => [topics] 
  topics => name 
    name => STRING

What is the type of [topics] ? Is it ARRAY or COMPACT_ARRAY ?

After playing around with the protocol using some tests, I think for Version:0 
of this API request, the broker expects this to be of type ARRAY.

But for higher versions, say v9, COMPACT_ARRAY is expected.

I think the protocol really needs to be explicit and is lacking in this respect.

Regards,
Neeraj  

Kafka Protocol : Compact Array or Array ?

2023-09-30 Thread Neeraj Vaidya
Hi All,
There are 2 types of arrays specified in the Kafka protocol documentation : 
ARRAY and COMPACT_ARRAY.
But the protocol details for the different messages do not explicitly 
specify which one of the above array types applies.

For example, the BNF grammar for the section for MetadataRequest API is as 
below :

Metadata Request (Version: 0) => [topics] 
  topics => name 
name => STRING

What is the type of [topics] ? Is it ARRAY or COMPACT_ARRAY ?

After playing around with the protocol using some tests, I think for Version:0 
of this API request, the broker expects this to be of type ARRAY.

But for higher versions, say v9, COMPACT_ARRAY is expected.

I think the protocol really needs to be explicit and is lacking in this respect.

Regards,
Neeraj
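
As an illustration of the difference (following the protocol documentation and the "flexibleVersions": "9+" behaviour described in Luke's reply above), here is a minimal sketch in Java; the class and method names are placeholders, not part of any Kafka library. ARRAY prefixes its elements with a 4-byte big-endian INT32 count, while COMPACT_ARRAY prefixes them with an UNSIGNED_VARINT holding count + 1 (0 denotes a null array).

import java.io.ByteArrayOutputStream;

// Placeholder helper contrasting the two array length encodings.
public class ArrayLengthEncoding {

    // ARRAY: a 4-byte big-endian INT32 element count.
    static void writeArrayLength(ByteArrayOutputStream out, int count) {
        out.write((count >>> 24) & 0xFF);
        out.write((count >>> 16) & 0xFF);
        out.write((count >>> 8) & 0xFF);
        out.write(count & 0xFF);
    }

    // COMPACT_ARRAY: an unsigned varint of (count + 1); 0 means a null array.
    static void writeCompactArrayLength(ByteArrayOutputStream out, int count) {
        int value = count + 1;
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80); // low 7 bits plus continuation bit
            value >>>= 7;
        }
        out.write(value);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream v0 = new ByteArrayOutputStream();
        writeArrayLength(v0, 1);        // MetadataRequest v0 topics: 00 00 00 01
        ByteArrayOutputStream v9 = new ByteArrayOutputStream();
        writeCompactArrayLength(v9, 1); // MetadataRequest v9+ topics: 0x02 (= 1 + 1)
        System.out.printf("v0 prefix: %d bytes, v9 prefix: %d byte(s)%n", v0.size(), v9.size());
    }
}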


Re: Kafka protocol ApiVersions request/response

2023-09-29 Thread Neeraj Vaidya
 I think I have found the answer to my question about how many ApiKey elements 
are in the response.
The array type for ApiKey is probably a COMPACT_ARRAY but the BNF grammar does 
not say so in the Documentation.

Is this a documentation defect/gap ?

ApiVersions Response (Version: 3) => error_code [api_keys] throttle_time_ms 
TAG_BUFFER 
 error_code => INT16
 api_keys => api_key min_version max_version TAG_BUFFER 
 api_key => INT16
 min_version => INT16
 max_version => INT16
 throttle_time_ms => INT32
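
As an aside, a minimal decoding sketch (the names are placeholders and buffer handling is simplified) showing why a single length byte of 0x61 (97) corresponds to 96 api_keys entries: COMPACT_ARRAY stores count + 1 as an unsigned varint.

import java.nio.ByteBuffer;

// Placeholder decoder for a COMPACT_ARRAY element count.
public class CompactArrayCount {

    // UNSIGNED_VARINT: 7 bits per byte, least-significant group first.
    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        int b;
        while (((b = buf.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
        }
        return value | (b << shift);
    }

    // COMPACT_ARRAY length is encoded as (count + 1); 0 would mean a null array.
    static int readCompactArrayCount(ByteBuffer buf) {
        return readUnsignedVarint(buf) - 1;
    }

    public static void main(String[] args) {
        // A single length byte of 0x61 (97) decodes to 96 elements.
        System.out.println(readCompactArrayCount(ByteBuffer.wrap(new byte[]{0x61})));
    }
}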

Regards,
Neeraj
 On Friday, 29 September, 2023 at 01:04:07 pm GMT+10, Neeraj Vaidya 
 wrote:  
 
 Hi,

I am trying to create a library in Go, to use the Kafka protocol binary format. 
As a first step, I am trying the ApiVersions request.
The request is processed successfully by the Kafka broker and it responds with 
an ApiVersions response as well. And it conforms to the following spec as per 
the protocol :

ApiVersions Response (Version: 0) => error_code [api_keys] 
  error_code => INT16
  api_keys => api_key min_version max_version 
    api_key => INT16
    min_version => INT16
    max_version => INT16

When I look at the response, how do I know how many api_keys list items I need 
to process ?

I did manage to arrive at a way of doing this as below, but not sure if that is 
the way to do it :
If I look at the Wireshark capture, I can see that just before the first 
api_keys item in the response, there is a single byte/octet which contains a 
value of let's say 0x61. This evaluates to 97. 
Whereas I get 96 api_keys items in the response.
This is the case every time I send an ApiVersions request to different versions 
of Kafka. 
Meaning that the value in this byte is always 1 more than the number of items 
returned.

Is it a fair understanding that this byte will always indicate a number one 
more than the number of ApiKey items returned in the response ?

Regards,
Neeraj
  

Kafka protocol ApiVersions request/response

2023-09-28 Thread Neeraj Vaidya
Hi,

I am trying to create a library in Go, to use the Kafka protocol binary format. 
As a first step, I am trying the ApiVersions request.
The request is processed successfully by the Kafka broker and it responds with 
an ApiVersions response as well. And it conforms to the following spec as per 
the protocol :

ApiVersions Response (Version: 0) => error_code [api_keys] 
  error_code => INT16
  api_keys => api_key min_version max_version 
api_key => INT16
min_version => INT16
max_version => INT16

When I look at the response, how do I know how many api_keys list items I need 
to process ?

I did manage to arrive at a way of doing this as below, but not sure if that is 
the way to do it :
If I look at the Wireshark capture, I can see that just before the first 
api_keys item in the response, there is a single byte/octet which contains a 
value of let's say 0x61. This evaluates to 97. 
Whereas I get 96 api_keys items in the response.
This is the case every time I send an ApiVersions request to different versions 
of Kafka. 
Meaning that the value in this byte is always 1 more than the number of items 
returned.

Is it a fair understanding that this byte will always indicate a number one 
more than the number of ApiKey items returned in the response ?

Regards,
Neeraj


Re: Apache Kafka consumer consumes messages with "partition" option, but not with "group" option

2023-06-13 Thread Neeraj Vaidya
 Hi,
If you have a large number of partitions in your topic, it can take a really 
long while before you start seeing messages on the console.

So, using the partition id is the right approach, but you just need to be patient 
at the command line. Out of interest, how long did you wait for output from the 
console consumer?

If you need to know the partition id, you will need to use a custom program to 
compute it based on the key. (You could have a look at the murmur2 source code 
on the Kafka github repository and try to create a simple command line tool to 
compute the partition id using the key).
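
For example, a minimal sketch of such a tool, assuming the kafka-clients jar is on the classpath, the topic was written by the Java producer's default partitioner, and the key was serialized with the default StringSerializer (UTF-8):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

// Sketch: reproduce the default partitioner's choice for a keyed record, so you
// know which partition to pass to kafka-console-consumer.sh --partition.
public class KeyToPartition {
    public static void main(String[] args) {
        String key = args.length > 0 ? args[0] : "my-key";
        int numPartitions = args.length > 1 ? Integer.parseInt(args[1]) : 12;

        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Same formula the Java producer uses for records that have a key:
        // murmur2 hash of the serialized key, masked to non-negative, mod partition count.
        int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

        System.out.printf("key '%s' -> partition %d of %d%n", key, partition, numPartitions);
    }
}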

However, using the --group option will only set the consumer group id of your 
kafka-console-consumer.sh instance.

Regards,
Neeraj
 On Tuesday, 13 June, 2023 at 05:26:24 pm GMT+10, Geithner, Wolfgang Dr. 
 wrote:  
 
 This is a copy of a topic I posted on StackOverflow 
(https://stackoverflow.com/questions/76458064/apache-kafka-consumer-consumes-messages-with-partition-option-but-not-with-g),
 where I haven't got any answer yet. Searching the web did not yield any helpful 
results either. Hence, I am addressing this mailing list:

I am running a plain Apache Kafka server (version 3.4.1) which I would like to 
connect to a Telegraf consumer. The Telegraf ```[[inputs.kafka_consumer]]``` 
plugin has the option to consume by Kafka "group". When starting Telegraf, I get 
an error message

    [inputs.kafka_consumer] Error in plugin: consume: kafka server: Request was 
for a consumer group that is not coordinated by this broker

Hence, I started to investigate my setup by using the Kafka console tools and 
found that when executing

    ./kafka-console-consumer.sh --bootstrap-server myserver:9092 --topic test 
--partition 0

and sending messages via ```kafka-console-producer.sh```, these messages pop up 
in the console "consumer" window as expected.

In contrast to this, when I run

    ./kafka-console-consumer.sh --bootstrap-server myserver:9092 --topic test 
--group my-group

nothing happens in the "consumer" window. Furthermore, the command

    ./kafka-consumer-groups.sh --bootstrap-server myserver:9092 --list

yields nothing.

What do I have to do to cause the consumer with the "group" option to "see" the 
messages produced to the topic "test"? Ultimately, how can I solve the Telegraf 
error?
  

Kafka Metrics for Messages Sent Per Second to Topic

2023-05-23 Thread Neeraj Vaidya
Hi All,
Which metric should be used when trying to get the number of messages which are 
being produced to a topic ?
I tried this in Prometheus:
sum(irate(kafka_server_brokertopicmetrics_messagesin_total{topic="my-topic"}[1m]))


This works when I use a non-transactional producer.
However, when I use a transactional producer, I see that for every message that 
I produce, there is an additional message called AddPartitionsToTxn (as seen in 
a packet capture).
This causes the metric to show double the rate that I am actually expecting.
So, any suggestions which metric on the broker side to use to calculate the 
rate of messages that are being published to the topic ?
Regards,
Neeraj
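
For reference, the counter behind that Prometheus metric is the per-topic broker MBean kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=<topic>. A minimal JMX sketch for reading it directly is below; the JMX host/port and the topic name are assumptions, so adjust them to your broker's settings.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Sketch: read the per-topic MessagesInPerSec meter from a broker's JMX endpoint.
public class MessagesInRate {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            ObjectName meter = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=my-topic");
            // The meter exposes a cumulative Count plus moving-average rates.
            Long count = (Long) mbsc.getAttribute(meter, "Count");
            Double oneMinuteRate = (Double) mbsc.getAttribute(meter, "OneMinuteRate");
            System.out.printf("messages-in total=%d, 1m rate=%.2f/s%n", count, oneMinuteRate);
        }
    }
}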




Kafka Broker (with no leader partitions) : JVM heap usage fluctuates wildly in less than a minute.

2023-01-23 Thread Neeraj Vaidya
Hi All,
I've posted this question on SO about JVM heap usage wildly fluctuating 
for a broker which has no leader partitions. If anyone has a clue, please assist.
Kafka Broker JVM heap usage fluctuates wildly
"I have 8 kafka brokers in a production environment. We have set the following 
flag to false for some reason.aut..."

Regards,
Neeraj




Re: Required Business Proposal For BMW Group Asia

2022-06-13 Thread Neeraj Vaidya
This is an open source community.
Please research managed/commercial/enterprise Kafka solutions on the internet 
for such inputs.


> On 13 Jun 2022, at 2:51 pm, jyoti.chaudh...@partner.bmwgroup.com wrote:
> 
> Hi Kafka Team,
> 
> I am from BMW Group Asia, based in Singapore for APAC region. The special 
> fascination of the BMW Group not only lies in its products and technology, 
> but also in the company's history, written by inventors, pioneers and 
> brilliant designers. Today, the BMW Group, with its 31 production and 
> assembly facilities in 15 countries as well as a global sales network, is the 
> world's leading manufacturer of premium automobiles and motorcycles, and 
> provider of premium financial and mobility services.
> We are Architecture & IT team looking after our application migration on 
> public and private cloud. Kindly refer the link for more details: 
> https://contenthub-eurasia.bmwgroup.net/web/bmw-group-asia/home
> 
> We are looking for Kafka and your other competitors proposals in terms of 
> benefit and advantages of using API Gateway  to implement in our business 
> applications migrations and other cloud based applications too.
> Looking forward to see someone who can propose your Gateway to our management 
> team so that we can opt for it in our incoming and even ongoing migration 
> projects.
> 
> 
> 
> Thanks with Regards,
> BMW Group
> Jyoti Chaudhary
> Group IT - APAC
> Cloud Engineer
> FG-AP
> 
> 1 HarbourFront Avenue
> #15-02/07, Keppel Bay Tower
> Singapore 098632
> 
> HP: (65) - 90585239
> Mail: 
> jyoti.chaudh...@partner.bmwgroup.com
> Web: http://www.bmwgroup.com



Re: KStreams State Store - state.dir does not have .checkpoint file

2022-06-01 Thread Neeraj Vaidya
 Thanks John !
It seems if I send a TERM signal to my KStreams application which is running 
inside a Docker container, then it results in a Clean shutdown.
This also then creates a checkpoint file successfully.
So, I guess I need to figure out how to send a TERM signal to my running Java 
KStreams application inside the Docker container.
The KStreams application is actually launched by an entrypoint.sh script in the 
Docker container. If I send a signal to this container using "docker kill", 
this signal does not get passed to the java application which is spawned by the 
entrypoint.sh script.

Regards,
Neeraj
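
One common pattern is to launch the JVM with "exec java ..." in entrypoint.sh, so the Java process becomes PID 1 and actually receives the TERM signal from "docker stop", and to register a JVM shutdown hook that closes the Streams instance. A minimal sketch, with placeholder topics, application id and bootstrap servers:

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

// Sketch: close the Streams instance on SIGTERM so state stores are flushed
// and the .checkpoint files get written.
public class GracefulShutdownExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // placeholder topology

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        CountDownLatch latch = new CountDownLatch(1);

        // Runs when the JVM receives SIGTERM, provided the java process actually
        // gets the signal (e.g. start it with `exec java ...` in entrypoint.sh).
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close(Duration.ofSeconds(30)); // clean shutdown
            latch.countDown();
        }));

        streams.start();
        latch.await();
    }
}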
 On Wednesday, 1 June, 2022, 04:38:16 pm GMT+10, John Roesler 
 wrote:  
 
 Hi Neeraj,

Thanks for all that detail! Your expectation is correct. You should see the 
checkpoint files after a _clean_ shutdown, and then you should not see it 
bootstrap from the beginning of the changelog on the next startup.

How are you shutting down the application? You'll want to call 
KafkaStreams#close and wait for it to complete before stopping the java process.

I hope this helps,
-John

On Tue, May 31, 2022, at 23:09, Neeraj Vaidya wrote:
> Hi All,
> I have a KStreams application running inside a Docker container which 
> uses a persistent key-value store. 
>
> I have configured state.dir with a value of /tmp/kafka-streams (which 
> is the default).
>
> When I start this container using "docker run", I mount 
> /tmp/kafka-streams to a directory on my host machine which is, say for 
> example, /mnt/storage/kafka-streams.
>
> My application.id is "myapp". I have 288 partitions in my input topic 
> which means my state store / changelog topic will also have that many 
> partitions. Accordingly, when I start my Docker container, I see that 
> there is a folder named after each partition, such as 0_1, 
> 0_2 ... 0_288, under /mnt/storage/kafka-streams/myapp/
>
> When I shutdown my application, I do not see any checkpoint file in any 
> of the partition directories.
>
> And when I restart my application, it starts fetching the records from 
> the changelog topic rather than reading from local disk. I suspect this 
> is because there is no .checkpoint file in any of the partition 
> directories. 
>
> This is what I see in the startup log. It seems to be bootstrapping the 
> entire state store from the changelog topic i.e. performing network I/O 
> rather than reading from what is on disk :
>
> "
> 2022-05-31T12:08:02.791 
> [mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN  
> o.a.k.s.p.i.ProcessorStateManager - MSG=stream-thread 
> [myapp-f6900c0a-50ca-43a0-8a4b-95eaa
> d9e5093-StreamThread-122] task [0_170] State store MyAppRecordStore did 
> not find checkpoint offsets while stores are not empty, since under EOS 
> it has the risk of getting uncommitte
> d data in stores we have to treat it as a task corruption error and 
> wipe out the local state of task 0_170 before re-bootstrapping
> 2022-05-31T12:08:02.791 
> [myapp-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN  
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad
> 9e5093-StreamThread-122] Detected the states of tasks [0_170] are 
> corrupted. Will close the task as dirty and re-create and bootstrap 
> from scratch.
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_170] 
> are corrupted and hence needs to be re-initialized
>        at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.initializeStoreOffsetsFromCheckpoint(ProcessorStateManager.java:254)
>        at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:109)
>        at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:216)
>        at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:433)
>        at 
> org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:849)
>        at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:731)
>        at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
>        at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
> "
>
> 1) Should I expect to see a checkpoint file in each of the partition 
> directories under /mnt/storage/kafka-streams/myapp/ when I shutdown my 
> application ?
>
> 2) Is this an issue because I am running my KStreams app inside a 
> docker container ? If there were permissions issues, then I would have 
> expected to see issues in creating the other files such as .lock or 
> rocksdb folder (and it's contents).
>
> My runtime environment is Docker 1.13.1 on RHEL 7.
  

KStreams State Store - state.dir does not have .checkpoint file

2022-05-31 Thread Neeraj Vaidya
Hi All,
I have a KStreams application running inside a Docker container which uses a 
persistent key-value store. 

I have configured state.dir with a value of /tmp/kafka-streams (which is the 
default).

When I start this container using "docker run", I mount /tmp/kafka-streams to a 
directory on my host machine which is, say for example, 
/mnt/storage/kafka-streams.

My application.id is "myapp". I have 288 partitions in my input topic which 
means my state store / changelog topic will also have that many partitions. 
Accordingly, when I start my Docker container, I see that there is a folder named 
after each partition, such as 0_1, 0_2 ... 0_288, under 
/mnt/storage/kafka-streams/myapp/

When I shutdown my application, I do not see any checkpoint file in any of the 
partition directories.

And when I restart my application, it starts fetching the records from the 
changelog topic rather than reading from local disk. I suspect this is because 
there is no .checkpoint file in any of the partition directories. 

This is what I see in the startup log. It seems to be bootstrapping the entire 
state store from the changelog topic i.e. performing network I/O rather than 
reading from what is on disk :

"
2022-05-31T12:08:02.791 
[mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN  
o.a.k.s.p.i.ProcessorStateManager - MSG=stream-thread 
[myapp-f6900c0a-50ca-43a0-8a4b-95eaa
d9e5093-StreamThread-122] task [0_170] State store MyAppRecordStore did not 
find checkpoint offsets while stores are not empty, since under EOS it has the 
risk of getting uncommitte
d data in stores we have to treat it as a task corruption error and wipe out 
the local state of task 0_170 before re-bootstrapping
2022-05-31T12:08:02.791 
[myapp-f6900c0a-50ca-43a0-8a4b-95eaad9e5093-StreamThread-122] WARN  
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-f6900c0a-50ca-43a0-8a4b-95eaad
9e5093-StreamThread-122] Detected the states of tasks [0_170] are corrupted. 
Will close the task as dirty and re-create and bootstrap from scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks [0_170] are 
corrupted and hence needs to be re-initialized
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.initializeStoreOffsetsFromCheckpoint(ProcessorStateManager.java:254)
at 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:109)
at 
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:216)
at 
org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:433)
at 
org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:849)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:731)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
"

1) Should I expect to see a checkpoint file in each of the partition 
directories under /mnt/storage/kafka-streams/myapp/ when I shutdown my 
application ?

2) Is this an issue because I am running my KStreams app inside a docker 
container ? If there were permissions issues, then I would have expected to see 
issues in creating the other files such as .lock or rocksdb folder (and it's 
contents).

My runtime environment is Docker 1.13.1 on RHEL 7.


org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch

2022-04-23 Thread Neeraj Vaidya
Hi All,
My setup is shown in the attached JPEG file.
In my setup, I have a stretch cluster spread across 2 data-centres 
(Geographically distant). The network latency as measured by ping 
round-trip-time is about 50ms.
There are 4 brokers in each DC.
In each of these data-centres, I have a Kafka Producer application and a 
KStream application.
My test involves the following:
1. Make the producer in DC1 produce records at a rate of about 3000 messages per 
second.
2. Shut down all brokers in DC2 for about 30 minutes, so as to simulate a site 
outage.

When I re-start the brokers in DC2, I encounter the following errors in the 
logs of my KStream application. (Note : The Kafka producer application does not 
seem to suffer from any such errors).
Obviously, I can see that the replica lag in the brokers of DC2 has increased, 
but is gradually reducing due to the brokers in DC2 now trying to fetch records 
from DC1 brokers.
However, the KStream application shuts down and cannot be started up until the 
replica lag for all the partitions of the topic from which the KStream 
application consumes has been cleared.
The errors I see are as follows in the KStream application and soon after, it 
shuts down.

org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since 
the producer is fenced, indicating the task may be migrated out; it means all 
tasks belonging to this thread should be migrated.
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:215)
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:196)
        at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1365)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)
        at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:707)
        at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:693)
        at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:640)
        at 
org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:574)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at 
org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:561)
        at java.base/java.lang.Iterable.forEach(Iterable.java:75)
        at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:561)
        at 
org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$3(Sender.java:785)
        at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
        at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:584)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:576)
        at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.InvalidProducerEpochException: 
Producer attempted to produce with an old epoch.
2022-04-21T16:27:08.469 
[mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54] INFO  
c.m.a.k.t.SessionBasedDataUsageAccumulator - MSG=Shutting Down
2022-04-21T16:27:08.470 
[mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54] INFO  
o.a.k.s.p.internals.StreamTask - MSG=stream-thread 
[mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54] task [0_143] 
Suspended running
2022-04-21T16:27:08.475 
[mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54] INFO  
o.a.k.clients.consumer.KafkaConsumer - MSG=[Consumer 
clientId=mtx-caf-0ec709e3-23a7-4083-b636-7c66d4fbb992-StreamThread-54-restore-consumer,
 groupId=null] Unsubscribed all topics or patterns and assigned partitions

Re: Kafka Producer Record Error Rate

2022-04-04 Thread Neeraj Vaidya
 Thanks Liam.
Yes, I do believe the following should really help:
A producer metric which shows records that did not make their way to the topic 
because retries or the delivery timeout were exhausted.

If the metric is at a batch level, then we will need to work out the math to 
calculate exactly how many records were dropped.

Regards,
Neeraj
 On Tuesday, 5 April, 2022, 07:47:21 am GMT+10, Liam Clarke-Hutchinson 
 wrote:  
 
 Hi Neeraj,

However, I am unclear what the record-error-rate|total metric for a
> producer means,
> Does the metric get incremented only when the record could not make it to
> the topic or even when there was a transient/retriable error trying to send
> the message to the topic ?


The latter - so in your example, the error rate and retry rate metrics
would both show an increase, but the records were eventually successfully
sent. Would a metric for "batches that exhausted retries and so were
dropped" be of any use to you? If so, I can propose adding one, and see
what people think.

Cheers,

Liam Clarke-Hutchinson
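
If it helps to see the distinction at runtime, the producer already exposes the relevant counters through its metrics() map. A small sketch, assuming an existing KafkaProducer instance, that prints the record send, retry and error totals so they can be compared directly:

import java.util.Set;
import org.apache.kafka.clients.producer.KafkaProducer;

// Sketch: print the producer's cumulative send/retry/error counters.
public class ProducerErrorCounters {
    private static final Set<String> WANTED =
            Set.of("record-send-total", "record-retry-total", "record-error-total");

    public static void print(KafkaProducer<?, ?> producer) {
        producer.metrics().forEach((name, metric) -> {
            if ("producer-metrics".equals(name.group()) && WANTED.contains(name.name())) {
                System.out.printf("%s = %s%n", name.name(), metric.metricValue());
            }
        });
    }
}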

On Mon, 4 Apr 2022 at 19:29, Neeraj Vaidya
 wrote:

>  Thank you David and Liam for your excellent responses.
> Checking in the consumer will be extremely difficult.
> However, I am unclear what the record-error-rate|total metric for a
> producer means,
> Does the metric get incremented only when the record could not make it to
> the topic or even when there was a transient/retriable error trying to send
> the message to the topic ?
>
> I am posting below the producer properties that I am using.
>
>  acks = -1
>  batch.size = 16384
>  bootstrap.servers = [##MASKED##]
>  buffer.memory = 23622320128
>  client.dns.lookup = use_all_dns_ips
>  client.id = producer-1
>  compression.type = none
>  connections.max.idle.ms = 54
>  delivery.timeout.ms = 288
>  enable.idempotence = true
>  interceptor.classes = []
>  internal.auto.downgrade.txn.commit = false
>  key.serializer = class
> org.apache.kafka.common.serialization.StringSerializer
>  linger.ms = 0
>  max.block.ms = 144
>  max.in.flight.requests.per.connection = 5
>  max.request.size = 1048576
>  metadata.max.age.ms = 720
>  metadata.max.idle.ms = 720
>  metric.reporters = []
>  metrics.num.samples = 2
>  metrics.recording.level = INFO
>  metrics.sample.window.ms = 3
>  partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>  receive.buffer.bytes = 32768
>  reconnect.backoff.max.ms = 1000
>  reconnect.backoff.ms = 50
>  request.timeout.ms = 3
>  retries = 2147483647
>  retry.backoff.ms = 100
>  sasl.client.callback.handler.class = null
>  sasl.jaas.config = null
>  sasl.kerberos.kinit.cmd = /usr/bin/kinit
>  sasl.kerberos.min.time.before.relogin = 6
>  sasl.kerberos.service.name = null
>  sasl.kerberos.ticket.renew.jitter = 0.05
>  sasl.kerberos.ticket.renew.window.factor = 0.8
>  sasl.login.callback.handler.class = null
>  sasl.login.class = null
>  sasl.login.refresh.buffer.seconds = 300
>  sasl.login.refresh.min.period.seconds = 60
>  sasl.login.refresh.window.factor = 0.8
>  sasl.login.refresh.window.jitter = 0.05
>  sasl.mechanism = GSSAPI
>  security.protocol = PLAINTEXT
>  security.providers = null
>  send.buffer.bytes = 131072
>  ssl.cipher.suites = null
>  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
>  ssl.endpoint.identification.algorithm = https
>  ssl.engine.factory.class = null
>  ssl.key.password = null
>  ssl.keymanager.algorithm = SunX509
>  ssl.keystore.location = null
>  ssl.keystore.password = null
>  ssl.keystore.type = JKS
>  ssl.protocol = TLSv1.3
>  ssl.provider = null
>  ssl.secure.random.implementation = null
>  ssl.trustmanager.algorithm = PKIX
>  ssl.truststore.location = null
>  ssl.truststore.password = null
>  ssl.truststore.type = JKS
>  transaction.timeout.ms = 6
>  transactional.id = null
>  value.serializer = class
> io.vertx.kafka.client.serialization.JsonObjectSerializer
>
> Regards,
> Neeraj
> On Monday, 4 April, 2022, 03:19:08 pm GMT+10, David Finnie <
> david.fin...@infrasoft.com.au> wrote:
>
>  Hi Neeraj,
>
> I don't know what might be causing the first Produce error. Is the
> OUT_OF_ORDER_SEQUENCE_NUMBER the first Produce error? From the error
> that you included (Invalid sequence number for new epoch) it would seem
> that the broker doesn't (yet) know about the Producer's epoch - possibly
> because it is still catching up after you restarted it? Note that the
> first sequence number for a new epoch must be 0, so if the broker thinks
> that it is a new epoch, but the sequence number is 3, it will cause this
> error.
>
> I can explain more about the relationship of Producer ID, Prod

Re: Kafka Producer Record Error Rate

2022-04-04 Thread Neeraj Vaidya
, and reset sequence numbers from 0. 
That should then allow for resumption of normal traffic for that 
Producer ID on that partition.

Re. whether the records made it to the topic, they should have. The log 
messages indicate that it is retrying the records, and incrementing the 
epoch and resequencing is part of that process. Of course, you should 
probably check by setting up a consumer to ensure that all messages made 
it to the topic, if that is feasible.

David Finnie

Infrasoft Pty Limited

On 4/04/2022 12:42, Neeraj Vaidya wrote:
>  Hi Liam,
> Thanks for getting back.
>
> 1) Producer settings ( I am guessing these are the ones you are interested in)
> enable.idempotence=true
> max.in.flight.requests.per.connection=5
>
> 2) Sample broker logs corresponding to the timestamp in the application logs 
> of the Producer
>
> [2022-04-03 15:56:39,587] ERROR [ReplicaManager broker=5] Error processing 
> append operation on partition input-topic-114 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid sequence 
> number for new epoch at offset 967756 in partition input-topic-114: 158 
> (request epoch), 3 (seq. number)
>
> Do the producer errors indicate that these messages never made it to the 
> Kafka topic at all ?
>
> Regards,
> Neeraj
>      On Monday, 4 April, 2022, 12:23:30 pm GMT+10, Liam 
>Clarke-Hutchinson  wrote:
>  
>  Hi Neeraj,
>
> First off, what are your producer settings?
> Secondly, do you have brokers logs for the leaders of some of your affected
> topics on hand at all?
>
> Cheers,
>
> Liam Clarke-Hutchinson
>
> On Mon, 4 Apr 2022 at 14:04, Neeraj Vaidya
>   wrote:
>
>> Hi All,
>> For one of the Kafka producers that I have, I see that the Producer Record
>> Error rate is non-zero i.e. out of the expected 3000 messages per second
>> which I a expect to be producing to the topic, I can see that this metric
>> shows a rate of about 200.
>> Does this indicate that the records failed to be sent to the Kafka topic,
>> or does this metric show up even for each retry in the Producer.Send
>> operation ?
>>
>> Notes :
>> 1) I have distributed  8 brokers equally across 2 sites. Using
>> rack-awareness, I am making Kafka position replicas equally across both
>> sites. My min.isr=2 and replication factor = 4. This makes 2 replicas to be
>> located in each site.
>> 2) The scenario I am testing is that of shutting down a set of 4 brokers
>> in one site (out of 8) for an extended period of time and then bringing
>> them back up after say 2 hours. This causes the the follower replicas on
>> those brokers to try and catch-up with the leader replicas on the other
>> brokers. The error rate that I am referring to shows up under this scenario
>> of restarting the brokers. It does not show up when I have just the other
>> set of (4) brokers.
>>
>> To be specific, here are the errors that I see in the Kafka producer log
>> file:
>>
>> 2022-04-03 15:56:39.613  WARN --- [-thread | producer-1]
>> o.a.k.c.p.i.Sender                      : [Producer clientId=producer-1]
>> Got error produce response with correlation id 16512434 on topic-partition
>> input-topic-114, retrying (2147483646 attempts left). Error:
>> OUT_OF_ORDER_SEQUENCE_NUMBER
>> 2022-04-03 15:56:39.613  WARN --- [-thread | producer-1]
>> o.a.k.c.p.i.Sender                      : [Producer clientId=producer-1]
>> Got error produce response with correlation id 16512434 on topic-partition
>> input-topic-58, retrying (2147483646 attempts left). Error:
>> OUT_OF_ORDER_SEQUENCE_NUMBER
>> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
>> o.a.k.c.p.i.TransactionManager          : [Producer clientId=producer-1]
>> ProducerId set to 2040 with epoch 159
>> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
>> o.a.k.c.p.i.ProducerBatch                : Resetting sequence number of
>> batch with current sequence 3 for partition input-topic-114 to 0
>> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
>> o.a.k.c.p.i.ProducerBatch                : Resetting sequence number of
>> batch with current sequence 5 for partition input-topic-114 to 2
>> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
>> o.a.k.c.p.i.ProducerBatch                : Resetting sequence number of
>> batch with current sequence 6 for partition input-topic-114 to 3
>> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
>> o.a.k.c.p.i.ProducerBatch                : Resetting sequence number of
>> batch with current sequence 1 for partition input-topic-58 to 0
>> 2022-04-03 15:56:39.739  WARN --- [-thread | producer-1]
>> o.a.k.c.p.i.Sender                      : [Producer clientId=producer-1]
>> Got error produce response with correlation id 16512436 on topic-partition
>> input-topic-82, retrying (2147483646 attempts left). Error:
>> OUT_OF_ORDER_SEQUENCE_NUMBER
>>
>> Regards,
>> Neeraj
>>
>>
>    
  

Re: Kafka Producer Record Error Rate

2022-04-03 Thread Neeraj Vaidya
 Hi Liam,
Brokers are on Apache Kafka v2.7.0
However, the Producer client is using the v2.6 libraries.

Regards,
Neeraj
 On Monday, 4 April, 2022, 02:17:42 pm GMT+10, Liam Clarke-Hutchinson 
 wrote:  
 
 Hi Neeraj,

Not sure just yet, I'm diving into the code to find out. Oh, what version
Kafka are you running please?

Cheers,

Liam

On Mon, 4 Apr 2022 at 14:50, Neeraj Vaidya
 wrote:

>  Hi Liam,
> Thanks for getting back.
>
> 1) Producer settings ( I am guessing these are the ones you are interested
> in)
> enable.idempotence=true
> max.in.flight.requests.per.connection=5
>
> 2) Sample broker logs corresponding to the timestamp in the application
> logs of the Producer
>
> [2022-04-03 15:56:39,587] ERROR [ReplicaManager broker=5] Error processing
> append operation on partition input-topic-114 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid
> sequence number for new epoch at offset 967756 in partition
> input-topic-114: 158 (request epoch), 3 (seq. number)
>
> Do the producer errors indicate that these messages never made it to the
> Kafka topic at all ?
>
> Regards,
> Neeraj
>      On Monday, 4 April, 2022, 12:23:30 pm GMT+10, Liam Clarke-Hutchinson <
> lclar...@redhat.com> wrote:
>
>  Hi Neeraj,
>
> First off, what are your producer settings?
> Secondly, do you have brokers logs for the leaders of some of your affected
> topics on hand at all?
>
> Cheers,
>
> Liam Clarke-Hutchinson
>
> On Mon, 4 Apr 2022 at 14:04, Neeraj Vaidya
>  wrote:
>
> > Hi All,
> > For one of the Kafka producers that I have, I see that the Producer
> Record
> > Error rate is non-zero i.e. out of the expected 3000 messages per second
> > which I a expect to be producing to the topic, I can see that this metric
> > shows a rate of about 200.
> > Does this indicate that the records failed to be sent to the Kafka topic,
> > or does this metric show up even for each retry in the Producer.Send
> > operation ?
> >
> > Notes :
> > 1) I have distributed  8 brokers equally across 2 sites. Using
> > rack-awareness, I am making Kafka position replicas equally across both
> > sites. My min.isr=2 and replication factor = 4. This makes 2 replicas to
> be
> > located in each site.
> > 2) The scenario I am testing is that of shutting down a set of 4 brokers
> > in one site (out of 8) for an extended period of time and then bringing
> > them back up after say 2 hours. This causes the the follower replicas on
> > those brokers to try and catch-up with the leader replicas on the other
> > brokers. The error rate that I am referring to shows up under this
> scenario
> > of restarting the brokers. It does not show up when I have just the other
> > set of (4) brokers.
> >
> > To be specific, here are the errors that I see in the Kafka producer log
> > file:
> >
> > 2022-04-03 15:56:39.613  WARN --- [-thread | producer-1]
> > o.a.k.c.p.i.Sender                      : [Producer clientId=producer-1]
> > Got error produce response with correlation id 16512434 on
> topic-partition
> > input-topic-114, retrying (2147483646 attempts left). Error:
> > OUT_OF_ORDER_SEQUENCE_NUMBER
> > 2022-04-03 15:56:39.613  WARN --- [-thread | producer-1]
> > o.a.k.c.p.i.Sender                      : [Producer clientId=producer-1]
> > Got error produce response with correlation id 16512434 on
> topic-partition
> > input-topic-58, retrying (2147483646 attempts left). Error:
> > OUT_OF_ORDER_SEQUENCE_NUMBER
> > 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> > o.a.k.c.p.i.TransactionManager          : [Producer clientId=producer-1]
> > ProducerId set to 2040 with epoch 159
> > 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> > o.a.k.c.p.i.ProducerBatch                : Resetting sequence number of
> > batch with current sequence 3 for partition input-topic-114 to 0
> > 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> > o.a.k.c.p.i.ProducerBatch                : Resetting sequence number of
> > batch with current sequence 5 for partition input-topic-114 to 2
> > 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> > o.a.k.c.p.i.ProducerBatch                : Resetting sequence number of
> > batch with current sequence 6 for partition input-topic-114 to 3
> > 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> > o.a.k.c.p.i.ProducerBatch                : Resetting sequence number of
> > batch with current sequence 1 for partition input-topic-58 to 0
> > 2022-04-03 15:56:39.739  WARN --- [-thread | producer-1]
> > o.a.k.c.p.i.Sender                      : [Producer clientId=producer-1]
> > Got error produce response with correlation id 16512436 on
> topic-partition
> > input-topic-82, retrying (2147483646 attempts left). Error:
> > OUT_OF_ORDER_SEQUENCE_NUMBER
> >
> > Regards,
> > Neeraj
> >
> >
>
  

Re: Kafka Producer Record Error Rate

2022-04-03 Thread Neeraj Vaidya
 Hi Liam,
Thanks for getting back.

1) Producer settings ( I am guessing these are the ones you are interested in)
enable.idempotence=true
max.in.flight.requests.per.connection=5

2) Sample broker logs corresponding to the timestamp in the application logs of 
the Producer 

[2022-04-03 15:56:39,587] ERROR [ReplicaManager broker=5] Error processing 
append operation on partition input-topic-114 (kafka.server.ReplicaManager)
org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid sequence 
number for new epoch at offset 967756 in partition input-topic-114: 158 
(request epoch), 3 (seq. number)

Do the producer errors indicate that these messages never made it to the Kafka 
topic at all ?

Regards,
Neeraj
 On Monday, 4 April, 2022, 12:23:30 pm GMT+10, Liam Clarke-Hutchinson 
 wrote:  
 
 Hi Neeraj,

First off, what are your producer settings?
Secondly, do you have brokers logs for the leaders of some of your affected
topics on hand at all?

Cheers,

Liam Clarke-Hutchinson

On Mon, 4 Apr 2022 at 14:04, Neeraj Vaidya
 wrote:

> Hi All,
> For one of the Kafka producers that I have, I see that the Producer Record
> Error rate is non-zero i.e. out of the expected 3000 messages per second
> which I a expect to be producing to the topic, I can see that this metric
> shows a rate of about 200.
> Does this indicate that the records failed to be sent to the Kafka topic,
> or does this metric show up even for each retry in the Producer.Send
> operation ?
>
> Notes :
> 1) I have distributed  8 brokers equally across 2 sites. Using
> rack-awareness, I am making Kafka position replicas equally across both
> sites. My min.isr=2 and replication factor = 4. This makes 2 replicas to be
> located in each site.
> 2) The scenario I am testing is that of shutting down a set of 4 brokers
> in one site (out of 8) for an extended period of time and then bringing
> them back up after say 2 hours. This causes the the follower replicas on
> those brokers to try and catch-up with the leader replicas on the other
> brokers. The error rate that I am referring to shows up under this scenario
> of restarting the brokers. It does not show up when I have just the other
> set of (4) brokers.
>
> To be specific, here are the errors that I see in the Kafka producer log
> file:
>
> 2022-04-03 15:56:39.613  WARN --- [-thread | producer-1]
> o.a.k.c.p.i.Sender                      : [Producer clientId=producer-1]
> Got error produce response with correlation id 16512434 on topic-partition
> input-topic-114, retrying (2147483646 attempts left). Error:
> OUT_OF_ORDER_SEQUENCE_NUMBER
> 2022-04-03 15:56:39.613  WARN --- [-thread | producer-1]
> o.a.k.c.p.i.Sender                      : [Producer clientId=producer-1]
> Got error produce response with correlation id 16512434 on topic-partition
> input-topic-58, retrying (2147483646 attempts left). Error:
> OUT_OF_ORDER_SEQUENCE_NUMBER
> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> o.a.k.c.p.i.TransactionManager          : [Producer clientId=producer-1]
> ProducerId set to 2040 with epoch 159
> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> o.a.k.c.p.i.ProducerBatch                : Resetting sequence number of
> batch with current sequence 3 for partition input-topic-114 to 0
> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> o.a.k.c.p.i.ProducerBatch                : Resetting sequence number of
> batch with current sequence 5 for partition input-topic-114 to 2
> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> o.a.k.c.p.i.ProducerBatch                : Resetting sequence number of
> batch with current sequence 6 for partition input-topic-114 to 3
> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> o.a.k.c.p.i.ProducerBatch                : Resetting sequence number of
> batch with current sequence 1 for partition input-topic-58 to 0
> 2022-04-03 15:56:39.739  WARN --- [-thread | producer-1]
> o.a.k.c.p.i.Sender                      : [Producer clientId=producer-1]
> Got error produce response with correlation id 16512436 on topic-partition
> input-topic-82, retrying (2147483646 attempts left). Error:
> OUT_OF_ORDER_SEQUENCE_NUMBER
>
> Regards,
> Neeraj
>
>
  

Kafka Producer Record Error Rate

2022-04-03 Thread Neeraj Vaidya
Hi All,
For one of the Kafka producers that I have, I see that the Producer Record 
Error rate is non-zero, i.e. out of the roughly 3000 messages per second which 
I expect to be producing to the topic, this metric shows a 
rate of about 200.
Does this indicate that the records failed to be sent to the Kafka topic, or 
does this metric show up even for each retry in the Producer.Send operation ?

Notes : 
1) I have distributed 8 brokers equally across 2 sites. Using rack-awareness, 
I am making Kafka place replicas equally across both sites. My min.isr=2 and 
replication factor = 4. This results in 2 replicas being located in each site.
2) The scenario I am testing is that of shutting down a set of 4 brokers in one 
site (out of 8) for an extended period of time and then bringing them back up 
after say 2 hours. This causes the follower replicas on those brokers to 
try and catch up with the leader replicas on the other brokers. The error rate 
that I am referring to shows up under this scenario of restarting the brokers. 
It does not show up when I have just the other set of (4) brokers running. 

To be specific, here are the errors that I see in the Kafka producer log file:

2022-04-03 15:56:39.613  WARN --- [-thread | producer-1] o.a.k.c.p.i.Sender 
  : [Producer clientId=producer-1] Got error produce response 
with correlation id 16512434 on topic-partition input-topic-114, retrying 
(2147483646 attempts left). Error: OUT_OF_ORDER_SEQUENCE_NUMBER
2022-04-03 15:56:39.613  WARN --- [-thread | producer-1] o.a.k.c.p.i.Sender 
  : [Producer clientId=producer-1] Got error produce response 
with correlation id 16512434 on topic-partition input-topic-58, retrying 
(2147483646 attempts left). Error: OUT_OF_ORDER_SEQUENCE_NUMBER
2022-04-03 15:56:39.613  INFO --- [-thread | producer-1] 
o.a.k.c.p.i.TransactionManager   : [Producer clientId=producer-1] 
ProducerId set to 2040 with epoch 159
2022-04-03 15:56:39.613  INFO --- [-thread | producer-1] 
o.a.k.c.p.i.ProducerBatch: Resetting sequence number of batch 
with current sequence 3 for partition input-topic-114 to 0
2022-04-03 15:56:39.613  INFO --- [-thread | producer-1] 
o.a.k.c.p.i.ProducerBatch: Resetting sequence number of batch 
with current sequence 5 for partition input-topic-114 to 2
2022-04-03 15:56:39.613  INFO --- [-thread | producer-1] 
o.a.k.c.p.i.ProducerBatch: Resetting sequence number of batch 
with current sequence 6 for partition input-topic-114 to 3
2022-04-03 15:56:39.613  INFO --- [-thread | producer-1] 
o.a.k.c.p.i.ProducerBatch: Resetting sequence number of batch 
with current sequence 1 for partition input-topic-58 to 0
2022-04-03 15:56:39.739  WARN --- [-thread | producer-1] o.a.k.c.p.i.Sender 
  : [Producer clientId=producer-1] Got error produce response 
with correlation id 16512436 on topic-partition input-topic-82, retrying 
(2147483646 attempts left). Error: OUT_OF_ORDER_SEQUENCE_NUMBER

Regards,
Neeraj


Re: behavior when PID expires before transactional id expires

2021-11-18 Thread Neeraj Vaidya
Sorry for the typo in your name. It got autocorrected.
Corrected it now as below.

Regards,
Neeraj

> On 19 Nov 2021, at 7:03 am, Neeraj Vaidya  
> wrote:
> 
> Hi Jiahui,
> Have a look at this ticket https://issues.apache.org/jira/browse/KAFKA-13292
> 
> Regards,
> Neeraj
> 
>> On 19 Nov 2021, at 2:41 am, Jiahui Jiang  wrote:
>> 
>> Hello Kafka!
>> 
>> I'm using Flink with Kafka and seeing an edge case that I'm hoping to get 
>> some help finding the source code that would explain it!
>> 
>> In the Flink setup, I have a transactional producer that's regularly 
>> committing transactions with a consistent transactional ID. But all these 
>> transactions have no data records written into them.
>> Based on my understanding of the transactions design doc, after 
>> transactional.id.expiration.ms has passed, because there have been 
>> transactions committed regularly, the transactional id should not expire, 
>> and the transactionalId mapping would contain the PID.
>> But since there is no record written for that producer, it's possible that 
>> the PID is expired.
>> 
>> We are seeing endTxnRequests failing with InvalidMappingException. But 
>> looking at the source code, since the transactional id entry should still 
>> exist, I don't see why this could be the reason that an 
>> InvalidMappingException is thrown.
>> 
>> My questions are:
>> 
>> 1.  Is it correct that if transactions are being regularly committed, 
>> transactional id should not expire even if there is no data written by the 
>> producer?
>> 2.  what failures can be triggered when a transactional id has NOT expired, 
>> but the associated producer id has expired?
>> 3.  Do the expiration of producer id trigger a cleanup task to remove the 
>> transactional id entry from the PID snapshot file?
>> 
>> Thank you!



Re: behavior when PID expires before transactional id expires

2021-11-18 Thread Neeraj Vaidya
Hi Joshua,
Have a look at this ticket https://issues.apache.org/jira/browse/KAFKA-13292

Regards,
Neeraj

> On 19 Nov 2021, at 2:41 am, Jiahui Jiang  wrote:
> 
> Hello Kafka!
> 
> I'm using Flink with Kafka and seeing an edge case that I'm hoping to get 
> some help finding the source code that would explain it!
> 
> In the Flink setup, I have a transactional producer that's regularly 
> committing transactions with a consistent transactional ID. But all these 
> transactions have no data records written into them.
> Based on my understanding of the transactions design doc, after 
> transactional.id.expiration.ms has passed, because there have been 
> transactions committed regularly, the transactional id should not expire, and 
> the transactionalId mapping would contain the PID.
> But since there is no record written for that producer, it's possible that 
> the PID is expired.
> 
> We are seeing endTxnRequests failing with InvalidMappingException. But 
> looking at the source code, since the transactional id entry should still 
> exist, I don't see why this could be the reason that an 
> InvalidMappingException is thrown.
> 
> My questions are:
> 
>  1.  Is it correct that if transactions are being regularly committed, 
> transactional id should not expire even if there is no data written by the 
> producer?
>  2.  what failures can be triggered when a transactional id has NOT expired, 
> but the associated producer id has expired?
>  3.  Do the expiration of producer id trigger a cleanup task to remove the 
> transactional id entry from the PID snapshot file?
> 
> Thank you!


Re: MirorMaker 2 - Accessing State Store ChangeLogs

2021-10-28 Thread Neeraj Vaidya
Hi All
Anybody has inputs on my question below ?

Regards,
Neeraj


> On 26 Oct 2021, at 7:12 am, Neeraj Vaidya  
> wrote:
> 
> Hello Experts
> Any  comments on this ?
> 
> Regards,
> Neeraj
> 
> 
>> On 24 Oct 2021, at 11:55 am, Neeraj Vaidya  wrote:
>> 
>> Hi All,
>> In an Active-Active MM2 (MirrorMaker 2) setup, I have the following Data 
>> Centres (DC1,DC2) :
>> 
>> DC1:
>> Topic T1
>> Kafka Streams which consumes from T1 and updates a local state store 
>> (example : "MyStateStore"). the applicationId is "myapp".
>> 
>> DC2:
>> Topic T1
>> The MM2 process replicates the records from T1 to DC1.T1 on this data centre.
>> MM2 also replicates myapp-MyStateStore-changelog to 
>> DC1.myapp-MyStateStore-changelog.
>> 
>> MM2 configuration:
>> "sync.group.offsets.enabled = true".
>> 
>> Questions :
>> When I failover "myapp" to the DC2, how will the application get access to 
>> the State Store ? In my code I can only specify the name of the statestore 
>> from which to consume.
>> Is there a way I can get access to the State Store in this site as well ?
>> My theory is that DC2 will only have records in the change log topic 
>> called DC1.myapp-MyStateStore-changelog, so all state is essentially lost 
>> when I fail over to DC2.
>> 
>> Regards.
>> Neeraj
> 



Re: Apache Kafka mirrormaker2 : sync.group.offsets.enabled and sync.group.offsets.interval.seconds

2021-10-26 Thread Neeraj Vaidya
Hello All,
Any inputs on my question below ?

Regards,
Neeraj

Sent from my iPhone

> On 26 Oct 2021, at 10:29 am, Neeraj Vaidya 
>  wrote:
> 
> Anyone has inputs on my question below ?
> 
> Regards,
> Neeraj
> 
> Sent from my iPhone
> 
>> On 24 Oct 2021, at 10:23 pm, Neeraj Vaidya 
>>  wrote:
>> 
>> When replicating data from one datacentre (DC1) to another (DC2) using MM2, 
>> I can see that the consumer offsets are actually getting synchronized from 
>> source to target cluster in realtime. And not really at the default sync 
>> frequency time of 60 seconds.
>> 
>> I have enabled both the above config options (in the subject of this post) 
>> in MM2 config. 
>> 
>> The way I tested this is by making consumers listen to the topic , say T1 in 
>> source cluster as well as DC2.T1 in target cluster.
>> 
>> If I produce a message to T1 in DC1, then the consumer in DC1 consumes the 
>> message, but the consumer in DC2 which is listening on DC2.T1 does not 
>> process the message. I am guessing because the message was processed by 
>> consumer in DC1 so it does not need to be processed by the consumer in DC2 
>> as the offsets are being synchronized.
>> 
>> This is actually good for me because I can then run consumers in both DC's 
>> in parallel and make use of parallelism as well as geo-redundancy.
>> 
>> But I would like to understand if this is expected behaviour.
> 



Re: Apache Kafka mirrormaker2 : sync.group.offsets.enabled and sync.group.offsets.interval.seconds

2021-10-25 Thread Neeraj Vaidya
Anyone has inputs on my question below ?

Regards,
Neeraj

Sent from my iPhone

> On 24 Oct 2021, at 10:23 pm, Neeraj Vaidya 
>  wrote:
> 
> When replicating data from one datacentre (DC1) to another (DC2) using MM2, 
> I can see that the consumer offsets are actually getting synchronized from 
> source to target cluster in realtime. And not really at the default sync 
> frequency time of 60 seconds.
> 
> I have enabled both the above config options (in the subject of this post) in 
> MM2 config. 
> 
> The way I tested this is by making consumers listen to the topic , say T1 in 
> source cluster as well as DC2.T1 in target cluster.
> 
> If I produce a message to T1 in DC1, then the consumer in DC1 consumes the 
> message, but the consumer in DC2 which is listening on DC2.T1 does not 
> process the message. I am guessing this is because the message was already 
> processed by the consumer in DC1, so it does not need to be processed by the 
> consumer in DC2 as the offsets are being synchronized.
> 
> This is actually good for me because I can then run consumers in both DCs in 
> parallel and make use of parallelism as well as geo-redundancy.
> 
> But I would like to understand if this is expected behaviour.



Re: MirrorMaker 2 - Accessing State Store ChangeLogs

2021-10-25 Thread Neeraj Vaidya
Hello Experts
Any  comments on this ?

Regards,
Neeraj


> On 24 Oct 2021, at 11:55 am, Neeraj Vaidya  wrote:
> 
> Hi All,
> In an Active-Active MM2 (MirrorMaker 2) setup, I have the following Data 
> Centres (DC1,DC2) :
> 
> DC1:
> Topic T1
> Kafka Streams which consumes from T1 and updates a local state store (example 
> : "MyStateStore"). the applicationId is "myapp".
> 
> DC2:
> Topic T1
> The MM2 process replicates the records from T1 to DC1.T1 on this data centre.
> MM2 also replicates myapp-MyStateStore-changelog to 
> DC1.myapp-MyStateStore-changelog.
> 
> MM2 configuration:
> "sync.group.offsets.enabled = true".
> 
> Questions :
> When I failover "myapp" to DC2, how will the application get access to 
> the State Store ? In my code I can only specify the name of the statestore 
> from which to consume.
> Is there a way I can get access to the State Store at this site as well ?
> My theory is that DC2 will only have records in the changelog topic 
> called DC1.myapp-MyStateStore-changelog, so all state is essentially lost when 
> I failover to DC2.
> 
> Regards.
> Neeraj



Apache Kafka mirrormaker2 : sync.group.offsets.enabled and sync.group.offsets.interval.seconds

2021-10-24 Thread Neeraj Vaidya
When replicating data from one datacentre (DC1) to another (DC2) using MM2, I 
can see that the consumer offsets are actually getting synchronized from the 
source to the target cluster in realtime, and not at the default sync interval 
of 60 seconds.

I have enabled both the above config options (in the subject of this post) in 
MM2 config. 
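For reference, a minimal mm2.properties sketch with these two options set in both 
directions (the cluster aliases and bootstrap addresses below are placeholders, not 
the actual values used here):

# mm2.properties (sketch)
clusters = DC1, DC2
DC1.bootstrap.servers = dc1-kafka:9092
DC2.bootstrap.servers = dc2-kafka:9092

# active-active replication
DC1->DC2.enabled = true
DC2->DC1.enabled = true

# translate and sync consumer group offsets to the target cluster
DC1->DC2.sync.group.offsets.enabled = true
DC1->DC2.sync.group.offsets.interval.seconds = 60
DC2->DC1.sync.group.offsets.enabled = true
DC2->DC1.sync.group.offsets.interval.seconds = 60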

The way I tested this is by making consumers listen to the topic, say T1, in 
source cluster as well as DC2.T1 in target cluster.

If I produce a message to T1 in DC1, then the consumer in DC1 consumes the 
message, but the consumer in DC2 which is listening on DC2.T1 does not process 
the message. I am guessing this is because the message was already processed by 
the consumer in DC1, so it does not need to be processed by the consumer in DC2 
as the offsets are being synchronized.

This is actually good for me because I can then run consumers in both DCs in 
parallel and make use of parallelism as well as geo-redundancy.

But I would like to understand if this is expected behaviour.


MirrorMaker 2 - Accessing State Store ChangeLogs

2021-10-23 Thread Neeraj Vaidya
Hi All,
In an Active-Active MM2 (MirrorMaker 2) setup, I have the following Data 
Centres (DC1,DC2) :

DC1:
Topic T1
Kafka Streams which consumes from T1 and updates a local state store (example : 
"MyStateStore"). the applicationId is "myapp".

DC2:
Topic T1
The MM2 process replicates the records from T1 to DC1.T1 on this data centre.
MM2 also replicates myapp-MyStateStore-changelog to 
DC1.myapp-MyStateStore-changelog.

MM2 configuration:
"sync.group.offsets.enabled = true".

Questions :
When I failover "myapp" to DC2, how will the application get access to the 
State Store ? In my code I can only specify the name of the statestore from 
which to consume.
Is there a way I can get access to the State Store at this site as well ?
My theory is that DC2 will only have records in the changelog topic called 
DC1.myapp-MyStateStore-changelog, so all state is essentially lost when I 
failover to DC2.
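For reference, a minimal Streams sketch of what "only specify the name of the 
statestore" means in code; the topic name, serdes and the local query at the end 
are illustrative placeholders, not the actual application:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class MyAppSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myapp");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsBuilder builder = new StreamsBuilder();
        // The store is referenced by name only; its backing changelog topic
        // (myapp-MyStateStore-changelog) is derived from applicationId + store name.
        builder.table("T1",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("MyStateStore"));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Local interactive query against the named store (in practice, only once
        // the instance has reached the RUNNING state).
        ReadOnlyKeyValueStore<String, String> store = streams.store(
                StoreQueryParameters.fromNameAndType("MyStateStore",
                        QueryableStoreTypes.keyValueStore()));
        System.out.println(store.get("some-key"));
    }
}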

Regards.
Neeraj


Re: Question about controller behavior when fenced

2021-09-29 Thread Neeraj Vaidya
 Hi Andrew,
Every broker receives a notification about the /controller znode being created 
and will also receive the latest epoch.
The old controller will get some partitions assigned to it by the new 
controller, as a result of it still being considered part of the cluster. This 
is part of the normal partition allocation duties of a controller.
The old controller will not shut down. It will become one of the non-controller 
brokers, if I may call it that.
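For illustration only (this is not Kafka's internal code), the notification boils 
down to a ZooKeeper watch on the /controller znode plus the epoch kept under 
/controller_epoch; a minimal sketch with the plain ZooKeeper client, assuming a 
ZooKeeper-based cluster reachable at localhost:2181:

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class ControllerWatchSketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
        Watcher controllerWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // Fired when /controller is deleted or re-created; every broker sees
                // this and learns who the new controller is. A broker that used to be
                // the controller simply resigns that role and keeps serving as a
                // normal broker.
                System.out.println("Controller change event: " + event.getType());
            }
        };
        zk.exists("/controller", controllerWatcher);          // register the watch
        byte[] epoch = zk.getData("/controller_epoch", false, null);
        System.out.println("Current controller epoch: " + new String(epoch));
    }
}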

Regards,
Neeraj
 On Thursday, 30 September, 2021, 12:12:08 am GMT+10, Andrew Grant 
 wrote:  
 
 Hi all,

I had a question about controller behavior when fenced. From my
understanding epoch numbers are used to fence brokers who might think
they're still the controller but really in the meantime a new broker has
been elected as the new controller. My question is, how does a broker
realize it's no longer the controller? And when it does realize this, does
it shutdown or does it maybe log something and continue on with its other
duties? I suspect the latter but wanted to check.

Links to any resources that might answer the question would also be
helpful!!

Thanks,
Andrew

-- 
Andrew Grant
8054482621
  

Re: InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2021-09-19 Thread Neeraj Vaidya
Following on from this email, I raised this as a defect on the Apache Kafka JIRA 
(KAFKA-13292).

Based on a suggestion from Matthias J. Sax I upgraded my client libraries to 
2.8.0 and also wrote a custom StreamsUncaughtExceptionHandler to respond with an 
action of REPLACE_THREAD when such an InvalidPidMappingException is encountered. 
This is all good: the event/message currently being processed is first aborted, 
as EOS is enabled in my Streams application.
Also, a new thread is started and the message is then successfully processed by 
that new thread.
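A minimal sketch of that handler wiring (topology and props stand for whatever the 
existing application already builds; the exception usually arrives wrapped, hence 
the cause-chain check):

import java.util.Properties;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ReplaceThreadOnInvalidPid {
    public static KafkaStreams start(Topology topology, Properties props) {
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.setUncaughtExceptionHandler(exception -> {
            // The InvalidPidMappingException is typically wrapped in a StreamsException,
            // so walk the cause chain before deciding how to react.
            for (Throwable t = exception; t != null; t = t.getCause()) {
                if (t instanceof InvalidPidMappingException) {
                    // abort the in-flight transaction and spawn a replacement thread
                    return StreamThreadExceptionResponse.REPLACE_THREAD;
                }
            }
            return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
        streams.start();
        return streams;
    }
}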

My only outstanding question is : Does this exception only apply to Streams 
applications which have state stores ? 
I have a Producer application which produces messages to the same topic, but 
does not suffer from this InvalidPidMappingException.

Regards,
Neeraj

 On Monday, 13 September, 2021, 04:40:50 pm GMT+10, Neeraj Vaidya 
 wrote:  
 
 Hi All,

My software versions :
Apache Kafka 2.7.0
Kafka Streams 2.6.0

I have a KafkaStreams application which consumes from a topic which has 12 
partitions. The incoming message rate into this topic is very low, perhaps 3-4 
per minute. Also, some partitions will not receive messages for more than 7 
days.

Exactly after 7 days of starting this application, I seem to be getting the 
following exception and the application shuts down, without processing any more 
messages :

2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] INFO  
o.a.k.c.p.i.TransactionManager - MSG=[Producer 
clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
 transactionalId=mtx-caf-0_2] Transiting to abortable error state due to 
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] ERROR 
o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] Error 
encountered sending record to topic mtx-caf-DuplicateCheckStore-changelog for 
task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
2021-09-10T12:21:59.740 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
following exception during processing and the thread is going to shut down: 
org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
        at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
        at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
        at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
        at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
producer attempted to use a producer id which is not currently assigned to its 
transactional id.
2021-09-10T12:21:59.740 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State transition 
from RUNNING to PENDING_SHUTDOWN

After this, I can see that all 12 tasks (because there are 12 partitions for 
all topics) get shut down and this brings down the whole application.

I understand that the transactional.id.expiration.ms = 7 days (default) will 
likely cause the idle task's transactional id to expire, but why does this 
specific thread/task not get fenced or respawned?

InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2021-09-13 Thread Neeraj Vaidya
Hi All,

My software versions :
Apache Kafka 2.7.0
Kafka Streams 2.6.0

I have a KafkaStreams application which consumes from a topic which has 12 
partitions. The incoming message rate into this topic is very low, perhaps 3-4 
per minute. Also, some partitions will not receive messages for more than 7 
days.

Exactly after 7 days of starting this application, I seem to be getting the 
following exception and the application shuts down, without processing any more 
messages :

2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] INFO  
o.a.k.c.p.i.TransactionManager - MSG=[Producer 
clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
 transactionalId=mtx-caf-0_2] Transiting to abortable error state due to 
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] ERROR 
o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] Error 
encountered sending record to topic mtx-caf-DuplicateCheckStore-changelog for 
task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
2021-09-10T12:21:59.740 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
following exception during processing and the thread is going to shut down: 
org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
producer attempted to use a producer id which is not currently assigned to its 
transactional id.
2021-09-10T12:21:59.740 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State transition 
from RUNNING to PENDING_SHUTDOWN

After this, I can see that all 12 tasks (because there are 12 partitions for 
all topics) get shut down and this brings down the whole application.

I understand that the transactional.id.expiration.ms = 7 days (default) will 
likely cause the idle task's transactional id to expire, but why does this 
specific thread/task not get fenced or respawned?
Why shut down the entire Streams processing application just because one task 
has been idle ??

Is there a way to keep my application up and running without causing it to 
shutdown ?


Regards,
Neeraj


Zookeeper failure handling in Kafka

2021-05-31 Thread Neeraj Vaidya
Hi All,
If I have a 5 node zookeeper ensemble backing my 5 node Kafka cluster and if 
more than 2 of the zookeeper nodes shut down, Kafka producer clients are still 
able to write to Kafka without any issues.

Is this normal behaviour ?

I always thought that the zookeeper quorum should be available for the Kafka 
cluster to function. (I also believed that the zookeeper service will be 
unavailable if a majority of nodes go down, but maybe I will ask that in a 
zookeeper forum).

When will Kafka producer clients start experiencing issues when producing 
messages to Kafka, in case of an entire Zookeeper cluster not being available ? 
(Assume that the zookeeper cluster was up and running when the kafka cluster 
and the kafka clients started, but went down sometime later).

Regards,
Neeraj





Re: Metadata max idle vs age ms

2021-05-26 Thread Neeraj Vaidya
Anybody has any inputs on this one?

Sent from my iPhone

> On 25 May 2021, at 9:49 am, Neeraj Vaidya  wrote:
> 
> Hi All, (I have asked this on SO as well, but happy to paste the response 
> there if I get an answer here or vice-versa).
> 
> I would like to know the impact of setting both of these options on the 
> Producer API.
> 
> Can someone please let me know why metadata has not expired in the Scenario-1 
> even when metadata.max.age.ms has elapsed.
> 
> Scenario-1
> 
> set metadata.max.age.ms to 10 seconds.
> Leave metadata.max.idle.ms as default of 5 minutes.
> start Kafka producer.
> publish some messages to a topic using this producer.
> shut down the Kafka cluster.
> allow more than 10 seconds to elapse. I am hoping that this would cause 
> the producer to automatically trigger a request to update Metadata.
> Now, try to produce a message to the topic using KafkaProducer#send from the 
> producer.
> This operation returns immediately, without blocking.
> I was expecting this to block as more than 10 seconds have elapsed and was 
> expecting metadata to expire based on the metadata.max.age.ms being set to 10 
> seconds.
> 
> Scenario-2
> 
> However, if I do the following, then the KafkaProducer#send operation blocks :
> set metadata.max.age.ms and metadata.max.idle.ms both to 10 seconds.
> start Kafka producer
> publish some messages to a topic using this producer.
> shut down the Kafka cluster.
> allow more than 10 seconds to elapse.
> try to produce a message to the topic using KafkaProducer#send from the 
> producer.
> This operation blocks trying to fetch metadata.
> I am not sure why it blocks now, but not in the first scenario.



Metadata max idle vs age ms

2021-05-24 Thread Neeraj Vaidya
Hi All, (I have asked this on SO as well, but happy to paste the response there 
if I get an answer here or vice-versa).

I would like to know the impact of setting both of these options on the 
Producer API.

Can someone please let me know why metadata has not expired in the Scenario-1 
even when metadata.max.age.ms has elapsed.

Scenario-1

set metadata.max.age.ms to 10 seconds.
Leave metadata.max.idle.ms as default of 5 minutes.
start Kafka producer.
publish some messages to a topic using this producer.
shut down the Kafka cluster.
allow more than 10 seconds to elapse. I am hoping that this would cause 
the producer to automatically trigger a request to update Metadata.
Now, try to produce a message to the topic using KafkaProducer#send from the 
producer.
This operation returns immediately, without blocking.
I was expecting this to block as more than 10 seconds have elapsed and was 
expecting metadata to expire based on the metadata.max.age.ms being set to 10 
seconds.

Scenario-2

However, if I do the following, then the KafkaProducer#send operation blocks :
set metadata.max.age.ms and metadata.max.idle.ms both to 10 seconds.
start Kafka producer
publish some messages to a topic using this producer.
shut down the Kafka cluster.
allow more than 10 seconds to elapse.
try to produce a message to the topic using KafkaProducer#send from the 
producer.
This operation blocks trying to fetch metadata.
I am not sure why it blocks now, but not in the first scenario.
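For reference, a minimal producer setup sketch with the configs from the two 
scenarios (broker address, topic and serializers are placeholders); max.block.ms is 
the setting that ultimately bounds how long send() may block while fetching 
metadata:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MetadataConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "10000");   // metadata.max.age.ms = 10s
        props.put(ProducerConfig.METADATA_MAX_IDLE_CONFIG, "10000");  // metadata.max.idle.ms = 10s (Scenario-2)
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");       // upper bound on how long send() can block

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}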


Multiple producers using same message key

2021-05-07 Thread Neeraj Vaidya
Hi all,
I think I kind of know the answer but wanted to confirm.
If I have multiple producers sending messages with the same key, will they end 
up in the same partition (assuming I am using the default partitioner) ?
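(For reference: with the default partitioner, a non-null key is mapped to a 
partition via a murmur2 hash of the serialized key modulo the partition count, so 
producers sending the same key to the same topic land on the same partition as long 
as the partition count does not change. A minimal sketch of that mapping, reusing 
the public hashing helpers shipped in kafka-clients; this is an illustration, not 
the actual partitioner source:)

import org.apache.kafka.common.utils.Utils;

public class KeyPartitionSketch {
    static int partitionFor(byte[] serializedKey, int numPartitions) {
        // same key bytes + same partition count => same partition,
        // regardless of which producer instance sends the record
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes();
        System.out.println(partitionFor(key, 12)); // identical on every producer
    }
}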

Regards,
Neeraj

Sent from my iPhone


Re: KafkaStreams aggregation with multiple instance

2021-05-06 Thread Neeraj Vaidya
 Hi Pietro,
1) What do you mean by problems in counts due to multiple instances ? Also, do 
you use keys in your messages ?
2) If you want to maintain state and refer to that state when processing each 
message, then yes, you will need a state store. A state store will also be 
needed if you want to query that state externally.
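For reference, a minimal sketch of such a count (topic names taken from the 
question; serdes are placeholder assumptions):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class OrderCountSketch {
    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("ordersTopic", Consumed.with(Serdes.String(), Serdes.String()))
               // records with the same key are counted together; each instance of the
               // application only handles the partitions assigned to it
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               // the running count lives in a local (RocksDB-backed) state store,
               // backed by a changelog topic for fault tolerance
               .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("order-counts"))
               .toStream()
               .to("countTopic", Produced.with(Serdes.String(), Serdes.Long()));
        return builder;
    }
}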

Regards,
Neeraj


 On Friday, 7 May, 2021, 01:47:59 am GMT+10, Pietro Galassi 
 wrote:  
 
 Hi all,
I hope you can help me figure out this scenario.

I have a multi-instance microservice that consumes from a topic
(ordersTopic); all instances use the same consumer_group.

This microservice uses a KStream to aggregate (sum) topic events and
produces results on another topic (countTopic).

I have two questions:

1) Can I have problems with counts due to multiple instances of the same
microservice ?
2) Do I need RocksDB and a materialized view in order to store the data ?

Thanks a lot.
Regards,
Pietro Galassi
  

Apache Kafka Streams : Out-of-Order messages & uses of TimeStamp extractor

2021-04-20 Thread Neeraj Vaidya
Hi,
I have asked this on StackOverflow, but will ask it here as well.

I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). I also have a 
Kafka streams application which consumes from TA and writes to topic-B (TB). In 
the streams application, I have a custom timestamp extractor which extracts the 
timestamp from the message payload.
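For reference, a payload-based extractor generally looks like the sketch below 
(MyEvent and getEventTimeMs are placeholder names, not the actual classes used 
here):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// placeholder payload type carrying an event-time field
class MyEvent {
    private final long eventTimeMs;
    MyEvent(long eventTimeMs) { this.eventTimeMs = eventTimeMs; }
    long getEventTimeMs() { return eventTimeMs; }
}

public class PayloadTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        Object value = record.value();
        if (value instanceof MyEvent) {
            // use the event time carried in the payload instead of the record timestamp
            return ((MyEvent) value).getEventTimeMs();
        }
        // fall back to the highest timestamp seen so far on this partition
        return partitionTime;
    }
}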

For one of my failure handling test cases, I shutdown the Kafka cluster while 
my applications are running.

When the producer application tries to write messages to TA, it cannot because 
the cluster is down and hence (I assume) buffers the messages. Let's say it 
receives 4 messages m1,m2,m3,m4 in increasing time order. (i.e. m1 is first and 
m4 is last).

When I bring the Kafka cluster back online, the producer sends the buffered 
messages to the topic, but they are not in order. I receive, for example, m2 
then m3 then m1 and then m4.

Why is that ? Is it because the buffering in the producer is multi-threaded 
with each producing to the topic at the same time ?
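(For reference, and not necessarily the cause here: batches that are retried can 
also be delivered out of order unless in-flight requests are constrained or 
idempotence is enabled; a minimal config sketch that preserves per-partition 
ordering under retries:)

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class OrderingConfigSketch {
    static Properties orderedProducerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // idempotence keeps per-partition ordering even with retries
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        return props;
    }
}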

I assumed that the custom timestamp extractor would help in ordering messages 
when consuming them. But it does not. Or maybe my understanding of the 
timestamp extractor is wrong.

If not, then what are the specific uses of the timestamp extractor ? Just to 
associate a timestamp with an event ?

I got one solution from SO, which is to just stream all events from TA to 
another intermediate topic (say TA') using the timestamp extractor. But I am 
not sure if this will cause the events to get reordered based on the extracted 
timestamp.

Regards,
Neeraj