[jira] [Created] (KAFKA-13383) Consumer Group partition management across clusters (Active-Active)
NEERAJ VAIDYA created KAFKA-13383:
-------------------------------------

             Summary: Consumer Group partition management across clusters (Active-Active)
                 Key: KAFKA-13383
                 URL: https://issues.apache.org/jira/browse/KAFKA-13383
             Project: Kafka
          Issue Type: New Feature
          Components: consumer, core, mirrormaker
    Affects Versions: 3.0.0, 2.8.1
            Reporter: NEERAJ VAIDYA

When using MM2 (MirrorMaker 2) in an Active-Active setup, we would like consumers to consume from the same topic name in both datacentres, rather than from two topics:
1) the topic without the site prefix, and
2) the topic with the site prefix.

MM2 replicates a topic to a remote site by prefixing the topic name with the site name. Confluent Replicator avoids this topic name prefixing by using provenance headers in the message.

However, if we start 2 consumers (1 on each site) and both of them consume from the topic with the same name, the partitions are not balanced across sites. The consumers on both sites therefore process all messages, and processing is duplicated. This might not be a big deal for consumers in some business domains, but in the Telecom domain, where duplicates are not tolerated, it becomes an issue.

It would be great if partition allocation could be done across clusters, so that a consumer started on one site always takes into account any other consumers of the same topic on the other site.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
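To make the naming scheme described in this issue concrete, here is a minimal sketch of how a replicated topic name is formed and how one regular expression can cover both the local topic and its prefixed replica. The site aliases (`dc-a`, `dc-b`, `dc-c`), the topic `orders`, and the class itself are hypothetical; the `.` separator is assumed to be the MM2 default. The resulting pattern is the kind of thing that could be handed to a consumer's pattern-based subscription, but the sketch uses only the JDK.

```java
import java.util.List;
import java.util.regex.Pattern;

/** Sketch of MM2-style remote topic naming; all names here are illustrative. */
final class RemoteTopicNames {
    // Separator between the source-site alias and the topic name
    // (assumption: the MM2 default separator ".").
    static final String SEPARATOR = ".";

    /** Name a topic receives on the target site when replicated from sourceAlias. */
    static String remoteTopic(String sourceAlias, String topic) {
        return sourceAlias + SEPARATOR + topic;
    }

    /** Regex matching the local topic and its replica from one remote site. */
    static Pattern localAndRemote(String remoteAlias, String topic) {
        return Pattern.compile(
                "(" + Pattern.quote(remoteAlias + SEPARATOR) + ")?" + Pattern.quote(topic));
    }

    public static void main(String[] args) {
        Pattern p = localAndRemote("dc-b", "orders");
        for (String name : List.of("orders", "dc-b.orders", "dc-c.orders")) {
            System.out.println(name + " -> " + p.matcher(name).matches());
        }
    }
}
```

Subscribing to both names on each site avoids missing records, but it does not address the request in this issue: each site still runs its own consumer group against its own cluster, so nothing coordinates partition ownership between the two sites.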
[jira] [Resolved] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

NEERAJ VAIDYA resolved KAFKA-13292.
-----------------------------------
    Resolution: Fixed

Upgraded the client libraries to 2.8.0 while still using the 2.7.0 broker. Using the new StreamsUncaughtExceptionHandler, added code to return REPLACE_THREAD in case of an InvalidPidMappingException. This causes the application to create a new thread and continue processing events.

> InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
> ---
>
>                 Key: KAFKA-13292
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13292
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: NEERAJ VAIDYA
>            Priority: Major
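The recovery described in the resolution comment can be sketched as a small decision function. This is not the reporter's code: the `Response` enum and the nested `InvalidPidMappingException` are hypothetical stand-ins so that the example runs on the JDK alone, and falling back to `SHUTDOWN_CLIENT` for every other error is an assumption. In a real application the same logic would presumably sit inside a `StreamsUncaughtExceptionHandler` registered through `KafkaStreams#setUncaughtExceptionHandler`. The cause chain is walked because the stack trace in this issue shows the exception wrapped in a `StreamsException`.

```java
/** Sketch of the decision logic only; the types below are hypothetical stand-ins. */
final class ThreadRecoveryPolicy {
    // Stand-in for the responses a Kafka Streams uncaught-exception handler can return.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT }

    // Stand-in for org.apache.kafka.common.errors.InvalidPidMappingException.
    static final class InvalidPidMappingException extends RuntimeException {
        InvalidPidMappingException(String message) { super(message); }
    }

    /** True if the throwable or any of its causes has the given simple class name. */
    static boolean hasCause(Throwable t, String simpleName) {
        for (Throwable c = t; c != null; c = c.getCause()) {
            if (c.getClass().getSimpleName().equals(simpleName)) {
                return true;
            }
        }
        return false;
    }

    /** Replace the thread for an expired producer id mapping; otherwise stop the client (assumed default). */
    static Response decide(Throwable t) {
        return hasCause(t, "InvalidPidMappingException")
                ? Response.REPLACE_THREAD
                : Response.SHUTDOWN_CLIENT;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("Error encountered sending record",
                new InvalidPidMappingException("producer id not assigned to transactional id"));
        System.out.println(decide(wrapped));
        System.out.println(decide(new IllegalStateException("unrelated failure")));
    }
}
```

Matching on the class name keeps the sketch free of Kafka dependencies; with the client library on the classpath, an `instanceof` check against the real exception class would be the more natural choice.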
[jira] [Reopened] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

NEERAJ VAIDYA reopened KAFKA-13292:
-----------------------------------

As indicated in my previous comments.

> InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
> ---
>
>                 Key: KAFKA-13292
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13292
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: NEERAJ VAIDYA
>            Priority: Major
[jira] [Created] (KAFKA-13292) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
NEERAJ VAIDYA created KAFKA-13292:
-------------------------------------

             Summary: InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
                 Key: KAFKA-13292
                 URL: https://issues.apache.org/jira/browse/KAFKA-13292
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.7.0
            Reporter: NEERAJ VAIDYA

I have a Kafka Streams application which consumes from a topic with 12 partitions. The incoming message rate into this topic is very low, perhaps 3-4 per minute. Also, some partitions will not receive messages for more than 7 days.

Exactly 7 days after starting this application, I get the following exception and the application shuts down without processing any more messages:

{code:java}
2021-09-10T12:21:59.636 [kafka-producer-network-thread | mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] INFO o.a.k.c.p.i.TransactionManager - MSG=[Producer clientId=mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer, transactionalId=mtx-caf-0_2] Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
2021-09-10T12:21:59.642 [kafka-producer-network-thread | mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1-0_2-producer] ERROR o.a.k.s.p.i.RecordCollectorImpl - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] task [0_2] Error encountered sending record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
2021-09-10T12:21:59.740 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down:
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic mtx-caf-DuplicateCheckStore-changelog for task 0_2 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
Exception handler choose to FAIL the processing, no more records would be sent.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:214)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$0(RecordCollectorImpl.java:186)
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1363)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:159)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:781)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id.
2021-09-10T12:21:59.740 [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] INFO o.a.k.s.p.internals.StreamThread - MSG=stream-thread [mtx-caf-53dc7e96-90f1-4ae9-8af6-236d22c88e08-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
{code}

After this, I can see that all 12 tasks (because there are 12 partitions for all topics) get shut down, and this brings down the whole application.

I understand that transactional.id.expiration.ms = 7 days (default) will likely cause the transactional id of the idle task to expire, but why does this specific thread/task not get fenced or respawned? Why shut down the entire Streams application just because one task has been idle?

Is there a way to keep my application up and running instead of having it shut down?
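The timing in this report lines up with the expiration window mentioned above. The following is a minimal sketch of that arithmetic, not of the broker's actual bookkeeping: the class, the method, and the start time of 2021-09-03T12:00:00Z are hypothetical, the timestamps are treated as UTC for simplicity, and the only figure taken from the issue is the 7-day default of `transactional.id.expiration.ms`.

```java
import java.time.Duration;
import java.time.Instant;

/** Sketch: when would an idle transactional id pass the expiration window? */
final class TransactionalIdExpiry {
    // Default of transactional.id.expiration.ms as stated in the issue: 7 days.
    static final Duration DEFAULT_EXPIRATION = Duration.ofDays(7);

    /** True if more time than the expiration window has passed since the last transactional activity. */
    static boolean isExpired(Instant lastActivity, Instant now, Duration expiration) {
        return Duration.between(lastActivity, now).compareTo(expiration) > 0;
    }

    public static void main(String[] args) {
        // 7 days expressed in milliseconds, the unit the setting uses.
        System.out.println(DEFAULT_EXPIRATION.toMillis());

        // Hypothetical: the idle task last touched its transactional id at startup.
        Instant lastActivity = Instant.parse("2021-09-03T12:00:00Z");
        Instant firstError = Instant.parse("2021-09-10T12:21:59Z"); // first log line in the issue
        System.out.println(isExpired(lastActivity, firstError, DEFAULT_EXPIRATION));
    }
}
```

A task whose partition receives no records for longer than this window never refreshes its transactional id, which is consistent with the exception appearing about 7 days after startup.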
[jira] [Created] (KAFKA-12972) KStreams - Rebalance not happening
NEERAJ VAIDYA created KAFKA-12972:
-------------------------------------

             Summary: KStreams - Rebalance not happening
                 Key: KAFKA-12972
                 URL: https://issues.apache.org/jira/browse/KAFKA-12972
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.7.0
            Reporter: NEERAJ VAIDYA

I have a Kafka Streams application which consumes from a topic with 20 partitions. I run the application with

{code:java}
num.stream.threads = 10
{code}

When I check the JMX MBean _kafka_consumer_coordinator_assigned_partitions_, I can see that each thread has been assigned 2 partitions. That sounds like a reasonable balancing of partitions across the threads.

Now, I add another instance of this application on a different machine/VM, but I run it with 4 stream threads. I was expecting these 4 threads to be assigned (via rebalancing) at least 1 partition each, but after multiple scheduled rebalances (every 10 mins), I do not see even 1 partition being assigned to any of these 4 threads in the second instance of my application.

I would have expected some rebalancing to happen across the 14 (10 + 4) stream threads of my application instances.
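The expectation stated in this issue amounts to simple arithmetic: 20 partitions spread as evenly as possible over the available stream threads. The sketch below computes only that expected spread. It is not the assignment algorithm Kafka Streams uses, and the class and method names are hypothetical.

```java
import java.util.Arrays;

/** Sketch: the even spread the reporter expected, not the Streams assignor. */
final class EvenSpread {
    /** Distribute tasks over threads so that per-thread counts differ by at most one. */
    static int[] perThread(int tasks, int threads) {
        int[] counts = new int[threads];
        Arrays.fill(counts, tasks / threads);      // every thread gets the base share
        for (int i = 0; i < tasks % threads; i++) { // the remainder goes to the first threads
            counts[i]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // One instance with 10 threads: 2 partitions per thread, as observed.
        System.out.println(Arrays.toString(perThread(20, 10)));
        // Two instances with 10 + 4 threads: 6 threads with 2 partitions, 8 with 1.
        System.out.println(Arrays.toString(perThread(20, 14)));
    }
}
```

Under this model, every one of the 14 threads would hold at least one partition, which is the outcome the reporter did not observe on the second instance.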
[jira] [Resolved] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence
[ https://issues.apache.org/jira/browse/KAFKA-12776?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

NEERAJ VAIDYA resolved KAFKA-12776.
-----------------------------------
    Resolution: Information Provided

KIP-739 has considerable overlap with this issue.

> Producer sends messages out-of-order inspite of enabling idempotence
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-12776
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12776
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 2.6.0, 2.7.0
>         Environment: Linux RHEL 7.9 and Ubuntu 20.04
>            Reporter: NEERAJ VAIDYA
>            Priority: Major
>         Attachments: mocker.zip
>
>
> I have an Apache Kafka 2.6 Producer which writes to topic-A (TA).
> My application is basically a Spring Boot web application which accepts JSON payloads via HTTP and then pushes each to a Kafka topic. I also use Spring Cloud Stream Kafka in the application to create and use a Producer.
> For one of my failure-handling test cases, I shut down the Kafka cluster while my applications are running. (Note: No messages have been published to the Kafka cluster before I stop the cluster.)
> When the producer application tries to write messages to TA, it cannot, because the cluster is down, and hence (I assume) buffers the messages. Let's say it receives 4 messages m1, m2, m3, m4 in increasing time order (i.e. m1 is first and m4 is last).
> When I bring the Kafka cluster back online, the producer sends the buffered messages to the topic, but they are not in order. I receive, for example, m2, then m3, then m1 and then m4.
> Why is that? Is it because the buffering in the producer is multi-threaded, with each thread producing to the topic at the same time?
> My project code is attached herewith.
> I can confirm that I have enabled idempotence. I have also tried with ```max.in.flight.requests=1```
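For reference, the producer settings mentioned in this issue could be collected as in the sketch below. The class name and the bootstrap address are hypothetical, and the project attached to the issue configures its producer through Spring Cloud Stream rather than like this. The issue writes the in-flight setting as `max.in.flight.requests`; the sketch assumes the intended producer key is `max.in.flight.requests.per.connection`.

```java
import java.util.Properties;

/** Sketch: producer settings aimed at preserving send order within a partition. */
final class OrderedProducerConfig {
    static Properties build(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        // Idempotence lets the broker discard duplicate batches caused by retries.
        p.setProperty("enable.idempotence", "true");
        // Idempotence requires acknowledgement from all in-sync replicas.
        p.setProperty("acks", "all");
        // At most one unacknowledged request per connection, so a retry cannot overtake a later batch.
        p.setProperty("max.in.flight.requests.per.connection", "1");
        return p;
    }

    public static void main(String[] args) {
        Properties p = build("localhost:9092"); // hypothetical address
        System.out.println(p.getProperty("enable.idempotence"));
        System.out.println(p.getProperty("max.in.flight.requests.per.connection"));
    }
}
```

These settings concern the order of records within one partition, in the order in which `send()` was called on one producer. They do not impose an order on calls that arrive concurrently from several HTTP request threads, which may be relevant to the question raised in the issue.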
[jira] [Created] (KAFKA-12776) Producer sends messages out-of-order inspite of enabling idempotence
NEERAJ VAIDYA created KAFKA-12776:
-------------------------------------

             Summary: Producer sends messages out-of-order inspite of enabling idempotence
                 Key: KAFKA-12776
                 URL: https://issues.apache.org/jira/browse/KAFKA-12776
             Project: Kafka
          Issue Type: Bug
          Components: producer
    Affects Versions: 2.7.0, 2.6.0
         Environment: Linux RHEL 7.9 and Ubuntu 20.04
            Reporter: NEERAJ VAIDYA

I have an Apache Kafka 2.6 Producer which writes to topic-A (TA). I also have a Kafka Streams application which consumes from TA and writes to topic-B (TB). In the Streams application, I have a custom timestamp extractor which extracts the timestamp from the message payload.

For one of my failure-handling test cases, I shut down the Kafka cluster while my applications are running. (Note: No messages have been published to the Kafka cluster before I stop the cluster.)

When the producer application tries to write messages to TA, it cannot, because the cluster is down, and hence (I assume) buffers the messages. Let's say it receives 4 messages m1, m2, m3, m4 in increasing time order (i.e. m1 is first and m4 is last).

When I bring the Kafka cluster back online, the producer sends the buffered messages to the topic, but they are not in order. I receive, for example, m2, then m3, then m1 and then m4.

Why is that? Is it because the buffering in the producer is multi-threaded, with each thread producing to the topic at the same time?

My application is basically a Spring Boot web application which accepts JSON payloads and then pushes them to a Kafka topic. I also use Spring Cloud Stream Kafka within it to create and use a Producer.

My project code is attached herewith.