andrewt-8x8 opened a new issue, #25290: URL: https://github.com/apache/pulsar/issues/25290
### Search before reporting

- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### User environment

- Broker version: 4.0.6
- Client library type: Java (Kafka Source Connector / `pulsar-io-kafka`)
- Deployed on Kubernetes via Function Mesh

### Issue Description

When the Kafka Source connector's consumer thread encounters a fatal exception (e.g. due to a transient Pulsar authentication outage), the error notification is silently lost and the connector pod remains alive but permanently idle.

**What happens:**

1. The consumer thread in `KafkaAbstractSource.start()` catches an exception at [KafkaAbstractSource.java L192-L195](https://github.com/apache/pulsar/blob/v4.0.6/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java#L192-L195):

   ```java
   } catch (Exception e) {
       LOG.error("Error while processing records", e);
       notifyError(e);
       break; // consumer thread exits permanently
   }
   ```

2. `notifyError(e)` (in `AbstractPushSource`) enqueues an `ErrorNotifierRecord` onto a bounded `LinkedBlockingQueue(1000)` via `consume()` -> `queue.put()`.
3. The intent is that the instance thread dequeues it via `readNext()`, which throws the exception, causing the function framework to restart the instance.

**The bug:** If the instance thread is blocked in `sendOutputMessage()` (e.g. when the Pulsar client cannot contact an external authz service because of a Cloudflare outage 🤦), it never returns to call `readNext()`. The `ErrorNotifierRecord` sits in the queue (or `queue.put()` blocks if the queue is full), and the error is never propagated.
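The race can be shown in isolation with a minimal, self-contained sketch. The queue, the `ERROR` marker, and the thread roles below are simplified stand-ins for the Pulsar classes, not the actual implementation:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class LostErrorDemo {
    // Stand-in for AbstractPushSource's bounded record queue.
    static final BlockingQueue<Object> QUEUE = new LinkedBlockingQueue<>(1000);
    // Stand-in for ErrorNotifierRecord.
    static final Object ERROR = new Object();

    /** Returns {consumerThreadDead, errorStillQueued} after the simulated outage. */
    static boolean[] demonstrate() throws InterruptedException {
        CountDownLatch pulsarSendUnblocked = new CountDownLatch(1);

        // "Instance thread": stuck in sendOutputMessage(), so it never
        // reaches readNext()/QUEUE.take() to observe the error.
        Thread instanceThread = new Thread(() -> {
            try {
                pulsarSendUnblocked.await(); // simulates a blocking send to Pulsar
                QUEUE.take();                // readNext() would dequeue here
            } catch (InterruptedException ignored) { }
        });
        instanceThread.start();

        // "Consumer thread": hits a fatal exception, enqueues it, then exits
        // (the notifyError(e) + break path).
        Thread consumerThread = new Thread(() -> {
            try {
                QUEUE.put(ERROR);
            } catch (InterruptedException ignored) { }
        });
        consumerThread.start();
        consumerThread.join(1000);

        boolean[] result = { !consumerThread.isAlive(), QUEUE.contains(ERROR) };
        pulsarSendUnblocked.countDown();     // never happens during the real outage
        instanceThread.join(1000);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = demonstrate();
        System.out.println("consumer thread dead: " + r[0]);
        System.out.println("error still queued: " + r[1]);
    }
}
```

Both flags come back `true`: the consumer thread has exited, and the error record is still sitting unread in the queue because nothing ever dequeues it.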
The consumer thread is dead, the instance thread is stuck, but the pod passes liveness/readiness checks - it's alive but doing no work.

**What was expected:** The connector should crash (or the instance should restart) so that Kubernetes can restart the pod and recovery can happen automatically once the transient outage resolves.

**Why this is a bug:** The `notifyError()` mechanism added in PR #20791 / PIP-281 relies on the instance thread being in a state where it will call `readNext()`. When the instance thread is blocked on I/O (sending to Pulsar), this assumption is violated and the error signal is silently dropped. This is a liveness bug - the connector appears healthy but is permanently stalled.

### Error messages

No error messages beyond the initial `LOG.error("Error while processing records", e)` on the consumer thread. The instance thread produces no output because it is blocked.

### Reproducing the issue

1. Deploy a Kafka Source connector (e.g. `KafkaBytesSource`) on Kubernetes via Function Mesh.
2. Ensure the connector is actively consuming from Kafka and producing to Pulsar.
3. Cause a transient Pulsar authentication failure (e.g. make the SSO/OAuth token endpoint return 500s).
4. The consumer thread will eventually hit an exception when `CompletableFuture.allOf(futures).get()` times out (because messages can't be sent to Pulsar), call `notifyError()`, and break.
5. The instance thread is blocked in `sendOutputMessage()` trying to write a previously-consumed record to the unauthenticated Pulsar client.
6. Observe that the pod remains running and passes health checks, but consumes no further messages from Kafka.
7. Even after the auth outage recovers, consumption does not resume - the consumer thread is dead.

### Additional information

**Related issues and PRs:**

- Discussion #19880 - "Should Kafka Source Connector itself after unrecoverable error?"
- PR #20424 - `[fix][io] Close the kafka source connector if there is uncaught exception` (merged, 3.1.0)
- PR #20698 - `[fix][io] Close the kafka source connector got stuck` (merged, 3.1.0)
- PR #20795 - `[fix][io] Not restart instance when kafka source poll exception` (merged, 3.1.0) - this PR introduced the `notifyError()` + `break` pattern
- PR #20791 / PR #20807 (PIP-281) - added `notifyError()` to `PushSource` / `AbstractPushSource`
- PR #22511 - `[fix][io] Kafka Source connector maybe stuck` (merged, 3.3.0) - added a timeout on the futures, but doesn't address this scenario

The fixes in 3.1.0–3.3.0 addressed the case where the consumer thread itself gets stuck, but did not address the case where the consumer thread correctly signals an error via `notifyError()` while the instance thread is blocked and never reads it.

**Possible fix directions:**

- Have `notifyError()` call `System.exit(1)` to force a pod restart (this is what we've done as a downstream workaround).
- Have `KafkaAbstractSource` catch the exception and call `close()` on a separate thread with a timeout, then `System.exit(1)` if close doesn't complete.
- Have `notifyError()` throw a `RuntimeException` instead of (or in addition to) enqueuing. However, since `notifyError()` runs on the consumer thread, an uncaught exception would only kill that thread (which is already dying via `break`); it would not affect the stuck instance thread unless combined with a `Thread.UncaughtExceptionHandler` that calls `System.exit(1)`.
- Add a watchdog/heartbeat mechanism to detect that the consumer thread has died.

### Are you willing to submit a PR?

- [ ] Not at this time.
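For illustration, the watchdog direction could look roughly like the sketch below. The class name, the `fatalAction` hook, and the polling period are invented for this sketch and are not existing Pulsar code; in production `fatalAction` would be `() -> System.exit(1)` so Kubernetes restarts the pod:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ConsumerWatchdog {
    /**
     * Periodically checks whether the consumer thread has died while the
     * connector is still nominally running; if so, runs the fatal action.
     * Returns the executor so the caller can shut it down on normal close.
     */
    static ScheduledExecutorService startConsumerWatchdog(
            Thread consumerThread, Runnable fatalAction, long periodMs) {
        ScheduledExecutorService watchdog =
                Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "kafka-source-watchdog");
                    t.setDaemon(true); // must not keep a dying JVM alive
                    return t;
                });
        watchdog.scheduleAtFixedRate(() -> {
            if (!consumerThread.isAlive()) {
                // Consumer thread exited (e.g. via the notifyError + break
                // path) but the instance thread may be stuck in blocking I/O:
                fatalAction.run(); // e.g. () -> System.exit(1)
            }
        }, periodMs, periodMs, TimeUnit.MILLISECONDS);
        return watchdog;
    }
}
```

Injecting the fatal action as a `Runnable` rather than hard-coding `System.exit(1)` keeps the watchdog testable while preserving the force-restart behavior in production.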
