[jira] [Comment Edited] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2019-05-04 Thread little brother ma (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16808327#comment-16808327 ]

little brother ma edited comment on KAFKA-7697 at 5/5/19 1:49 AM:
--

We also hit the same issue with 2.1.1!

The value of the metric "kafka.network:type=RequestChannel,name=RequestQueueSize" is always 1000, which matches our queued.max.requests=1000 setting.
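For reference, here is roughly how we sample that metric over JMX (a minimal sketch; the hostname and JMX port 9999 are placeholders for our environment, where JMX is enabled on the broker via JMX_PORT):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class RequestQueueSizeProbe {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint; adjust host/port to wherever the broker exposes JMX.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            ObjectName queueSize = new ObjectName(
                    "kafka.network:type=RequestChannel,name=RequestQueueSize");
            // Kafka gauges expose their current reading via the "Value" attribute.
            Object value = conn.getAttribute(queueSize, "Value");
            System.out.println("RequestQueueSize = " + value);
        } finally {
            connector.close();
        }
    }
}

On a healthy broker the queue drains; a reading pinned at queued.max.requests is what pointed us at the parked network threads below.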

 

"kafka-network-thread-5-ListenerName(PLAINTEXT)-PLAINTEXT-4" #97 prio=5 os_prio=0 tid=0x7fb7ce0ba800 nid=0x2d5 waiting on condition [0x7fad6e5f8000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x0004530783a0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
 at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
 at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:310)
 at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:709)
 at kafka.network.Processor.$anonfun$processCompletedReceives$1$adapted(SocketServer.scala:699)
 at kafka.network.Processor$$Lambda$877/855310793.apply(Unknown Source)
 at scala.collection.Iterator.foreach(Iterator.scala:937)
 at scala.collection.Iterator.foreach$(Iterator.scala:937)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
 at scala.collection.IterableLike.foreach(IterableLike.scala:70)
 at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at kafka.network.Processor.processCompletedReceives(SocketServer.scala:699)
 at kafka.network.Processor.run(SocketServer.scala:595)
 at java.lang.Thread.run(Thread.java:748)

Locked ownable synchronizers:
 - None

 

"kafka-request-handler-15" #87 daemon prio=5 os_prio=0 tid=0x7fb7ceee6800 
nid=0x2cb waiting on condition [0x7fad71af4000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x0004540423f0> (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
 at 
java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:249)
 at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
 at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:729)
 at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:735)
 at kafka.server.ReplicaManager$$Lambda$1567/915411568.apply(Unknown Source)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
 at scala.collection.TraversableLike$$Lambda$12/811760110.apply(Unknown Source)
 at scala.collection.immutable.Map$Map1.foreach(Map.scala:125)
 at scala.collection.TraversableLike.map(TraversableLike.scala:233)
 at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:723)
 at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:470)
 at 
kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:280)
 at 
kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:423)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$doCommitOffsets$1(GroupCoordinator.scala:518)
 at 
kafka.coordinator.group.GroupCoordinator$$Lambda$1816/513285617.apply$mcV$sp(Unknown
 Source)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
 at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:197)
 at 
kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:503)
 at 
kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:482)
 at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:365)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:114)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
 at java.lang.Thread.run(Thread.java:748)

Locked ownable synchronizers:
 - <0x000794ea4248> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)    
 - 
 - 

 

The network thread is parked in ArrayBlockingQueue.put because the request queue is full, while the request handler is parked waiting for the read lock taken in Partition.appendRecordsToLeader; that looks like the same deadlock this ticket describes.

The full thread dump of the broker: [^kafka_jstack.txt]
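For what it's worth, the two states above can wedge each other once some write-lock holder is itself stuck producing into the full queue. Below is a toy compression of that cycle into two threads (an illustration only, not the broker's actual code paths):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class QueueLockDeadlockDemo {
    public static void main(String[] args) throws Exception {
        BlockingQueue<Integer> requestQueue = new ArrayBlockingQueue<>(1); // think queued.max.requests
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        requestQueue.put(0); // the queue is already full

        Thread writer = new Thread(() -> {
            lock.writeLock().lock(); // holds the write lock...
            try {
                requestQueue.put(1); // ...and parks forever: nobody can drain the queue
            } catch (InterruptedException ignored) {
            } finally {
                lock.writeLock().unlock();
            }
        }, "writer-holding-lock");

        Thread handler = new Thread(() -> {
            lock.readLock().lock(); // parks like kafka-request-handler-15 above
            try {
                requestQueue.poll(); // the drain that never happens
            } finally {
                lock.readLock().unlock();
            }
        }, "handler-needing-lock");

        writer.setDaemon(true);
        handler.setDaemon(true);
        writer.start();
        Thread.sleep(200); // let the writer grab the lock and park on the full queue
        handler.start();
        writer.join(2000); // times out; both threads stay parked, as in the dumps
        System.out.println("writer: " + writer.getState() + ", handler: " + handler.getState());
    }
}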

 


[jira] [Updated] (KAFKA-7238) Producer can't update Metadata when two brokers break down and restart the first one

2018-08-03 Thread little brother ma (JIRA)


 [ https://issues.apache.org/jira/browse/KAFKA-7238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

little brother ma updated KAFKA-7238:
-
External issue URL: https://issues.apache.org/jira/browse/KAFKA-1843

> Producer  can't update Metadata  when two brokers break down and restart the 
> first one  
> 
>
> Key: KAFKA-7238
> URL: https://issues.apache.org/jira/browse/KAFKA-7238
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.11.0.1, 0.11.0.3
> Environment: os: 14.04.1-Ubuntu
> java: 1.8.0_151-b12
>Reporter: little brother ma
>Priority: Major

[jira] [Commented] (KAFKA-1843) Metadata fetch/refresh in new producer should handle all node connection states gracefully

2018-08-03 Thread little brother ma (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-1843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16569030#comment-16569030 ]

little brother ma commented on KAFKA-1843:
--

I am still facing this issue in versions 0.11.0.1 and 0.11.0.3.

> Metadata fetch/refresh in new producer should handle all node connection 
> states gracefully
> --
>
> Key: KAFKA-1843
> URL: https://issues.apache.org/jira/browse/KAFKA-1843
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.8.2.0
>Reporter: Ewen Cheslack-Postava
>Priority: Major
>  Labels: patch
>
> KAFKA-1642 resolved some issues with the handling of broker connection states 
> to avoid high CPU usage, but made the minimal fix rather than the ideal one. 
> The code for handling the metadata fetch is difficult to get right because it 
> has to handle a lot of possible connectivity states and failure modes across 
> all the known nodes. It also needs to correctly integrate with the 
> surrounding event loop, providing correct poll() timeouts to both avoid busy 
> looping and make sure it wakes up and tries new nodes in the face of both 
> connection and request failures.
> A patch here should address a few issues:
> 1. Make sure connection timeouts, as implemented in KAFKA-1842, are cleanly 
> integrated. This mostly means that when a connecting node is selected to 
> fetch metadata from, that the code notices that and sets the next timeout 
> based on the connection timeout rather than some other backoff.
> 2. Rethink the logic and naming of NetworkClient.leastLoadedNode. That method 
> actually takes into account a) the current connectivity of each node, b) 
> whether the node had a recent connection failure, c) the "load" in terms of 
> in flight requests. It also needs to ensure that different clients don't use 
> the same ordering across multiple calls (which is already addressed in the 
> current code by nodeIndexOffset) and that we always eventually try all nodes 
> in the face of connection failures (which isn't currently handled by 
> leastLoadedNode and probably cannot be without tracking additional state). 
> This method also has to work for new consumer use cases even though it is 
> currently only used by the new producer's metadata fetch. Finally it has to 
> properly handle when other code calls initiateConnect() since the normal path 
> for sending messages also initiates connections.
> We can already say that there is an order of preference given a single call 
> (as follows), but making this work across multiple calls when some initial 
> choices fail to connect or return metadata *and* connection states may be 
> changing is much more difficult.
>  * Connected, zero in flight requests - the request can be sent immediately
>  * Connecting node - it will hopefully be connected very soon and by 
> definition has no in flight requests
>  * Disconnected - same reasoning as for a connecting node
>  * Connected, > 0 in flight requests - we consider any # of in flight 
> requests as a big enough backlog to delay the request a lot.
> We could use an approach that better accounts for # of in flight requests 
> rather than just turning it into a boolean variable, but that probably 
> introduces much more complexity than it is worth.
> 3. The most difficult case to handle so far has been when leastLoadedNode 
> returns a disconnected node to maybeUpdateMetadata as its best option. 
> Properly handling the two resulting cases (initiateConnect fails immediately 
> vs. taking some time to possibly establish the connection) is tricky.
> 4. Consider optimizing for the failure cases. The most common cases are when 
> you already have an active connection and can immediately get the metadata or 
> you need to establish a connection, but the connection and metadata 
> request/response happen very quickly. These common cases are infrequent 
> enough (default every 5 min) that establishing an extra connection isn't a 
> big deal as long as it's eventually cleaned up. The edge cases, like network 
> partitions where some subset of nodes become unreachable for a long period, 
> are harder to reason about but we should be sure we will always be able to 
> gracefully recover from them.
> KAFKA-1642 enumerated the possible outcomes of a single call to 
> maybeUpdateMetadata. A good fix for this would consider all of those outcomes 
> for repeated calls to 





[jira] [Comment Edited] (KAFKA-7238) Producer can't update Metadata when two brokers break down and restart the first one

2018-08-03 Thread little brother ma (JIRA)


[ https://issues.apache.org/jira/browse/KAFKA-7238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567988#comment-16567988 ]

little brother ma edited comment on KAFKA-7238 at 8/3/18 9:14 AM:
--

I have also tested a cluster with 3 brokers:

Stop all brokers in order (first broker 1, then broker 2, and last broker 3), wait for about 5 ms, then restart broker 1.

The result: the producer client always tries to get metadata from broker 3.

It calls leastLoadedNode(long now), which gets the node list from this.metadata.fetch().nodes(). But at that point the list contains just one Node, broker 3, which is disconnected.

I have restarted broker 1, so why doesn't it connect to broker 1?

So the following code cannot work correctly when all brokers have been stopped:

List<Node> nodes = this.metadataUpdater.fetchNodes();
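To illustrate what I mean, here is a toy model of that selection (my simplification for discussion, not the actual NetworkClient source):

import java.util.Arrays;
import java.util.List;

// Toy model of why leastLoadedNode never picks the restarted broker:
// candidates come only from the last metadata snapshot, and the
// bootstrap nodes are not consulted again.
public class LeastLoadedNodeSketch {
    static class Node {
        final int id; final boolean connected; final int inFlight;
        Node(int id, boolean connected, int inFlight) {
            this.id = id; this.connected = connected; this.inFlight = inFlight;
        }
    }

    static Node leastLoadedNode(List<Node> metadataNodes) {
        Node found = null;
        int fewest = Integer.MAX_VALUE;
        for (Node n : metadataNodes) {
            if (n.connected && n.inFlight == 0) return n; // idle connected node wins
            if (n.inFlight < fewest) { fewest = n.inFlight; found = n; }
        }
        return found; // can be a disconnected node; broker 1 is never in this list
    }

    public static void main(String[] args) {
        // After stopping brokers 1, 2, 3 and restarting broker 1, the client's
        // metadata snapshot still holds only broker 3, which is disconnected.
        List<Node> snapshot = Arrays.asList(new Node(3, false, 0));
        System.out.println("picked: broker " + leastLoadedNode(snapshot).id);
    }
}

With the snapshot reduced to a single disconnected node, the method can only ever hand back broker 3.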


> Producer  can't update Metadata  when two brokers break down and restart the 
> first one  
> 
>
> Key: KAFKA-7238
> URL: https://issues.apache.org/jira/browse/KAFKA-7238
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.11.0.1, 0.11.0.3
> Environment: os: 14.04.1-Ubuntu
> java: 1.8.0_151-b12
>Reporter: little brother ma
>Priority: Major

[jira] [Created] (KAFKA-7238) Producer can't update Metadata when two brokers break down and restart the first one

2018-08-02 Thread little brother ma (JIRA)
little brother ma created KAFKA-7238:


 Summary: Producer  can't update Metadata  when two brokers break down and restart the first one
 Key: KAFKA-7238
 URL: https://issues.apache.org/jira/browse/KAFKA-7238
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.11.0.3, 0.11.0.1
 Environment: os: 14.04.1-Ubuntu
java: 1.8.0_151-b12
Reporter: little brother ma


There is a Kafka cluster with two brokers (broker ids 70 and 67). I created a topic named topic2 with 2 partitions and 1 replica; partition 0 is on broker 70 and partition 1 is on broker 67.

While sending data, I stop broker 70 first, wait 5 ms, and then stop broker 67. Then I restart broker 70, but the producer client can't update its metadata and doesn't switch to connect to broker 70; instead, it always tries to connect to broker 67.
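The test client is essentially a loop like the following (a reconstruction rather than the exact test code; the serializers and the pacing are assumptions):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class Topic2SendLoop {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Both brokers are listed; the problem is that after the restart
        // sequence the client keeps retrying only the node left in its
        // last metadata snapshot.
        props.put("bootstrap.servers", "10.93.132.70:9092,10.93.132.67:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            while (true) {
                // 12-byte value with no key, matching the log lines below.
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("topic2", "hello-kafka!");
                RecordMetadata md = producer.send(record).get();
                System.out.printf("The message size: 12, key: null, partition:%d, offset:%d%n",
                        md.partition(), md.offset());
                Thread.sleep(1000);
            }
        } finally {
            producer.close();
        }
    }
}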

The log before stopping the brokers:

send
The message size: 12, key: null, partition:0, offset:29
[2018-08-02 19:59:10,180] DEBUG Sending metadata request (type=MetadataRequest, 
topics=topic2) to node 10.93.132.67:9092 (id: 67 rack: null) 
(org.apache.kafka.clients.NetworkClient)
[2018-08-02 19:59:10,184] DEBUG Updated cluster metadata version 11 to 
Cluster(id = 3fL1MXQtRo6Ujmxa9dS3eA, nodes = [10.93.132.70:9092 (id: 70 rack: 
null), 10.93.132.67:9092 (id: 67 rack: null)], partitions = [Partition(topic = 
topic2, partition = 1, leader = 70, replicas = [70], isr = [70]), 
Partition(topic = topic2, partition = 0, leader = 67, replicas = [67], isr = 
[67])]) (org.apache.kafka.clients.Metadata)
send
The message size: 12, key: null, partition:1, offset:69
send
The message size: 12, key: null, partition:0, offset:30
send
The message size: 12, key: null, partition:1, offset:70
send
The message size: 12, key: null, partition:0, offset:31
send
The message size: 12, key: null, partition:1, offset:71
send
The message size: 12, key: null, partition:0, offset:32

 

 

The log after stopping broker 70:

[2018-08-02 20:02:50,049] DEBUG Node 70 disconnected. 
(org.apache.kafka.clients.NetworkClient)
[2018-08-02 20:02:50,049] WARN Connection to node 70 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-08-02 20:02:50,049] DEBUG Give up sending metadata request since no node 
is available (org.apache.kafka.clients.NetworkClient)
[2018-08-02 20:02:50,100] DEBUG Give up sending metadata request since no node 
is available (org.apache.kafka.clients.NetworkClient)
[2018-08-02 20:02:50,151] DEBUG Initialize connection to node 10.93.132.70:9092 
(id: 70 rack: null) for sending metadata request 
(org.apache.kafka.clients.NetworkClient)
[2018-08-02 20:02:50,151] DEBUG Initiating connection to node 10.93.132.70:9092 
(id: 70 rack: null) (org.apache.kafka.clients.NetworkClient)
send
The message size: 12, key: null, partition:0, offset:95
[2018-08-02 20:02:51,156] DEBUG Connection with /10.93.132.70 disconnected 
(org.apache.kafka.common.network.Selector)
java.net.ConnectException: Connection refused: no further information
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
 at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
 at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95)
 at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:361)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
 at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:954)
 at java.lang.Thread.run(Thread.java:745)
[2018-08-02 20:02:51,157] DEBUG Node 70 disconnected. 
(org.apache.kafka.clients.NetworkClient)
[2018-08-02 20:02:51,157] WARN Connection to node 70 could not be established. 
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-08-02 20:02:51,157] DEBUG Give up sending metadata request since no node 
is available (org.apache.kafka.clients.NetworkClient)
[2018-08-02 20:02:51,208] DEBUG Initialize connection to node 10.93.132.70:9092 
(id: 70 rack: null) for sending metadata request 
(org.apache.kafka.clients.NetworkClient)
[2018-08-02 20:02:51,208] DEBUG Initiating connection to node 10.93.132.70:9092 
(id: 70 rack: null) (org.apache.kafka.clients.NetworkClient)
[2018-08-02 20:02:52,216] DEBUG Connection with /10.93.132.70 disconnected 
(org.apache.kafka.common.network.Selector)
java.net.ConnectException: Connection refused: no further information
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at