This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 88231f732a8 [improve][pip] PIP-297: Support terminating Function & 
Connector with the fatal exception (#21079)
88231f732a8 is described below

commit 88231f732a804fe1c75c9f082500762b3d1faa6c
Author: Zike Yang <[email protected]>
AuthorDate: Wed Sep 6 19:18:18 2023 +0800

    [improve][pip] PIP-297: Support terminating Function & Connector with the 
fatal exception (#21079)
    
    ### Motivation
    
    This PIP is to improve the current function exception handler. It will be 
applied to both Pulsar Function and Pulsar IO Connector.
    
    For more details, please refer to `pip-297.md`
---
 pip/pip-297.md | 213 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 213 insertions(+)

diff --git a/pip/pip-297.md b/pip/pip-297.md
new file mode 100644
index 00000000000..2985864beed
--- /dev/null
+++ b/pip/pip-297.md
@@ -0,0 +1,213 @@
+# Title: Support terminating Function & Connector with the fatal exception
+
+# Background knowledge
+
+The **Pulsar Function** is a serverless computing framework that runs on top 
of Pulsar and processes messages.
+
+The **Pulsar IO Connector** is a framework that allows users to easily 
integrate Pulsar with external systems, such as
+databases, messaging systems, and data pipelines. With Pulsar IO Connector, 
you can create, deploy, and manage
+connectors that read data from or write data to Pulsar topics. There are two 
types of Pulsar IO Connectors: source and
+sink. A **source connector** imports data from another system to Pulsar, while 
a **sink connector** exports data from
+Pulsar to another system. The Pulsar IO Connector is implemented based on the 
Pulsar Function framework. So in
+the following, we treat the connector as a special kind of function. The 
`function` refers to both function and
+connector.
+
+**Function Instance** is a running instance of a Pulsar IO Connector that 
interacts with a specific external system or a
+Pulsar Function that processes messages from the topic.
+
+**Function Framework** is a framework for running the Function instance.
+
+**Function Context** is an interface that provides access to various 
information and resources for the connector or the
+function. The function context is passed to the connector or the function when 
it is initialized, and then can be used
+to interact with the Pulsar system.
+
+## The current implementation of the exception handler
+
+**Function instance thread**: The function framework initializes a thread for 
each function instance to handle the
+core logic of the function/connector, including consuming messages from the 
Pulsar topic for the sink connector,
+executing the logic of the function, producing messages to the Pulsar topic 
for the source connector, handling the
+exception, etc. And let's define the **Connector thread/Function thread** as a 
thread that is created by the connector
+or function itself.
+
+**Exception handling logic**: The function itself can throw exceptions, and 
this thread will catch the exception and
+then close the function. This means that the function will stop working until 
it is restarted manually or
+automatically by the function framework.
+
+Even though it is not explicitly defined, there are two types of exceptions 
that should be handled by the function or
+the framework:
+
+- **Fatal exception**: This is an exception that the function cannot recover 
from by itself and needs to notify the
+  framework to terminate it. These are fatal exceptions that indicate a 
configuration issue, a logic error, or an
+  incompatible system. The function framework will catch these exceptions, 
report them to users, and terminate the
+  function.
+- **Non-fatal exception** is an exception that the function instance don't 
need to be terminated for. It could be
+  handled by the connector or function itself. Or be thrown by the function. 
This exception won't cause the function
+  instance to be terminated.
+
+### How to handle exceptions thrown from connectors
+
+All the exceptions thrown form the connector are treated as fatal exceptions.
+
+If the exception is thrown from the function instance thread, the function 
framework will catch the exception and
+terminate the function instance.
+
+If the exception is thrown from the connector thread that is created by the 
connector itself, the function framework
+will not be able to catch the exception and terminate the function instance. 
The connector will hang forever.
+The `Motivation` part will talk more about this case.
+
+If the exception is thrown from the external system, the connector 
implementation could treat it as a retryable
+exception and retry to process the message later, or throw it to indicate it 
as a fatal exception.
+
+### How to handle exceptions thrown from functions
+
+All the exceptions thrown from the pulsar function are treated as non-fatal 
exceptions. The function framework will
+catch the exception and log it. But it will not terminate the function 
instance.
+
+There is no way for the function developer to throw a fatal exception to the 
function framework to terminate the
+function instance.
+
+# Motivation
+
+Currently, the connector and function cannot terminate the function instance 
if there are fatal exceptions thrown
+outside the function instance thread. The current implementation of the 
connector and Pulsar Function exception handler
+cannot handle the fatal exceptions that are thrown outside the function 
instance thread.
+
+For example, suppose we have a sink connector that uses its own threads to 
batch-sink the data to an external system. If
+any fatal exceptions occur in those threads, the function instance thread will 
not be aware of them and will
+not be able to terminate the connector. This will cause the connector to hang 
indefinitely. There is a related issue
+here: https://github.com/apache/pulsar/issues/9464
+
+The same problem exists for the source connector. The source connector may 
also use a separate thread to fetch data from
+an external system. If any fatal exceptions happen in that thread, the 
connector will also hang forever. This issue has
+been observed for the Kafka source connector: 
https://github.com/apache/pulsar/issues/9464. We have fixed it by adding
+the notifyError method to the `PushSource` class in PIP-281: 
https://github.com/apache/pulsar/pull/20807. However, this
+does not solve the same problem that all source connectors face because not 
all connectors are implemented based on
+the `PushSource` class.
+
+The problem is same for the Pulsar Function. Currently, the function can't 
throw fatal exceptions to the function
+framework. We need to provide a way for the function developer to implement it.
+
+We need a way for the connector and function developers to throw fatal 
exceptions outside the function instance
+thread. The function framework should catch these exceptions and terminate the 
function accordingly.
+
+# Goals
+
+## In Scope
+
+- Support terminating the function instance with fatal exceptions
+- This proposal will apply both to the Pulsar Function and the Pulsar 
Connector.
+
+## Out of Scope
+
+- The fixes of the exception-raising issue mentioned in the Motivation part 
for all the connectors are not included in
+  this PIP. This PIP only provides the feature for the connector developer to 
terminate the function instance. The fixes
+  should be in several different PRs.
+
+# High Level Design
+
+Introduce a new method `fatal` to the context. All the connector 
implementation code and the function code
+can use this context and call the `fatal` method to terminate the instance 
while raising a fatal exception.
+
+After the connector or function raises the fatal exception, the function 
instance thread will be interrupted.
+The function framework then could catch the exception, log it, and then 
terminate the function instance.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+This PIP proposes to add a new method`fatal`to the context `BaseContext`. This 
method allows the connector or the
+function code to report a fatal exception to the function framework and 
terminate the instance. The `SinkContext`
+and `SourceContext` are all inherited from `BaseContext`. Therefore, all the 
sink connectors and source connectors can
+invoke this new method. The pulsar function context class `Context` is also 
inherited from `BaseContext`. Therefore, the
+function code can also invoke this new method.
+
+In the `fatal` method, the function instance thread will be interrupted. The 
function instance thread can then
+catch the interrupt exception and get the fatal exception. The function 
framework then logs this exception,
+reports to the metrics, and finally terminates the function instance.
+
+Tbe behavior when invoking the `fatal` method:
+
+- For the connector thread or function thread:
+    - Invoke the `fatal` method
+    - Send the exception to the function framework. There is a field 
`deathException` in the
+      class `JavaInstanceRunnable` that is used to store the fatal exception.
+    - Interrupt the function instance thread
+- For the function instance thread:
+    - Catch the interrupt exception
+    - Get the exception from the function framework
+    - Report the log and metrics
+    - Close the function instance
+
+## Public-facing Changes
+
+### Public API
+
+Introduce `fatal` method to the `BaseContext`:
+
+```java
+public interface BaseContext {
+    /**
+     * Terminate the function instance with a fatal exception.
+     *
+     * @param t the fatal exception to be raised
+     */
+    void fatal(Throwable t);
+}
+```
+
+### Binary protocol
+
+No changes for this part.
+
+### Configuration
+
+No changes for this part.
+
+### CLI
+
+No changes for this part.
+
+### Metrics
+
+No changes for this part.
+
+# Monitoring
+
+No changes for this part.
+
+# Security Considerations
+
+No security-related changes.
+The new method `fatal` will only take effect on the current function instance. 
It won't affect other function instances
+even they are in the same function worker.
+
+# Backward & Forward Compatibility
+
+## Revert
+
+No operation required.
+
+## Upgrade
+
+No operation required.
+
+# Alternatives
+
+## Using futures to handle results or exceptions returned the connector
+
+The benefit of this solution is that it makes the use of exception throwing 
more intuitive to the connector developer.
+
+But it requires changes to existing interfaces, including `Source` and `Sink`, 
which can complicate connector
+development. And we still need the `fatal` method to handle some cases such as 
terminating the instance in code outside
+of the message processing logic. This alternative solution can't handle this 
case.
+
+Meanwhile, the implementation of this solution will also be more complex, 
involving changes to the core message
+processing logic of the function framework. We need to turn the entire message 
processing logic into an asynchronous
+pattern.
+
+# General Notes
+
+# Links
+
+* Mailing List discussion thread: 
https://lists.apache.org/thread/j59gzzwjp8c48lwv5poddm9qzlp2hol0
+* Mailing List voting thread: 
https://lists.apache.org/thread/ggok3c2601mnbdomr65v3pjth3lk6fr8

Reply via email to