[jira] [Commented] (KAFKA-8508) saslClient failed to initialize properly: it's null.

2020-01-02 Thread Graham Campbell (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17007003#comment-17007003
 ] 

Graham Campbell commented on KAFKA-8508:


This specific ArrayIndexOutOfBoundsException is actually due to ZOOKEEPER-2323, 
which is fixed in ZK 3.5.2.

> saslClient failed to initialize properly: it's null.
> 
>
> Key: KAFKA-8508
> URL: https://issues.apache.org/jira/browse/KAFKA-8508
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 2.0.1
>Reporter: Caroline Liu
>Priority: Major
>
> After a network issue caused the last ISR to fail connecting to ZooKeeper, 
> the attempt to reconnect failed with an ArrayIndexOutOfBoundsException. 
> {code:java}
> 2019-05-31 15:54:38,823 [zk-session-expiry-handler0-SendThread(zk2-1:2181)] 
> WARN (org.apache.zookeeper.ClientCnxn) - Client session timed out, have not 
> heard from server in 20010ms for sessionid 0x1511b2b1042a
> 2019-05-31 15:54:38,823 [zk-session-expiry-handler0-SendThread(zk2-1:2181)] 
> INFO (org.apache.zookeeper.ClientCnxn) - Client session timed out, have not 
> heard from server in 20010ms for sessionid 0x1511b2b1042a, closing socket 
> connection and attempting reconnect
> 2019-05-31 15:54:39,702 [zk-session-expiry-handler0-SendThread(zk1-2:2181)] 
> INFO (org.apache.zookeeper.client.ZooKeeperSaslClient) - Client will use 
> DIGEST-MD5 as SASL mechanism.
> 2019-05-31 15:54:39,702 [zk-session-expiry-handler0-SendThread(zk1-2:2181)] 
> ERROR (org.apache.zookeeper.client.ZooKeeperSaslClient) - Exception while 
> trying to create SASL client: java.lang.ArrayIndexOutOfBoundsException: 0
> 2019-05-31 15:54:39,702 [zk-session-expiry-handler0-SendThread(zk1-2:2181)] 
> INFO (org.apache.zookeeper.ClientCnxn) - Opening socket connection to server 
> zk1-2/1.3.6.1:2181. Will attempt to SASL-authenticate using Login Context 
> section 'Client'
> 2019-05-31 15:54:39,702 [zk-session-expiry-handler0-SendThread(zk1-2:2181)] 
> INFO (org.apache.zookeeper.ClientCnxn) - Socket connection established to 
> zk1-2/1.3.6.1:2181, initiating session
> 2019-05-31 15:54:39,703 [zk-session-expiry-handler0-SendThread(zk1-2:2181)] 
> INFO (org.apache.zookeeper.ClientCnxn) - Session establishment complete on 
> server zk1-2/1.3.6.1:2181, sessionid = 0x1511b2b1042a, negotiated timeout 
> = 3
> 2019-05-31 15:54:39,703 [zk-session-expiry-handler0-SendThread(zk1-2:2181)] 
> ERROR (org.apache.zookeeper.ClientCnxn) - SASL authentication with Zookeeper 
> Quorum member failed: javax.security.sasl.SaslException: saslClient failed to 
> initialize properly: it's null.{code}
> Kafka was "not live" in zookeeper and had to be manually restarted to recover 
> from this error. It would be better if the last ISR could retry.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects

2020-04-02 Thread Graham Campbell (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074102#comment-17074102
 ] 

Graham Campbell commented on KAFKA-6529:


[~zzccctv] I originally reproduced it in a test environment by running many 
instances of the librdkafka performance test client against a broker to trigger 
the bug. If you're hitting this bug, the open sockets and open file descriptors 
of the broker process should grow over time due to the leaked sockets when 
clients disconnect while their requests are being processed.

_./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_

> Broker leaks memory and file descriptors after sudden client disconnects
> 
>
> Key: KAFKA-6529
> URL: https://issues.apache.org/jira/browse/KAFKA-6529
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 0.11.0.2, 1.0.0
>Reporter: Graham Campbell
>Priority: Major
> Fix For: 0.11.0.3, 1.0.1, 1.1.0
>
>
> If a producer forcefully disconnects from a broker while it has staged 
> receives, that connection enters a limbo state where it is no longer 
> processed by the SocketServer.Processor, leaking the file descriptor for the 
> socket and the memory used for the staged receive queue for that connection.
> We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after 
> the rolling restart to upgrade, open file descriptors on the brokers started 
> climbing uncontrollably. In a few cases brokers reached our configured max 
> open files limit of 100k and crashed before we rolled back.
> We tracked this down to a buildup of muted connections in the 
> Selector.closingChannels list. If a client disconnects from the broker with 
> multiple pending produce requests, when the broker attempts to send an ack to 
> the client it receives an IOException because the TCP socket has been closed. 
> This triggers the Selector to close the channel, but because it still has 
> pending requests, it adds it to Selector.closingChannels to process those 
> requests. However, because that exception was triggered by trying to send a 
> response, the SocketServer.Processor has marked the channel as muted and will 
> no longer process it at all.
> *Reproduced by:*
> Starting a Kafka broker/cluster
> Client produces several messages and then disconnects abruptly (eg. 
> _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_)
> Broker then leaks file descriptor previously used for TCP socket and memory 
> for unprocessed messages
> *Proposed solution (which we've implemented internally)*
> Whenever an exception is encountered when writing to a socket in 
> Selector.pollSelectionKeys(...) record that that connection failed a send by 
> adding the KafkaChannel ID to Selector.failedSends. Then re-raise the 
> exception to still trigger the socket disconnection logic. Since every 
> exception raised in this function triggers a disconnect, we also treat any 
> exception while writing to the socket as a failed send.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9021) Broker shutdown during SSL handshake may be handled as handshake failure (Regression)

2019-10-10 Thread Graham Campbell (Jira)
Graham Campbell created KAFKA-9021:
--

 Summary: Broker shutdown during SSL handshake may be handled as 
handshake failure (Regression)
 Key: KAFKA-9021
 URL: https://issues.apache.org/jira/browse/KAFKA-9021
 Project: Kafka
  Issue Type: Bug
  Components: security
Affects Versions: 2.2.1, 2.1.1, 2.0.1, 2.3.1
Reporter: Graham Campbell


[Java 11 throws an 
SSLProtocolException|http://hg.openjdk.java.net/jdk/jdk11/file/1ddf9a99e4ad/src/java.base/share/classes/sun/security/ssl/Alert.java#l125]
 when an SSL connection is gracefully closed during handshaking, rather than an 
SSLException. This breaks the fix for KAFKA-7168 and the client may process the 
resulting SSLException as a non-retriable handshake failure rather than a 
retriable I/O exception.

 
{code:java}
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLProtocolException: Received close_notify during 
handshake
 at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:126)
 at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:117)
 at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:308)
 at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:264)
 at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:255)
 at java.base/sun.security.ssl.Alert$AlertConsumer.consume(Alert.java:244)
 at 
java.base/sun.security.ssl.TransportContext.dispatch(TransportContext.java:181)
 at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:164)
 at java.base/sun.security.ssl.SSLEngineImpl.decode(SSLEngineImpl.java:672)
 at java.base/sun.security.ssl.SSLEngineImpl.readRecord(SSLEngineImpl.java:627)
 at java.base/sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:443)
 at java.base/sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:422)
 at java.base/javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:634)
 at 
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:474)
 at 
org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:337)
 at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:264)
 at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125)
 at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:489)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1639)
 at 
org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1593)
{code}
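
A hedged sketch of the distinction described above (the helper name and message 
check are assumptions for illustration, not the actual SslTransportLayer code): 
an SSLProtocolException caused by a close_notify received mid-handshake is 
remapped to a retriable IOException instead of surfacing as a non-retriable 
handshake failure.
{code:java}
import java.io.IOException;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLProtocolException;

// Illustrative helper, not the actual SslTransportLayer code.
public final class HandshakeFailureClassifier {
    private HandshakeFailureClassifier() {}

    /** Rethrows a graceful-close-during-handshake error as a retriable I/O error. */
    public static void rethrow(SSLException e) throws IOException {
        if (e instanceof SSLProtocolException
                && e.getMessage() != null
                && e.getMessage().contains("Received close_notify during handshake")) {
            // The peer closed the connection cleanly while handshaking
            // (e.g. a broker shutting down): treat as a retriable I/O failure.
            throw new IOException("Connection closed during SSL handshake", e);
        }
        throw e; // genuine handshake failures propagate unchanged
    }
}
{code}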
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9021) Broker shutdown during SSL handshake may be handled as handshake failure (Regression)

2019-10-10 Thread Graham Campbell (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16949027#comment-16949027
 ] 

Graham Campbell commented on KAFKA-9021:


Issue priority based on previous instance in KAFKA-7168

> Broker shutdown during SSL handshake may be handled as handshake failure 
> (Regression)
> -
>
> Key: KAFKA-9021
> URL: https://issues.apache.org/jira/browse/KAFKA-9021
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.0.1, 2.1.1, 2.2.1, 2.3.1
>Reporter: Graham Campbell
>Priority: Major
>
> [Java 11 throws an 
> SSLProtocolException|http://hg.openjdk.java.net/jdk/jdk11/file/1ddf9a99e4ad/src/java.base/share/classes/sun/security/ssl/Alert.java#l125]
> when an SSL connection is gracefully closed during handshaking, rather than an 
> SSLException. This breaks the fix for KAFKA-7168 and the client may process 
> the resulting SSLException as a non-retriable handshake failure rather than a 
> retriable I/O exception.
>  
> {code:java}
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake 
> failed
> Caused by: javax.net.ssl.SSLProtocolException: Received close_notify during 
> handshake
>  at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:126)
>  at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:117)
>  at 
> java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:308)
>  at 
> java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:264)
>  at 
> java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:255)
>  at java.base/sun.security.ssl.Alert$AlertConsumer.consume(Alert.java:244)
>  at 
> java.base/sun.security.ssl.TransportContext.dispatch(TransportContext.java:181)
>  at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:164)
>  at java.base/sun.security.ssl.SSLEngineImpl.decode(SSLEngineImpl.java:672)
>  at 
> java.base/sun.security.ssl.SSLEngineImpl.readRecord(SSLEngineImpl.java:627)
>  at java.base/sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:443)
>  at java.base/sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:422)
>  at java.base/javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:634)
>  at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:474)
>  at 
> org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:337)
>  at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:264)
>  at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125)
>  at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:489)
>  at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1639)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1593)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8508) saslClient failed to initialize properly: it's null.

2019-11-04 Thread Graham Campbell (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8508?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967135#comment-16967135
 ] 

Graham Campbell commented on KAFKA-8508:


The ArrayIndexOutOfBoundsException is due to mistakenly falling back from Kerberos 
to the DIGEST-MD5 mechanism because retrieving the Kerberos ticket took too long, 
so it wasn't available to SecurityUtils.java. The root cause of that was a 
transient network issue, same as KAFKA-7987.

> saslClient failed to initialize properly: it's null.
> 
>
> Key: KAFKA-8508
> URL: https://issues.apache.org/jira/browse/KAFKA-8508
> Project: Kafka
>  Issue Type: Bug
>  Components: zkclient
>Affects Versions: 2.0.1
>Reporter: Caroline Liu
>Priority: Major
>
> After a network issue caused the last ISR to fail connecting to ZooKeeper, 
> the attempt to reconnect failed with an ArrayIndexOutOfBoundsException. 
> {code:java}
> 2019-05-31 15:54:38,823 [zk-session-expiry-handler0-SendThread(zk2-1:2181)] 
> WARN (org.apache.zookeeper.ClientCnxn) - Client session timed out, have not 
> heard from server in 20010ms for sessionid 0x1511b2b1042a
> 2019-05-31 15:54:38,823 [zk-session-expiry-handler0-SendThread(zk2-1:2181)] 
> INFO (org.apache.zookeeper.ClientCnxn) - Client session timed out, have not 
> heard from server in 20010ms for sessionid 0x1511b2b1042a, closing socket 
> connection and attempting reconnect
> 2019-05-31 15:54:39,702 [zk-session-expiry-handler0-SendThread(zk1-2:2181)] 
> INFO (org.apache.zookeeper.client.ZooKeeperSaslClient) - Client will use 
> DIGEST-MD5 as SASL mechanism.
> 2019-05-31 15:54:39,702 [zk-session-expiry-handler0-SendThread(zk1-2:2181)] 
> ERROR (org.apache.zookeeper.client.ZooKeeperSaslClient) - Exception while 
> trying to create SASL client: java.lang.ArrayIndexOutOfBoundsException: 0
> 2019-05-31 15:54:39,702 [zk-session-expiry-handler0-SendThread(zk1-2:2181)] 
> INFO (org.apache.zookeeper.ClientCnxn) - Opening socket connection to server 
> zk1-2/1.3.6.1:2181. Will attempt to SASL-authenticate using Login Context 
> section 'Client'
> 2019-05-31 15:54:39,702 [zk-session-expiry-handler0-SendThread(zk1-2:2181)] 
> INFO (org.apache.zookeeper.ClientCnxn) - Socket connection established to 
> zk1-2/1.3.6.1:2181, initiating session
> 2019-05-31 15:54:39,703 [zk-session-expiry-handler0-SendThread(zk1-2:2181)] 
> INFO (org.apache.zookeeper.ClientCnxn) - Session establishment complete on 
> server zk1-2/1.3.6.1:2181, sessionid = 0x1511b2b1042a, negotiated timeout 
> = 3
> 2019-05-31 15:54:39,703 [zk-session-expiry-handler0-SendThread(zk1-2:2181)] 
> ERROR (org.apache.zookeeper.ClientCnxn) - SASL authentication with Zookeeper 
> Quorum member failed: javax.security.sasl.SaslException: saslClient failed to 
> initialize properly: it's null.{code}
> Kafka was "not live" in zookeeper and had to be manually restarted to recover 
> from this error. It would be better if the last ISR could retry.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7987) a broker's ZK session may die on transient auth failure

2019-11-05 Thread Graham Campbell (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16967790#comment-16967790
 ] 

Graham Campbell commented on KAFKA-7987:


[~junrao] We're starting to see these errors more frequently (guess our network 
is getting less reliable), so I'm looking at a fix for this. Does scheduling a 
reinitialize() in the ZookeeperClient after notifying handlers seem like a 
reasonable solution? I don't see a way to get more details from the ZK client 
to try to tell if the auth failure was caused by a retriable error or not.
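
A rough sketch of that idea (assumed names; the real kafka.zookeeper.ZooKeeperClient 
is Scala and this is only an illustration): after notifying the registered handlers 
of AUTH_FAILED, schedule a session re-initialization with a small backoff instead of 
leaving the client permanently in the failed state.
{code:java}
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; not the actual kafka.zookeeper.ZooKeeperClient.
public class AuthFailureRetryPolicy {
    private final ScheduledExecutorService expiryScheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final List<Runnable> authFailureHandlers;
    private final Runnable reinitializeSession; // assumed hook that recreates the ZK session

    public AuthFailureRetryPolicy(List<Runnable> handlers, Runnable reinitializeSession) {
        this.authFailureHandlers = handlers;
        this.reinitializeSession = reinitializeSession;
    }

    /** Called when the underlying ZooKeeper client transitions to AUTH_FAILED. */
    public void onAuthFailure(long retryBackoffMs) {
        authFailureHandlers.forEach(Runnable::run);       // existing behaviour: notify handlers
        expiryScheduler.schedule(reinitializeSession,     // proposed: retry the session instead of
                retryBackoffMs, TimeUnit.MILLISECONDS);   // giving up, in case the KDC issue was transient
    }
}
{code}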

> a broker's ZK session may die on transient auth failure
> ---
>
> Key: KAFKA-7987
> URL: https://issues.apache.org/jira/browse/KAFKA-7987
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Priority: Major
>
> After a transient network issue, we saw the following log in a broker.
> {code:java}
> [23:37:02,102] ERROR SASL authentication with Zookeeper Quorum member failed: 
> javax.security.sasl.SaslException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Server not found in Kerberos database (7))]) occurred when 
> evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client 
> will go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
> [23:37:02,102] ERROR [ZooKeeperClient] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> {code}
> The network issue prevented the broker from communicating to ZK. The broker's 
> ZK session then expired, but the broker didn't know that yet since it 
> couldn't establish a connection to ZK. When the network was back, the broker 
> tried to establish a connection to ZK, but failed due to auth failure (likely 
> due to a transient KDC issue). The current logic just ignores the auth 
> failure without trying to create a new ZK session. Then the broker will be 
> permanently in a state that it's alive, but not registered in ZK.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-16902) Sender ignores socket connection timeout when reconnecting to transaction coordinator

2024-06-05 Thread Graham Campbell (Jira)
Graham Campbell created KAFKA-16902:
---

 Summary: Sender ignores socket connection timeout when 
reconnecting to transaction coordinator
 Key: KAFKA-16902
 URL: https://issues.apache.org/jira/browse/KAFKA-16902
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 3.6.2, 3.7.0, 3.5.2, 3.4.1, 3.3.2, 3.2.3, 3.1.2, 2.8.2
Reporter: Graham Campbell


maybeSendAndPollTransactionalRequest checks that the required coordinator node 
is ready before sending transactional requests. It uses 
{{Sender::awaitNodeReady}} and {{NetworkClientUtils::awaitReady}} to do so in a 
blocking manner. 

If the NetworkClient is in the middle of reconnecting to the coordinator due to 
a broker-side disconnection (eg. a broker restart), the 
socket.connection.setup.timeout.ms config (default 10 seconds) is ignored and 
request.timeout.ms is used (35 seconds). This results in up to 25 extra seconds 
of waiting before hitting the expected timeout. In my testing, connectivity-related 
exceptions are sometimes thrown before the full 35-second timeout (eg. 
NoRouteToHostException, UnknownHostException).

[awaitNodeReady uses 
requestTimeoutMs|https://github.com/apache/kafka/blob/896af1b2f2f2a7d579e0aef074bcb2004c0246f2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L568]
 as the timeout, which [awaitReady passes to the NetworkClient as the poll 
timeout|https://github.com/apache/kafka/blob/896af1b2f2f2a7d579e0aef074bcb2004c0246f2/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java#L73-L74],
 so disconnections aren't processed until after the [selector.poll 
completes|https://github.com/apache/kafka/blob/896af1b2f2f2a7d579e0aef074bcb2004c0246f2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L585].
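
For reference, a minimal reproduction sketch under the configuration described 
above (broker address and transactional id are placeholders): a transactional 
producer with a short request.timeout.ms, so that restarting the coordinator's 
broker makes the extra wait beyond socket.connection.setup.timeout.ms visible in 
the client logs.
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CoordinatorReconnectTimeoutRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9093");                // placeholder broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("transactional.id", "t7");                          // placeholder id
        props.put("socket.connection.setup.timeout.ms", "10000");     // expected bound on a connect attempt
        props.put("request.timeout.ms", "15000");                     // the timeout awaitNodeReady actually uses

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            // Restart the coordinator's broker here, then observe how long the
            // next transaction blocks before the disconnect is processed.
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("test_topic", "key", "value"));
            producer.commitTransaction();
        }
    }
}
{code}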

Logs from a local test with request.timeout.ms=15000 and a patched client with 
some extra logging show the timeout being violated:

 
{code:java}
2024-05-29 21:31:49:736 + [kafka-producer-network-thread | t7] INFO 
org.apache.kafka.clients.NetworkClient - [Producer clientId=t7, 
transactionalId=MyT7] Disconnecting from node 11 due to socket connection setup 
timeout. The timeout value is 10351 ms. 15013 ms since last attempt
2024-05-29 21:31:49:736 + [kafka-producer-network-thread | t7] INFO 
org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=t7, 
transactionalId=MyT7] awaitReady timed out after 15013 ms (threshold of 15000 
ms)
2024-05-29 21:31:49:752 + [kafka-producer-network-thread | t7] INFO 
org.apache.kafka.clients.producer.internals.TransactionManager - [Producer 
clientId=t7, transactionalId=MyT7] Discovered transaction coordinator 
kafka-2.redacted:9093 (id: 2 rack: null) {code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16951) TransactionManager should rediscover coordinator on disconnection

2024-06-13 Thread Graham Campbell (Jira)
Graham Campbell created KAFKA-16951:
---

 Summary: TransactionManager should rediscover coordinator on 
disconnection
 Key: KAFKA-16951
 URL: https://issues.apache.org/jira/browse/KAFKA-16951
 Project: Kafka
  Issue Type: Improvement
  Components: clients, producer 
Affects Versions: 3.7.0
Reporter: Graham Campbell


When a transaction coordinator for a transactional client shuts down for 
restart or due to failure, the NetworkClient notices the broker disconnection 
and [will automatically refresh cluster 
metadata|https://github.com/apache/kafka/blob/f380cd1b64134cf81e5dab16d71a276781de890e/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1182-L1183]
 to get the latest partition assignments.

The TransactionManager does not notice any changes until the next transactional 
request. If the broker is still offline, this is a [blocking wait while the 
client attempts to reconnect to the old 
coordinator|https://github.com/apache/kafka/blob/f380cd1b64134cf81e5dab16d71a276781de890e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L489-L490],
 which can be up to request.timeout.ms long (default 35 seconds). Coordinator 
lookup is only performed after a transactional request times out and fails. The 
lookup is triggered in either the [Sender|#L525-L528]
 or 
[TransactionManager's|https://github.com/apache/kafka/blob/f380cd1b64134cf81e5dab16d71a276781de890e/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1225-L1229]
 error handling.

To support faster recovery and faster reaction to transaction coordinator 
reassignments, the TransactionManager should proactively lookup the transaction 
coordinator whenever the client is disconnected from the current transaction 
coordinator.
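
A hedged sketch of the proposed behaviour (interface and method names are 
assumptions, not the current client code): when the network layer reports a 
disconnection from the cached transaction coordinator, enqueue a coordinator 
lookup immediately instead of waiting for the next transactional request to 
time out.
{code:java}
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only; not the current TransactionManager/NetworkClient code.
public class ProactiveCoordinatorRediscovery {
    private volatile String transactionCoordinatorId;          // node id of the cached coordinator
    private final AtomicBoolean lookupPending = new AtomicBoolean(false);

    /** Called by the network layer for every server disconnection. */
    public void handleServerDisconnect(String nodeId) {
        if (Objects.equals(nodeId, transactionCoordinatorId)
                && lookupPending.compareAndSet(false, true)) {
            transactionCoordinatorId = null;   // forget the stale coordinator
            enqueueFindCoordinatorRequest();   // rediscover it on the next poll
        }
    }

    // Stand-in for building and queuing a FindCoordinator request; the pending
    // flag would be cleared when the response is handled.
    private void enqueueFindCoordinatorRequest() {
    }
}
{code}
Only the first disconnection from a given coordinator triggers a lookup; once the 
response repopulates the cached id, later disconnections are handled the same way.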



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16951) TransactionManager should rediscover coordinator on disconnection

2024-06-18 Thread Graham Campbell (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856057#comment-17856057
 ] 

Graham Campbell commented on KAFKA-16951:
-

Yes, if the original coordinator is online, the transactional request will 
either succeed as normal (if leader election has happened for the relevant 
__transaction_state partition) or quickly return a NOT_COORDINATOR error.

 

I've made an attempt to generalize the handleServerDisconnect method used by 
the MetadataUpdater into a more general interface in the linked PR.

Related to this ticket I also opened KAFKA-16902 to use the 
socket.connection.setup.timeout.ms config to reduce the impact of attempting 
reconnection.

> TransactionManager should rediscover coordinator on disconnection
> -
>
> Key: KAFKA-16951
> URL: https://issues.apache.org/jira/browse/KAFKA-16951
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, producer 
>Affects Versions: 3.7.0
>Reporter: Graham Campbell
>Priority: Major
>
> When a transaction coordinator for a transactional client shuts down for 
> restart or due to failure, the NetworkClient notices the broker disconnection 
> and [will automatically refresh cluster 
> metadata|https://github.com/apache/kafka/blob/f380cd1b64134cf81e5dab16d71a276781de890e/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1182-L1183]
>  to get the latest partition assignments.
> The TransactionManager does not notice any changes until the next 
> transactional request. If the broker is still offline, this is a [blocking 
> wait while the client attempts to reconnect to the old 
> coordinator|https://github.com/apache/kafka/blob/f380cd1b64134cf81e5dab16d71a276781de890e/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L489-L490],
>  which can be up to request.timeout.ms long (default 35 seconds). Coordinator 
> lookup is only performed after a transactional request times out and fails. 
> The lookup is triggered in either the [Sender|#L525-L528]
>  or 
> [TransactionManager's|https://github.com/apache/kafka/blob/f380cd1b64134cf81e5dab16d71a276781de890e/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L1225-L1229]
>  error handling.
> To support faster recovery and faster reaction to transaction coordinator 
> reassignments, the TransactionManager should proactively lookup the 
> transaction coordinator whenever the client is disconnected from the current 
> transaction coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects

2018-02-02 Thread Graham Campbell (JIRA)
Graham Campbell created KAFKA-6529:
--

 Summary: Broker leaks memory and file descriptors after sudden 
client disconnects
 Key: KAFKA-6529
 URL: https://issues.apache.org/jira/browse/KAFKA-6529
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 0.11.0.2, 1.0.0
Reporter: Graham Campbell


If a producer forcefully disconnects from a broker while it has staged 
receives, that connection enters a limbo state where it is no longer processed 
by the SocketServer.Processor, leaking the file descriptor for the socket and 
the memory used for the staged receive queue for that connection.

We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after 
the rolling restart to upgrade, open file descriptors on the brokers started 
climbing uncontrollably. In a few cases brokers reached our configured max open 
files limit of 100k and crashed before we rolled back.

We tracked this down to a buildup of muted connections in the 
Selector.closingChannels list. If a client disconnects from the broker with 
multiple pending produce requests, when the broker attempts to send an ack to 
the client it receives an IOException because the TCP socket has been closed. 
This triggers the Selector to close the channel, but because it still has 
pending requests, it adds it to Selector.closingChannels to process those 
requests. However, because that exception was triggered by trying to send a 
response, the SocketServer.Processor has marked the channel as muted and will 
no longer process it at all.

*Reproduced by:*
Starting a Kafka broker/cluster
Client produces several messages and then disconnects abruptly (eg. 
_./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_)
Broker then leaks file descriptor previously used for TCP socket and memory for 
unprocessed messages

*Proposed solution (which we've implemented internally)*
Whenever an exception is encountered when writing to a socket in 
Selector.pollSelectionKeys(...) record that that connection failed a send by 
adding the KafkaChannel ID to Selector.failedSends. Then re-raise the exception 
to still trigger the socket disconnection logic. Since every exception raised 
in this function triggers a disconnect, we also treat any exception while 
writing to the socket as a failed send.
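
A minimal, self-contained sketch of that bookkeeping (class and method names are 
illustrative assumptions, not the actual Selector internals): the id of any 
connection whose write fails is recorded before the exception is rethrown, so the 
close path can also drop the staged receives for that connection.
{code:java}
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

// Illustrative stand-in for the bookkeeping described above; all names here are
// assumptions, not the actual org.apache.kafka.common.network.Selector.
public class FailedSendTrackingSelector {
    // Connection ids that failed a send during the current poll.
    private final Set<String> failedSends = new HashSet<>();

    /** Attempts the write and records the connection id if the write fails. */
    void write(String connectionId, WritableChannelStub channel, byte[] payload) throws IOException {
        try {
            channel.write(payload);
        } catch (IOException e) {
            // Record the failed send so the close path can also clean up any
            // staged receives for this connection, then re-raise so the
            // existing disconnection logic still runs.
            failedSends.add(connectionId);
            throw e;
        }
    }

    Set<String> failedSends() {
        return failedSends;
    }

    /** Minimal stand-in for a socket channel that the peer may have closed. */
    interface WritableChannelStub {
        void write(byte[] payload) throws IOException;
    }
}
{code}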



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects

2018-02-02 Thread Graham Campbell (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Graham Campbell updated KAFKA-6529:
---
Flags: Patch

> Broker leaks memory and file descriptors after sudden client disconnects
> 
>
> Key: KAFKA-6529
> URL: https://issues.apache.org/jira/browse/KAFKA-6529
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: Graham Campbell
>Priority: Major
>
> If a producer forcefully disconnects from a broker while it has staged 
> receives, that connection enters a limbo state where it is no longer 
> processed by the SocketServer.Processor, leaking the file descriptor for the 
> socket and the memory used for the staged receive queue for that connection.
> We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after 
> the rolling restart to upgrade, open file descriptors on the brokers started 
> climbing uncontrollably. In a few cases brokers reached our configured max 
> open files limit of 100k and crashed before we rolled back.
> We tracked this down to a buildup of muted connections in the 
> Selector.closingChannels list. If a client disconnects from the broker with 
> multiple pending produce requests, when the broker attempts to send an ack to 
> the client it receives an IOException because the TCP socket has been closed. 
> This triggers the Selector to close the channel, but because it still has 
> pending requests, it adds it to Selector.closingChannels to process those 
> requests. However, because that exception was triggered by trying to send a 
> response, the SocketServer.Processor has marked the channel as muted and will 
> no longer process it at all.
> *Reproduced by:*
> Starting a Kafka broker/cluster
> Client produces several messages and then disconnects abruptly (eg. 
> _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_)
> Broker then leaks file descriptor previously used for TCP socket and memory 
> for unprocessed messages
> *Proposed solution (which we've implemented internally)*
> Whenever an exception is encountered when writing to a socket in 
> Selector.pollSelectionKeys(...) record that that connection failed a send by 
> adding the KafkaChannel ID to Selector.failedSends. Then re-raise the 
> exception to still trigger the socket disconnection logic. Since every 
> exception raised in this function triggers a disconnect, we also treat any 
> exception while writing to the socket as a failed send.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6388) Error while trying to roll a segment that already exists

2018-02-08 Thread Graham Campbell (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16357401#comment-16357401
 ] 

Graham Campbell commented on KAFKA-6388:


Most of the recent times we've run into this, it's been on non-compacted topics 
that have been idle for a while (no data in for > retention.ms) and then begin 
receiving data again. It's not happening to every replica: with 15 partitions 
and 3 replicas, sometimes one or two followers will encounter this for a given 
topic.

> Error while trying to roll a segment that already exists
> 
>
> Key: KAFKA-6388
> URL: https://issues.apache.org/jira/browse/KAFKA-6388
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: David Hay
>Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After spending 30 min or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition 
> screening.save.results.screening.save.results.processor.error-43 with initial 
> high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> 2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 0002.timeindex already exists; deleting it first 
> (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed 
> fetcher for partitions __consumer_offsets-20 
> (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> sr.new.sr.new.processor.error-38 offset 2
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.KafkaException: Trying to roll a new log segment for 
> topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it 
> already exists.
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
> at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.roll(Log.scala:1297)
> at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
> at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
> at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
> at kafka.log.Log.append(Log.scala:624)
> at kafka.log.Log.appendAsFollower(Log.scala:607)
> at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
> at 
> kafka.server.Rep