ravikalla opened a new pull request, #20212:
URL: https://github.com/apache/kafka/pull/20212

   ## Summary
   
   This PR implements KAFKA-10840: it surfaces authentication failures from
   `KafkaConsumer.poll()` so that applications can catch them as exceptions
   instead of seeing the consumer fail silently.
   
   ## Problem
   
   Previously, when an SSL certificate expired or another authentication
   problem occurred, the consumer stopped fetching data without any clear
   indication of the underlying cause. The resulting "data flow stops without
   indication" scenarios were difficult to troubleshoot and to handle
   gracefully.
   
   ## Solution
   
   ### New Exception Classes
   - **CertificateExpiredAuthenticationException**: Specifically for SSL certificate expiration scenarios
   - **PersistentAuthenticationException**: For non-retriable authentication failures (SASL, general SSL handshake failures)
   
   ### Core Changes
   - Modified `ClassicKafkaConsumer.poll()` to actively check all known cluster nodes for authentication exceptions before proceeding with fetch operations
   - Added `authenticationException(Node)` method to `ConsumerNetworkClient` to expose authentication state from the underlying `KafkaClient`
   - Enhanced `MockClient` with `setNodeAuthenticationFailure()` method for testing authentication failure scenarios
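   
   As a rough illustration of the pre-fetch check described above (not the
   code in this PR), the sketch below loops over the known nodes and rethrows
   the first recorded authentication failure. `Node` and `AuthStateSource` are
   hypothetical stand-ins for Kafka's `Node` and for the new
   `ConsumerNetworkClient.authenticationException(Node)` accessor, and
   `throwIfAuthenticationFailed` is an invented helper name.
   
   ```java
   import java.util.List;
   
   /** Illustrative sketch only; the types below are stand-ins, not Kafka classes. */
   final class AuthCheckSketch {
   
       /** Hypothetical stand-in for org.apache.kafka.common.Node. */
       public static final class Node {
           public final int id;
           public final String host;
   
           public Node(int id, String host) {
               this.id = id;
               this.host = host;
           }
       }
   
       /** Hypothetical stand-in for the accessor added to ConsumerNetworkClient. */
       public interface AuthStateSource {
           /** Returns the failure recorded for the node, or null if there is none. */
           RuntimeException authenticationException(Node node);
       }
   
       private AuthCheckSketch() { }
   
       /** Throws the first authentication failure found among the given nodes. */
       public static void throwIfAuthenticationFailed(List<Node> nodes, AuthStateSource client) {
           for (Node node : nodes) {
               RuntimeException failure = client.authenticationException(node);
               if (failure != null) {
                   throw failure;
               }
           }
       }
   }
   ```
   
   Running a check like this at the start of each poll means the caller sees
   the failure on the next `poll()` call rather than waiting on fetches that
   never complete.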
   
   ### Error Detection Logic
   The implementation detects certificate expiration by checking for specific 
patterns in SSL exception messages:
   - "certificate expired"
   - "Certificate expired"  
   - "CERTIFICATE_EXPIRED"
   - "certificate has expired"
   - "expired certificate"
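   
   A minimal sketch of this kind of message matching is shown below. It is
   not the code from this PR: the class and method names are hypothetical, and
   it makes two assumptions the description does not state, namely that
   matching is case-insensitive (which folds "Certificate expired" into
   "certificate expired") and that the exception's cause chain is searched as
   well as its top-level message.
   
   ```java
   import java.util.List;
   import java.util.Locale;
   
   /** Illustrative sketch only; names are hypothetical. */
   final class CertificateExpiryMatcher {
   
       // Lower-cased forms of the patterns listed above.
       private static final List<String> PATTERNS = List.of(
           "certificate expired",
           "certificate_expired",
           "certificate has expired",
           "expired certificate");
   
       // Guards against cyclic cause chains.
       private static final int MAX_CAUSE_DEPTH = 16;
   
       private CertificateExpiryMatcher() { }
   
       /** Returns true if any message in the cause chain contains a known pattern. */
       public static boolean indicatesExpiredCertificate(Throwable error) {
           Throwable current = error;
           for (int depth = 0; current != null && depth < MAX_CAUSE_DEPTH; depth++) {
               String message = current.getMessage();
               if (message != null) {
                   String normalized = message.toLowerCase(Locale.ROOT);
                   for (String pattern : PATTERNS) {
                       if (normalized.contains(pattern)) {
                           return true;
                       }
                   }
               }
               current = current.getCause();
           }
           return false;
       }
   }
   ```
   
   String matching of this sort depends on the wording used by the JDK or TLS
   provider, so unmatched messages would presumably fall through to the more
   general authentication exception.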
   
   ## Usage Example
   
   ```java
   // Both exception types are added by this PR in org.apache.kafka.common.errors.
   try {
       ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
       // Process records as usual
   } catch (CertificateExpiredAuthenticationException e) {
       log.error("SSL certificate expired: {}", e.getMessage());
       // Handle certificate renewal
   } catch (PersistentAuthenticationException e) {
       log.error("Authentication failed: {}", e.getMessage());
       // Handle authentication configuration
   }
   ```
   
   ## Test Plan
   
   - [x] All existing Kafka consumer tests pass
   - [x] All ConsumerNetworkClient tests pass
   - [x] Code passes checkstyle and spotbugs checks
   - [x] Implementation is backward compatible
   - [x] Authentication failure scenarios can be tested using MockClient enhancements
   
   ## Files Changed
   
   1. `clients/src/main/java/org/apache/kafka/common/errors/CertificateExpiredAuthenticationException.java` (NEW)
   2. `clients/src/main/java/org/apache/kafka/common/errors/PersistentAuthenticationException.java` (NEW)
   3. `clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java` (MODIFIED)
   4. `clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java` (MODIFIED)
   5. `clients/src/test/java/org/apache/kafka/clients/MockClient.java` (MODIFIED)
   
   This addresses the core issue where "data flow stops without indication" 
when authentication fails, enabling applications to detect and handle these 
failures proactively rather than experiencing silent timeouts.

