RobertIndie commented on code in PR #21079:
URL: https://github.com/apache/pulsar/pull/21079#discussion_r1308559001
##########
pip/pip-297.md:
##########
@@ -0,0 +1,168 @@
+# Title: Support raising exceptions using Function & Connector context
+
+# Background knowledge
+
+The **Pulsar Function** is a serverless computing framework that runs on top of Pulsar and processes messages.
+
+The **Pulsar IO Connector** is a framework that allows users to easily integrate Pulsar with external systems, such as
+databases, messaging systems, and data pipelines. With Pulsar IO Connector, you can create, deploy, and manage
+connectors that read data from or write data to Pulsar topics. There are two types of Pulsar IO Connectors: source and
+sink. A **source connector** imports data from another system to Pulsar, while a **sink connector** exports data from
+Pulsar to another system. The Pulsar IO Connector is implemented based on the Pulsar Function framework. So in
+the following, we treat the connector as a special kind of function. The `function` refers to both function and
+connector.
+
+**Function Instance** is a running instance of a Pulsar IO Connector that interacts with a specific external system or a
+Pulsar Function that processes messages from the topic.
+
+**Function Framework** is a framework for running the Function instance.
+
+**Function Context** is an interface that provides access to various information and resources for the connector or the
+function. The function context is passed to the connector or the function when it is initialized, and then can be used
+to interact with the Pulsar system.
+
+The current implementation of the exception handler:
+**Function instance thread**: The function framework initializes a thread for each function instance to handle the
+core logic of the function/connector, including consuming messages from the Pulsar topic for the sink connector,
+executing the logic of the function, producing messages to the Pulsar topic for the source connector, handling the
+exception, etc.
+
+**Exception handling logic**: The function itself can throw exceptions, and this thread will catch the exception and
+then close the function. This means that the function will stop working until it is restarted manually or
+automatically by the function framework.
+
+Even though it is not explicitly defined, there are two types of exceptions that could be handled for the function or
+the framework.
+
+- **Recoverable exception**: This is an exception that the function can recover from by itself, such as network
+  connection issues, persistence issues, etc. The function can catch these exceptions and retry the operation until it
+  succeeds or reaches a limit.
+- **Unrecoverable exception**: This is an exception that the function cannot recover from by itself and needs to notify
+  the framework to terminate it. These are fatal exceptions that indicate a configuration issue, a logic error, or an
+  incompatible system. The function framework will catch these exceptions, report them to users, and terminate the
+  function.
+
+# Motivation
+
+The current implementation of the connector and Pulsar Function exception handler cannot handle the unrecoverable
+exceptions that are thrown outside of the function instance thread.
+
+For example, suppose we have a sink connector that uses its own threads to batch-sink the data to an external system. If
+any unrecoverable exceptions occur in those threads, the function instance thread will not be aware of them and will
+not be able to terminate the connector. This will cause the connector to hang indefinitely. There is a related issue
+here: https://github.com/apache/pulsar/issues/9464
+
+The same problem exists for the source connector. The source connector may also use a separate thread to fetch data from
+an external system. If any fatal exceptions happen in that thread, the connector will also hang forever. This issue has
+been observed for the Kafka source connector: https://github.com/apache/pulsar/issues/9464. We have fixed it by adding
+the notifyError method to the PushSource class in PIP-281: https://github.com/apache/pulsar/pull/20807. However, this
+does not solve the same problem that all source connectors face.
+
+The problem is the same for the Pulsar Function. The user can also process messages asynchronously in a Pulsar Function.
+If any fatal exceptions happen in the user's thread, the function framework can't handle the exception correctly.
+
+We need a way for the connector and function developers to throw unrecoverable exceptions outside the function instance
+thread. The function framework should catch these exceptions and terminate the function accordingly.
+
+# Goals
+
+## In Scope
+
+- This proposal will apply both to the Pulsar Function and the Pulsar Connector.
+- Support throwing unrecoverable exceptions outside the function instance thread.
+- The function framework should terminate the connector or function when there are any unrecoverable exceptions thrown
+  asynchronously.
+
+## Out of Scope
+
+- The fixes for all the connectors affected by the exception-raising issue mentioned in the Motivation section are not
+  included in this PIP. This PIP only provides the feature for the connector developer to raise the exception outside
+  the function instance thread. The fixes should be made in several separate PRs.
+
+# High Level Design
+
+Introduce a new method `raiseException` to the context. All the connector implementation code and the function code can
+use this context and call the `raiseException` method to raise an unrecoverable exception to the function framework.
+
+After the connector or function raises the unrecoverable exception, the function instance thread will be interrupted.
+The function framework can then catch the exception, log it, and terminate the function instance.

Review Comment:
   No. There is no direct way to terminate it. PIP-281 implements it indirectly, by resending an exception record to the source connector.
This is an automated message from the Apache Git Service.