[jira] [Created] (KAFKA-12397) Error:KeeperErrorCode = BadVersion for /brokers/topics
Sachin created KAFKA-12397:
--
Summary: Error:KeeperErrorCode = BadVersion for /brokers/topics
Key: KAFKA-12397
URL: https://issues.apache.org/jira/browse/KAFKA-12397
Project: Kafka
Issue Type: Bug
Reporter: Sachin

We are running ZooKeeper and Kafka on DC/OS and are currently observing the following errors in the ZooKeeper logs:

{code}
INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@892] - Processing mntr command from /100.77.120.5:47676
2021-03-01 06:48:28,223 [myid:2] - INFO [Thread-7313:NIOServerCnxn@1040] - Closed socket connection for client /100.77.120.5:47676 (no session established for client)
2021-03-01 06:48:31,745 [myid:2] - INFO [ProcessThread(sid:2 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x208f3e3caa5 type:setData cxid:0x11772f zxid:0x571be5 txntype:-1 reqpath:n/a Error Path:/brokers/topics/connect-offsets/partitions/8/state Error:KeeperErrorCode = BadVersion for /brokers/topics/connect-offsets/partitions/8/state
2021-03-01 06:48:31,756 [myid:2] - INFO [ProcessThread(sid:2 cport:-1)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x208f3e3caa5 type:setData cxid:0x117731 zxid:0x571be6 txntype:-1 reqpath:n/a Error Path:/brokers/topics/connect-offsets/partitions/20/state Error:KeeperErrorCode = BadVersion for /brokers/topics/connect-offsets/partitions/20/state
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
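For context on the error above: ZooKeeper's BadVersion comes from its conditional setData, where the caller passes the znode version it last read and the write is rejected if another writer updated the node in between. This is optimistic concurrency control, and an occasional BadVersion simply means two writers raced on the same znode. The sketch below is a hypothetical in-memory stand-in for that version check, not the ZooKeeper API itself; all names in it are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a znode store, illustrating why a
// setData with a stale expected version is rejected with BadVersion.
class VersionedStore {
    static class Node {
        byte[] data;
        int version;
    }

    private final Map<String, Node> nodes = new HashMap<>();

    void create(String path, byte[] data) {
        Node n = new Node();
        n.data = data;
        n.version = 0;      // znodes start at data version 0
        nodes.put(path, n);
    }

    int getVersion(String path) {
        return nodes.get(path).version;
    }

    // Mirrors the semantics of ZooKeeper's setData(path, data, version):
    // the write succeeds and bumps the version only when expectedVersion
    // matches the node's current version; otherwise it is rejected (where
    // ZooKeeper would throw KeeperException.BadVersionException).
    boolean setData(String path, byte[] data, int expectedVersion) {
        Node n = nodes.get(path);
        if (n.version != expectedVersion) {
            return false; // BadVersion: someone else wrote in between
        }
        n.data = data;
        n.version++;
        return true;
    }
}
```

Under this model, the log entries above would mean two components (e.g. the controller and a broker) raced on the partition-state znodes for connect-offsets, and one writer's read had gone stale.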
[jira] [Commented] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread
[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675053#comment-16675053 ]

sachin commented on KAFKA-7280:
---
Hi, I am also getting this error with Kafka 1.1.0. Is there any workaround I can use until the fix is available in a public build?

> ConcurrentModificationException in FetchSessionHandler in heartbeat thread
> --
>
> Key: KAFKA-7280
> URL: https://issues.apache.org/jira/browse/KAFKA-7280
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 1.1.1, 2.0.0
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Critical
> Fix For: 1.1.2, 2.0.1, 2.1.0
>
> Request/response handling in FetchSessionHandler is not thread-safe, but the
> Kafka consumer uses it without any synchronization even though poll() from the
> heartbeat thread can process responses. The heartbeat thread holds the
> coordinator lock while processing its poll and responses, making other
> operations involving the group coordinator safe. We also need to lock
> FetchSessionHandler for the operations that update or read
> FetchSessionHandler#sessionPartitions.
> This exception is from a system test run on trunk of
> TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two:
> {quote}
> [2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, groupId=group] Heartbeat thread failed due to unexpected error (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> java.util.ConcurrentModificationException
> at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
> at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
> at org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
> at org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424)
> at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216)
> at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996)
> {quote}
>
> The logs just prior to the exception show that a partition was removed from the session:
> {quote}
> [2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, groupId=group] Skipping fetch for partition test_topic-1 because there is an in-flight request to worker4:9095 (id: 3 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, groupId=group] Completed receive from node 2 for FETCH with correlation id 417, received {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient)
> [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, groupId=group] Added READ_UNCOMMITTED fetch request for partition test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) (org.apache.kafka.clients.FetchSessionHandler)
> [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, groupId=group] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(test_topic-2), implied=(test_topic-0)) to broker worker3:9095 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetc
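The stack trace in the issue description boils down to a LinkedHashMap (FetchSessionHandler#sessionPartitions) being structurally modified by one thread while another iterates it inside responseDataToLogString. The same fail-fast behavior can be demonstrated single-threaded, since the iterator's modCount check does not care which thread did the modification. A minimal, Kafka-free sketch (names are illustrative):

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedHashMap;
import java.util.Map;

public class CmeDemo {
    // Returns true if removing an entry mid-iteration trips the fail-fast
    // check, i.e. the same ConcurrentModificationException the heartbeat
    // thread hit in FetchSessionHandler.responseDataToLogString.
    static boolean triggersCme() {
        Map<String, Long> sessionPartitions = new LinkedHashMap<>();
        sessionPartitions.put("test_topic-0", 189L);
        sessionPartitions.put("test_topic-2", 184L);
        try {
            for (String tp : sessionPartitions.keySet()) {
                // Structural modification outside the iterator, standing in
                // for what a concurrent thread does; the fail-fast iterator
                // detects the changed modCount on its next call to next().
                sessionPartitions.remove("test_topic-2");
            }
        } catch (ConcurrentModificationException e) {
            return true;
        }
        return false;
    }
}
```

This is why the fix in KAFKA-7280 locks FetchSessionHandler around reads and updates of sessionPartitions: the map itself offers no protection against concurrent structural changes.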
[jira] [Comment Edited] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread
[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675053#comment-16675053 ]

sachin edited comment on KAFKA-7280 at 11/5/18 4:50 PM:
---
Hi [~rsivaram], I am also getting this error with Kafka 1.1.0. Is there any workaround I can use until the fix is available in a public build?

was (Author: sachinu): Hi, I am getting this error also with Kafka 1.1.0. Is there any workaround that i can use till the fix is available in a public build?
[jira] [Commented] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread
[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681132#comment-16681132 ]

sachin commented on KAFKA-7280:
---
[~rsivaram]
* I am on version "1.1.0" and the stack trace is below. Is this a fatal error, or will the heartbeat thread restart on a subsequent poll() or commitSync()?
{code:java}
2018-11-08 16:53:57,456 [kafka-coordinator-heartbeat-thread | ConsumerNginxRawCW] ERROR org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=ConsumerNginxRawCW] Heartbeat thread failed due to unexpected error
java.util.ConcurrentModificationException
at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711)
at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:734)
at org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362)
at org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:423)
at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:212)
at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:300)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:948)
{code}
* I have not enabled TRACE-level logging. The log4j2 pattern is:
{code:java}
%d [%t] %-5level %logger{36} - %msg%n
{code}
* Is there any workaround for this problem? For example, listening for the above exception from the heartbeat thread, waking up the thread doing the actual poll/commit, and then spawning another thread that reinitializes everything, including subscribe(...)?
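Whether restarting a consumer in-process is safe is exactly the open question in the comments above. As a hedged sketch of the suggested workaround shape (plain Java, not Kafka API code; all names are illustrative): the background thread's unexpected failure is captured via an uncaught-exception handler, and the owning loop checks that signal and tears down and rebuilds the whole client rather than reusing possibly-corrupted state.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical supervisor pattern for the workaround discussed above: if a
// heartbeat-like background thread dies unexpectedly, record the failure so
// the main poll loop can close the client and recreate everything
// (subscribe(...) included) from scratch.
public class Supervisor {
    private final AtomicReference<Throwable> backgroundFailure = new AtomicReference<>();

    Thread startBackground(Runnable task) {
        Thread t = new Thread(task, "heartbeat-like-thread");
        // Capture only the first failure; later ones are redundant.
        t.setUncaughtExceptionHandler((thread, err) -> backgroundFailure.compareAndSet(null, err));
        t.start();
        return t;
    }

    // Checked from the main poll loop; true means the caller should stop
    // using the current client instance and rebuild it entirely.
    boolean needsRestart() {
        return backgroundFailure.get() != null;
    }

    // Demo: run a task that fails and report whether the failure was captured.
    static boolean demo() {
        Supervisor s = new Supervisor();
        Thread t = s.startBackground(() -> {
            throw new RuntimeException("heartbeat failed");
        });
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return s.needsRestart();
    }
}
```

Note that this only addresses detection and restart mechanics; it does not answer the concern raised later in the thread about whether the failed thread left shared state corrupted. Rebuilding the entire client, rather than resuming it, is the conservative choice precisely because of that doubt.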
[jira] [Commented] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread
[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683407#comment-16683407 ]

sachin commented on KAFKA-7280:
---
[~ijuma] I checked [https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients] and the latest there is 2.0.0. Are you referring to another Maven repo?

[~rsivaram] Thanks for the valuable inputs.
1) Will a client/broker version mismatch be an issue? Can it introduce performance problems as well?
2) To make the discussed solution work on 1.1.x, the issue I see is that we would be restarting another consumer instance in the same process after the heartbeat thread errs out. That may not be advisable, since the heartbeat thread might have corrupted some global resources. Thoughts?

Thanks,
-Sachin