[jira] [Created] (KAFKA-13383) Consumer Group partition management across clusters (Active-Active)

2021-10-18 Thread NEERAJ VAIDYA (Jira)
NEERAJ VAIDYA created KAFKA-13383:
-

 Summary: Consumer Group partition management across clusters 
(Active-Active)
 Key: KAFKA-13383
 URL: https://issues.apache.org/jira/browse/KAFKA-13383
 Project: Kafka
  Issue Type: New Feature
  Components: consumer, core, mirrormaker
Affects Versions: 3.0.0, 2.8.1
Reporter: NEERAJ VAIDYA


When using MM2 (MirrorMaker 2) in an Active-Active setup, we would like 
consumers to consume from the same topic name in both datacentres, rather than 
having to consume from both 1) the topic without the site prefix and 2) the 
topic with the site prefix.

MM2 replicates a topic to a remote site by prefixing the site name.

Confluent replicator prevents this topic name prefixing by using Provenance 
headers in the message.

However, if we start 2 consumers (1 on each site) and both of them consume from 
the topic with the same name, the partitions are not balanced across sites.

So, the consumers on both sites process all messages, and hence duplicate 
processing occurs. This might not be a big deal for consumers in some business 
domains, but in the Telecom domain, where duplicates are not tolerated, it 
becomes an issue.

It would be great if partition allocation could be done across clusters, so 
that when a consumer is started on one site, it also takes into consideration 
whether there are any other consumers for the same topic on the other site.
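As a rough illustration of the status quo this request wants to avoid: with 
MM2's DefaultReplicationPolicy, the remote copy of a topic is prefixed with the 
source cluster alias, so a consumer that wants both copies today has to 
subscribe by pattern. A minimal sketch (the topic name "orders" and cluster 
alias "dc-east" are hypothetical; KafkaConsumer#subscribe(Pattern) accepts such 
a pattern):

```java
import java.util.regex.Pattern;

public class Mm2TopicPattern {
    public static void main(String[] args) {
        // Matches the local topic plus any MM2-prefixed remote copy, e.g.
        // "orders" and "dc-east.orders". Topic name and cluster alias are
        // hypothetical placeholders.
        Pattern bothCopies = Pattern.compile("^([^.]+\\.)?orders$");
        System.out.println(bothCopies.matcher("orders").matches());          // prints true
        System.out.println(bothCopies.matcher("dc-east.orders").matches());  // prints true
        System.out.println(bothCopies.matcher("unrelated").matches());       // prints false
    }
}
```

Both subscriptions land in one consumer group, but as the report notes, groups 
on the two clusters are still balanced independently of each other.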



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2021-09-16 Thread NEERAJ VAIDYA (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NEERAJ VAIDYA resolved KAFKA-13292.
---
Resolution: Fixed

Upgraded the client libraries to 2.8.0 while still using the 2.7.0 broker.

Using the new StreamsUncaughtExceptionHandler, added code to return 
REPLACE_THREAD in case of InvalidPidMappingException.

This causes the application to create a new thread and continue processing 
events.
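A minimal sketch of that wiring, assuming kafka-streams 2.8+ (the real types 
are StreamsUncaughtExceptionHandler and its nested StreamThreadExceptionResponse 
enum in org.apache.kafka.streams.errors; tiny stand-ins are defined here so the 
decision logic runs standalone):

```java
public class ReplaceThreadSketch {
    // Stand-ins for the org.apache.kafka.streams.errors types, so this
    // decision logic runs without the kafka-streams jar on the classpath.
    enum StreamThreadExceptionResponse { REPLACE_THREAD, SHUTDOWN_CLIENT }
    static class InvalidPidMappingException extends RuntimeException {}

    // Mirrors the handler that would be registered via
    // KafkaStreams#setUncaughtExceptionHandler(ReplaceThreadSketch::handle)
    // before calling start().
    static StreamThreadExceptionResponse handle(Throwable ex) {
        // Walk the cause chain: the InvalidPidMappingException arrives
        // wrapped in a StreamsException.
        for (Throwable t = ex; t != null; t = t.getCause()) {
            if (t instanceof InvalidPidMappingException) {
                // Respawn the stream thread instead of shutting the client down.
                return StreamThreadExceptionResponse.REPLACE_THREAD;
            }
        }
        return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("send failed",
                new InvalidPidMappingException());
        System.out.println(handle(wrapped)); // prints REPLACE_THREAD
    }
}
```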

> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> ---
>
> Key: KAFKA-13292
> URL: https://issues.apache.org/jira/browse/KAFKA-13292
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: NEERAJ VAIDYA
>Priority: Major
>
> I have a KafkaStreams application which consumes from a topic which has 12 
> partitions. The incoming message rate into this topic is very low, perhaps 
> 3-4 per minute. Also, some partitions will not receive messages for more than 
> 7 days.
>  
> Exactly 7 days after starting this application, I get the following 
> exception and the application shuts down without processing any more 
> messages:
>  
> {code:java}
> 2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> INFO  o.a.k.c.p.i.TransactionManager - MSG=[Producer 
> clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
>  transactionalId=mtx-caf-0_2] Transiting to abortable error state due to 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> 2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] 
> Error encountered sending record to topic 
> mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
> following exception during processing and the thread is going to shut down:
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
> producer attempted to use a producer id which is not currently assigned to 
> its transactional id.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State 
> transition from RUNNING to PENDING_SHUTDOWN
> {code}
>  
> After this, I can see that all 12 tasks (because there are 12 partitions for 
> all topics) get shut down, and this brings down the whole application.

[jira] [Reopened] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2021-09-13 Thread NEERAJ VAIDYA (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NEERAJ VAIDYA reopened KAFKA-13292:
---

As indicated in my previous comments.

> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> ---
>
> Key: KAFKA-13292
> URL: https://issues.apache.org/jira/browse/KAFKA-13292
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: NEERAJ VAIDYA
>Priority: Major
>
> I have a KafkaStreams application which consumes from a topic which has 12 
> partitions. The incoming message rate into this topic is very low, perhaps 
> 3-4 per minute. Also, some partitions will not receive messages for more than 
> 7 days.
>  
> Exactly 7 days after starting this application, I get the following 
> exception and the application shuts down without processing any more 
> messages:
>  
> {code:java}
> 2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> INFO  o.a.k.c.p.i.TransactionManager - MSG=[Producer 
> clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
>  transactionalId=mtx-caf-0_2] Transiting to abortable error state due to 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> 2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
> mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] 
> ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] 
> Error encountered sending record to topic 
> mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
> following exception during processing and the thread is going to shut down:
> org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
> record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> Exception handler choose to FAIL the processing, no more records would be 
> sent.
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
>         at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
>         at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
>         at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
>         at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>         at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
> producer attempted to use a producer id which is not currently assigned to 
> its transactional id.
> 2021-09-10T12:21:59.740 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
> o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
> [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State 
> transition from RUNNING to PENDING_SHUTDOWN
> {code}
>  
> After this, I can see that all 12 tasks (because there are 12 partitions for 
> all topics) get shutdown and this brings down the whole application.
>  
> I understand that transactional.id.expiration.ms = 7 days (default) will 
> likely cause the idle task's transactional id to expire, but why does this 
> specific thread/task not get fenced or respawned?

[jira] [Created] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2021-09-13 Thread NEERAJ VAIDYA (Jira)
NEERAJ VAIDYA created KAFKA-13292:
-

 Summary: InvalidPidMappingException: The producer attempted to use 
a producer id which is not currently assigned to its transactional id
 Key: KAFKA-13292
 URL: https://issues.apache.org/jira/browse/KAFKA-13292
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.7.0
Reporter: NEERAJ VAIDYA


I have a Kafka Streams application which consumes from a topic that has 12 
partitions. The incoming message rate into this topic is very low, perhaps 3-4 
messages per minute. Also, some partitions will not receive messages for more 
than 7 days.
 
Exactly 7 days after starting this application, I get the following exception 
and the application shuts down without processing any more messages:
 
{code:java}
2021-09-10T12:21:59.636 [kafka-producer-network-thread | 
mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] INFO  
o.a.k.c.p.i.TransactionManager - MSG=[Producer 
clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer,
 transactionalId=mtx-caf-0_2] Transiting to abortable error state due to 
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
2021-09-10T12:21:59.642 [kafka-producer-network-thread | 
mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] ERROR 
o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] Error 
encountered sending record to topic mtx-caf-DuplicateCheckStore-changelog for 
task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
2021-09-10T12:21:59.740 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR 
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the 
following exception during processing and the thread is going to shut down:
org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
attempted to use a producer id which is not currently assigned to its 
transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
        at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
        at 
org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
        at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
        at 
org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
        at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
        at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The 
producer attempted to use a producer id which is not currently assigned to its 
transactional id.
2021-09-10T12:21:59.740 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO  
o.a.k.s.p.internals.StreamThread - MSG=stream-thread 
[mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State transition 
from RUNNING to PENDING_SHUTDOWN
{code}
 
After this, I can see that all 12 tasks (because there are 12 partitions for 
all topics) get shut down, and this brings down the whole application.
 
I understand that transactional.id.expiration.ms = 7 days (default) will 
likely cause the idle task's transactional id to expire, but why does this 
specific thread/task not get fenced or respawned?
Why shut down the entire Streams processing application just because one task 
has been idle?
 
Is there a way to keep my application up and running without it shutting down?
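One possible mitigation, assuming control over the brokers (a sketch, not a 
recommendation): raise the broker-side expiry so that idle transactional ids 
outlive the longest expected idle period. The property name and its 7-day 
default are real; the value below is purely illustrative:

```properties
# server.properties (broker side)
# Default is 604800000 ms (7 days). Raising it delays expiry of idle
# transactional ids, at the cost of the brokers holding producer-id
# state for longer. 1209600000 ms = 14 days.
transactional.id.expiration.ms=1209600000
```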





[jira] [Created] (KAFKA-12972) KStreams - Rebalance not happening

2021-06-20 Thread NEERAJ VAIDYA (Jira)
NEERAJ VAIDYA created KAFKA-12972:
-

 Summary: KStreams - Rebalance not happening
 Key: KAFKA-12972
 URL: https://issues.apache.org/jira/browse/KAFKA-12972
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.7.0
Reporter: NEERAJ VAIDYA


I have a Kafka Streams application which consumes from a topic with 20 
partitions.

I run the application with
{code:java}
num.stream.threads = 10
{code}
When I check the JMX MBean _kafka_consumer_coordinator_assigned_partitions_, I 
can see that each thread has been assigned 2 partitions. That seems like 
reasonable balancing of partitions across the threads.

Now, I add another instance of this application on a different machine/VM, but 
I run it with 4 stream threads.

I was expecting these 4 threads to be assigned (via rebalancing) at least 1 
partition each, but after multiple rebalance cycles (every 10 minutes), I do 
not see even 1 partition being assigned to any of these 4 threads in the second 
instance of my application.

I would have expected some rebalancing to happen across the 14 (10 + 4) stream 
threads of my application instances.
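For context, a sketch of how the second instance above might be configured. The 
keys are real Kafka Streams configs; the application id and broker address are 
hypothetical placeholders. Since 2.6, assignments converge gradually through 
probing rebalances, which is likely what the 10-minute cycles mentioned above 
are:

```java
import java.util.Properties;

public class StreamsThreadConfig {
    // Configuration for the second application instance described above.
    static Properties secondInstanceConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "my-streams-app");   // hypothetical
        props.setProperty("bootstrap.servers", "broker-1:9092"); // hypothetical
        props.setProperty("num.stream.threads", "4");
        // Probing rebalances drive the roughly-10-minute cycles mentioned
        // above; 600000 ms (10 minutes) is the default for this interval.
        props.setProperty("probing.rebalance.interval.ms", "600000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(secondInstanceConfig().getProperty("num.stream.threads")); // prints 4
    }
}
```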

 





[jira] [Resolved] (KAFKA-12776) Producer sends messages out-of-order in spite of enabling idempotence

2021-05-20 Thread NEERAJ VAIDYA (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NEERAJ VAIDYA resolved KAFKA-12776.
---
Resolution: Information Provided

KIP-739 has considerable overlap with this issue.

> Producer sends messages out-of-order in spite of enabling idempotence
> 
>
> Key: KAFKA-12776
> URL: https://issues.apache.org/jira/browse/KAFKA-12776
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.6.0, 2.7.0
> Environment: Linux RHEL 7.9 and Ubuntu 20.04
>Reporter: NEERAJ VAIDYA
>Priority: Major
> Attachments: mocker.zip
>
>
> I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). 
> My application is basically a Spring Boot web application which accepts JSON 
> payloads via HTTP and then pushes each to a Kafka topic. I also use Spring 
> Cloud Stream Kafka in the application to create and use a Producer.
> For one of my failure handling test cases, I shut down the Kafka cluster 
> while my applications are running. (Note: no messages have been published to 
> the Kafka cluster before I stop the cluster.)
> When the producer application tries to write messages to TA, it cannot 
> because the cluster is down and hence (I assume) buffers the messages. Let's 
> say it receives 4 messages m1, m2, m3, m4 in increasing time order (i.e. m1 
> is first and m4 is last).
> When I bring the Kafka cluster back online, the producer sends the buffered 
> messages to the topic, but they are not in order. I receive, for example, m2, 
> then m3, then m1, and then m4.
> Why is that? Is it because the buffering in the producer is multi-threaded, 
> with each thread producing to the topic at the same time?
> My project code is attached herewith.
> I can confirm that I have enabled idempotence. I have also tried with 
> ```max.in.flight.requests.per.connection=1```





[jira] [Created] (KAFKA-12776) Producer sends messages out-of-order in spite of enabling idempotence

2021-05-12 Thread NEERAJ VAIDYA (Jira)
NEERAJ VAIDYA created KAFKA-12776:
-

 Summary: Producer sends messages out-of-order in spite of enabling 
idempotence
 Key: KAFKA-12776
 URL: https://issues.apache.org/jira/browse/KAFKA-12776
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 2.7.0, 2.6.0
 Environment: Linux RHEL 7.9 and Ubuntu 20.04
Reporter: NEERAJ VAIDYA


I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). I also have a 
Kafka streams application which consumes from TA and writes to topic-B (TB). In 
the streams application, I have a custom timestamp extractor which extracts the 
timestamp from the message payload.

For one of my failure handling test cases, I shut down the Kafka cluster while 
my applications are running. (Note: no messages have been published to the 
Kafka cluster before I stop the cluster.)

When the producer application tries to write messages to TA, it cannot because 
the cluster is down and hence (I assume) buffers the messages. Let's say it 
receives 4 messages m1, m2, m3, m4 in increasing time order (i.e. m1 is first 
and m4 is last).

When I bring the Kafka cluster back online, the producer sends the buffered 
messages to the topic, but they are not in order. I receive, for example, m2, 
then m3, then m1, and then m4.

Why is that? Is it because the buffering in the producer is multi-threaded, 
with each thread producing to the topic at the same time?

My application is basically a Spring Boot web application which accepts JSON 
payloads and then pushes them to a Kafka topic. I also use Spring Cloud Stream 
Kafka within it to create and use a Producer.

My project code is attached herewith.
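For reference, a sketch of the producer settings that bound in-flight requests 
and so preserve send order across retries. The keys are real Kafka producer 
configs; whether they take effect here depends on how the Spring Cloud Stream 
binder passes them through, so treat this as an illustration rather than a 
verified fix:

```java
import java.util.Properties;

public class OrderedProducerConfig {
    // Producer settings that preserve send order across retries; the keys
    // are real Kafka producer configs, the values show the strictest choice.
    static Properties orderedProducerProps() {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        // With idempotence enabled, ordering is preserved for up to 5
        // in-flight requests per connection; 1 is the most conservative.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        props.setProperty("acks", "all");
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(
            orderedProducerProps().getProperty("max.in.flight.requests.per.connection")); // prints 1
    }
}
```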

 


