Praveenkumar76 opened a new pull request, #27:
URL: https://github.com/apache/pulsar-connectors/pull/27

   Fixes [apache#25290](https://github.com/apache/pulsar/issues/25290)
   
   ### Motivation
   
   The Kafka Source connector can enter a liveness failure when its consumer 
thread encounters a fatal error. In such cases, the connector reports the error 
via `notifyError()`, which relies on the framework’s instance thread to read 
and handle it.
   
   However, if the instance thread is blocked in `sendOutputMessage()` (for 
example, waiting on network I/O to the Pulsar broker), it does not return to 
`readNext()` and therefore never processes the error queue. As a result, the 
consumer thread terminates, the instance thread remains stuck, and the 
connector pod continues running without doing useful work. This leads to a 
“zombie” state where Kubernetes health checks still pass, but the connector is 
effectively dead.
   
   ### Modifications
   
   - Captured the framework’s main `instanceThread` reference during the 
`open()` method.
   - Updated the Kafka consumer `runnerThread` error handling:
     - In both the `catch` block and `UncaughtExceptionHandler`, when a fatal 
exception occurs, the consumer thread now interrupts the `instanceThread`.
   - This interruption breaks the blocking call (e.g., 
`CompletableFuture.get()` or a socket wait) in `sendOutputMessage()`, allowing 
the instance thread to wake up and handle the error that the consumer thread 
reported via `notifyError()`.
   - Ensured graceful shutdown behavior so that the failure propagates 
correctly and Kubernetes can restart the pod.
   - Moved `consumer.subscribe()` outside the `while (running)` loop to 
properly surface initialization errors.
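
The interrupt-on-fatal-error pattern described in the list above can be sketched in plain Java roughly as follows. This is an illustrative sketch, not the connector's code: the class name `InterruptOnFatalSketch`, the `errors` queue standing in for whatever `notifyError()` feeds, and the never-completing future standing in for a stuck `sendOutputMessage()` are all assumptions made for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;

class InterruptOnFatalSketch {
    // Hypothetical stand-in for the error queue that notifyError() fills.
    private final BlockingQueue<Throwable> errors = new LinkedBlockingQueue<>();
    // Captured in open(), as the PR describes for the framework's instance thread.
    private volatile Thread instanceThread;

    void open() {
        instanceThread = Thread.currentThread();
        Thread runner = new Thread(() -> {
            try {
                // Simulated fatal failure in the consumer loop.
                throw new IllegalStateException("simulated fatal consumer error");
            } catch (Throwable t) {
                errors.add(t);              // report first (notifyError() stand-in)
                instanceThread.interrupt(); // then wake the blocked instance thread
            }
        }, "runner");
        runner.setDaemon(true);
        runner.start();
    }

    // Hypothetical stand-in for a send that never returns on its own.
    void sendOutputMessage() throws InterruptedException, ExecutionException {
        new CompletableFuture<Void>().get();
    }

    // Plays the role of the instance thread: open, block, then react to the interrupt.
    static String run() {
        InterruptOnFatalSketch sketch = new InterruptOnFatalSketch();
        sketch.open();
        try {
            sketch.sendOutputMessage();
            return "completed";
        } catch (InterruptedException e) {
            Throwable cause = sketch.errors.poll();
            return "interrupted: " + (cause == null ? "no error queued" : cause.getMessage());
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch the error is enqueued before the interrupt is sent, so the woken thread always finds the cause waiting. Whether the real connector needs the same ordering depends on how the framework drains its error queue.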
   
   ### Verifying this change
   
   - Verified that the connector no longer remains stuck when the consumer 
thread fails.
   - Confirmed that the instance thread is interrupted, wakes up, processes the 
error, and exits cleanly.
   
   This change added tests and can be verified as follows:
   
   - Added an integration test: `KafkaSourceLivenessTest.java`
   - **Test design:**
     - Uses Testcontainers to run a Kafka broker.
     - Simulates a fatal failure in the consumer thread.
     - Simulates a blocked `sendOutputMessage()` in the instance thread.
   - **Assertion:**
     - The consumer thread interrupts the instance thread.
     - The deadlock is broken and the connector exits instead of hanging.
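
The core of the assertion above can be illustrated without Kafka or Testcontainers. The sketch below is not `KafkaSourceLivenessTest.java`; the class name `LivenessCheckSketch`, the latch standing in for a blocked `sendOutputMessage()`, and the timeout values are assumptions chosen for the example. It checks whether a simulated instance thread terminates within a bounded wait, with and without the interrupt.

```java
import java.util.concurrent.CountDownLatch;

class LivenessCheckSketch {
    /** Returns true if the simulated instance thread terminates within timeoutMs. */
    static boolean instanceExitsWithin(boolean interruptOnFailure, long timeoutMs)
            throws InterruptedException {
        CountDownLatch neverReleased = new CountDownLatch(1); // stuck-send stand-in
        CountDownLatch started = new CountDownLatch(1);

        Thread instance = new Thread(() -> {
            started.countDown();
            try {
                neverReleased.await(); // simulated blocked sendOutputMessage()
            } catch (InterruptedException e) {
                // Woken by the consumer thread; fall through and exit.
            }
        }, "instance");
        instance.setDaemon(true);
        instance.start();
        started.await();

        Thread consumer = new Thread(() -> {
            // Simulated fatal failure: with the fix, the consumer interrupts the instance.
            if (interruptOnFailure) {
                instance.interrupt();
            }
        }, "consumer");
        consumer.start();
        consumer.join();

        instance.join(timeoutMs); // bounded wait so the check itself cannot hang
        boolean exited = !instance.isAlive();
        if (!exited) {
            // Clean up the still-blocked thread before returning.
            instance.interrupt();
            instance.join();
        }
        return exited;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("without interrupt, exited: " + instanceExitsWithin(false, 200));
        System.out.println("with interrupt, exited: " + instanceExitsWithin(true, 2000));
    }
}
```

The bounded `join` is the important design choice here: a liveness test that waits indefinitely for the instance thread would itself hang whenever the fix regresses.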
   
   ### Does this pull request potentially affect one of the following parts:
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [x] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment

