Praveenkumar76 opened a new pull request, #27:
URL: https://github.com/apache/pulsar-connectors/pull/27
Fixes [apache#25290](https://github.com/apache/pulsar/issues/25290)

### Motivation

The Kafka Source connector can suffer a liveness failure when its consumer thread encounters a fatal error. In that case, the connector reports the error via `notifyError()`, which relies on the framework’s instance thread to read and handle it. However, if the instance thread is blocked in `sendOutputMessage()` (for example, waiting on network I/O to the Pulsar broker), it never returns to `readNext()` and therefore never processes the error queue.

As a result, the consumer thread terminates, the instance thread remains stuck, and the connector pod keeps running without doing useful work. This leaves the connector in a “zombie” state: Kubernetes health checks still pass, but the connector is effectively dead.

### Modifications

- Captured a reference to the framework’s main `instanceThread` in the `open()` method.
- Updated the error handling of the Kafka consumer `runnerThread`:
  - In both the `catch` block and the `UncaughtExceptionHandler`, the consumer thread now interrupts `instanceThread` when a fatal exception occurs.
  - The interrupt breaks the blocking call in `sendOutputMessage()` (e.g., `CompletableFuture.get()` or a socket wait), allowing the instance thread to wake up and handle the error reported via `notifyError()`.
- Ensured that the failure propagates and the connector shuts down gracefully, so Kubernetes can restart the pod.
- Moved `consumer.subscribe()` out of the `while (running)` loop so that initialization errors surface properly.

### Verifying this change

- Verified that the connector no longer remains stuck when the consumer thread fails.
- Confirmed that the instance thread is interrupted, wakes up, processes the error, and exits cleanly.

This change added tests and can be verified as follows:

- Added an integration test: `KafkaSourceLivenessTest.java`
  - **Test design:**
    - Uses Testcontainers to run a Kafka broker.
    - Simulates a fatal failure in the consumer thread.
    - Simulates a blocked `sendOutputMessage()` in the instance thread.
  - **Assertion:**
    - The consumer thread interrupts the instance thread.
    - The instance thread is unblocked, and the connector exits instead of hanging.

### Does this pull request potentially affect one of the following parts:

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [x] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
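The interrupt hand-off described under Modifications can be illustrated in plain Java, without Pulsar or Kafka. This is a minimal sketch under stated assumptions, not the connector code from this PR: `InterruptOnFatalErrorSketch`, `reportFatal()`, and `runScenario()` are hypothetical names, a never-completed `CompletableFuture` stands in for a blocked `sendOutputMessage()`, and a queue stands in for the error path behind `notifyError()`. It relies on standard JDK behavior: `CompletableFuture.get()` throws `InterruptedException` when the waiting thread is interrupted. Both the `catch`-block path and the `UncaughtExceptionHandler` path are exercised.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

final class InterruptOnFatalErrorSketch {

    // Hypothetical stand-in for the error queue that notifyError() feeds.
    private final BlockingQueue<Throwable> errors = new LinkedBlockingQueue<>();

    // Captured when the source is opened, like the PR's instanceThread reference.
    private volatile Thread instanceThread;

    // Called from the consumer thread on a fatal error: record it, then wake the instance thread.
    void reportFatal(Throwable t) {
        errors.offer(t);
        Thread target = instanceThread;
        if (target != null) {
            target.interrupt();
        }
    }

    /** Returns the error the instance thread handled, or null if it stayed blocked. */
    static Throwable runScenario(boolean viaCatchBlock) {
        InterruptOnFatalErrorSketch source = new InterruptOnFatalErrorSketch();
        CompletableFuture<Void> pendingSend = new CompletableFuture<>(); // never completed
        AtomicReference<Throwable> handled = new AtomicReference<>();

        Thread instance = new Thread(() -> {
            try {
                pendingSend.get(); // simulated blocked sendOutputMessage()
            } catch (InterruptedException e) {
                handled.set(source.errors.poll()); // woken up: handle the reported error
                Thread.currentThread().interrupt(); // preserve the interrupt status
            } catch (ExecutionException e) {
                handled.set(e.getCause());
            }
        }, "instance-thread");
        instance.setDaemon(true); // do not keep the JVM alive if the sketch ever hangs
        source.instanceThread = instance; // corresponds to capturing the thread in open()
        instance.start();

        Thread runner = new Thread(() -> {
            try {
                // Simulated fatal failure inside the consumer's poll loop.
                throw new IllegalStateException("fatal consumer error");
            } catch (IllegalStateException e) {
                if (!viaCatchBlock) {
                    throw e; // escape to the UncaughtExceptionHandler instead
                }
                source.reportFatal(e); // catch-block path
            }
        }, "runner-thread");
        runner.setUncaughtExceptionHandler((t, e) -> source.reportFatal(e)); // backstop path
        runner.start();

        try {
            instance.join(5_000); // bounded wait so a regression cannot hang the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        for (boolean viaCatchBlock : new boolean[] {true, false}) {
            Throwable err = runScenario(viaCatchBlock);
            System.out.println((viaCatchBlock ? "catch block: " : "uncaught handler: ")
                    + (err == null ? "instance thread still blocked" : "handled " + err.getMessage()));
        }
    }
}
```

The error is queued before the interrupt is sent, and the Java Memory Model guarantees that a call to `interrupt()` happens-before the interrupted thread detects it, so the woken instance thread always finds the error in the queue. Note that `Thread.interrupt()` reliably wakes interruptible waits such as `CompletableFuture.get()`; whether a particular socket wait reacts to it depends on the I/O API in use.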
