RobertIndie commented on code in PR #21079:
URL: https://github.com/apache/pulsar/pull/21079#discussion_r1308559001


##########
pip/pip-297.md:
##########
@@ -0,0 +1,168 @@
+# Title: Support raising exceptions using Function & Connector context
+
+# Background knowledge
+
+The **Pulsar Function** is a serverless computing framework that runs on top 
of Pulsar and processes messages.
+
+The **Pulsar IO Connector** is a framework that allows users to easily 
integrate Pulsar with external systems, such as
+databases, messaging systems, and data pipelines. With Pulsar IO Connector, 
you can create, deploy, and manage
+connectors that read data from or write data to Pulsar topics. There are two 
types of Pulsar IO Connectors: source and
+sink. A **source connector** imports data from another system to Pulsar, while 
a **sink connector** exports data from
+Pulsar to another system. The Pulsar IO Connector is implemented based on the 
Pulsar Function framework. So in
+the following, we treat the connector as a special kind of function. The 
`function` refers to both function and
+connector.
+
+**Function Instance** is a running instance of a Pulsar IO Connector that 
interacts with a specific external system or a
+Pulsar Function that processes messages from the topic.
+
+**Function Framework** is a framework for running the Function instance.
+
+**Function Context** is an interface that provides access to various 
information and resources for the connector or the
+function. The function context is passed to the connector or the function when 
it is initialized, and then can be used
+to interact with the Pulsar system.
+
+The current implementation of the exception handler:
+**Function instance thread**: The function framework initializes a thread for 
each function instance to handle the
+core logic of the function/connector, including consuming messages from the 
Pulsar topic for the sink connector,
+executing the logic of the function, producing messages to the Pulsar topic 
for the source connector, handling the
+exception, etc.
+
+**Exception handling logic**: The function itself can throw exceptions, and 
this thread will catch the exception and
+then close the function. This means that the function will stop working until 
it is restarted manually or
+automatically by the function framework.
+
+Even though it is not explicitly defined, there are two types of exceptions 
that could be handled for the function or
+the framework.
+
+- **Recoverable exception**: This is an exception that the function can 
recover from by itself, such as network
+  connection issues, persistence issues, etc. The function can catch these 
exceptions and retry the operation until it
+  succeeds or reaches a limit.
+- **Unrecoverable exception**: This is an exception that the function cannot 
recover from by itself and needs to notify
+  the framework to terminate it. These are fatal exceptions that indicate a 
configuration issue, a logic error, or an
+  incompatible system. The function framework will catch these exceptions, 
report them to users, and terminate the
+  function.
+
+# Motivation
+
+The current implementation of the connector and Pulsar Function exception 
handler cannot handle the unrecoverable
+exceptions that are thrown outside of the function instance thread.
+
+For example, suppose we have a sink connector that uses its own threads to 
batch-sink the data to an external system. If
+any unrecoverable exceptions occur in those threads, the function instance 
thread will not be aware of them and will
+not be able to terminate the connector. This will cause the connector to hang 
indefinitely. There is a related issue
+here: https://github.com/apache/pulsar/issues/9464
+
+The same problem exists for the source connector. The source connector may 
also use a separate thread to fetch data from
+an external system. If any fatal exceptions happen in that thread, the 
connector will also hang forever. This issue has
+been observed for the Kafka source connector: 
https://github.com/apache/pulsar/issues/9464. We have fixed it by adding
+the notifyError method to the PushSource class in PIP-281: 
https://github.com/apache/pulsar/pull/20807. However, this
+does not solve the same problem that all source connectors face.
+
+The problem is the same for the Pulsar Function. The user can also process 
messages asynchronously in a Pulsar Function. If
+any fatal exceptions happen in the user's thread, the function framework can't 
handle the exception
+correctly.
+
+We need a way for the connector and function developers to throw unrecoverable 
exceptions outside the function instance
+thread. The function framework should catch these exceptions and terminate the 
function accordingly.
+
+# Goals
+
+## In Scope
+
+- This proposal will apply both to the Pulsar Function and the Pulsar 
Connector.
+- Support throwing unrecoverable exceptions outside the function instance 
thread.
+- The function framework should terminate the connector or function when there 
are any unrecoverable exceptions thrown
+  asynchronously.
+
+## Out of Scope
+
+- The fixes of the exception-raising issue mentioned in the Motivation part 
for all the connectors are not included in
+  this PIP. This PIP only provides the feature for the connector developer to 
raise the exception outside the function
+  instance thread. The fixes should be in several different PRs.
+
+# High Level Design
+
+Introduce a new method `raiseException` to the context. All the connector 
implementation code and the function code can
+use this context and call the `raiseException` method to raise an 
unrecoverable exception to the function framework.
+
+After the connector or function raises the unrecoverable exception, the 
function instance thread will be interrupted.
+The function framework can then catch the exception, log it, and 
terminate the function instance.

Review Comment:
   No. There is no direct way to terminate it. PIP-281 uses an indirect way to 
implement it by resending an exception record to the source connector.
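
   For illustration only, the mechanism described in the High Level Design could be sketched roughly as below. This is a minimal sketch and not the Pulsar implementation: `SketchContext`, `SketchInstance`, and `demo` are hypothetical names, and only the `raiseException` method name comes from the PIP text. It assumes the framework records the raised exception and interrupts the function instance thread, which then logs the cause and terminates.

```java
import java.util.concurrent.atomic.AtomicReference;

public class RaiseExceptionSketch {

    /** Hypothetical stand-in for the function context; not Pulsar's actual interface. */
    interface SketchContext {
        void raiseException(Throwable t);
    }

    /** Hypothetical stand-in for a function instance running on its own thread. */
    static final class SketchInstance implements SketchContext {
        private final AtomicReference<Throwable> raised = new AtomicReference<>();
        private final Thread instanceThread = new Thread(this::runLoop, "function-instance");
        private volatile String terminationReason;

        @Override
        public void raiseException(Throwable t) {
            // Keep only the first raised exception, then wake the instance thread.
            if (raised.compareAndSet(null, t)) {
                instanceThread.interrupt();
            }
        }

        private void runLoop() {
            try {
                while (true) {
                    Thread.sleep(50); // stands in for reading and processing one message
                }
            } catch (InterruptedException e) {
                // The framework side: look up the raised exception and terminate.
                Throwable cause = raised.get();
                terminationReason = (cause == null) ? "interrupted" : cause.getMessage();
            }
        }

        void start() {
            instanceThread.start();
        }

        String awaitTermination(long timeoutMillis) throws InterruptedException {
            instanceThread.join(timeoutMillis);
            return terminationReason;
        }
    }

    /** Raises an exception from a separate worker thread and returns the termination reason. */
    static String demo() throws InterruptedException {
        SketchInstance instance = new SketchInstance();
        instance.start();
        Thread worker = new Thread(() ->
                instance.raiseException(new IllegalStateException("external system rejected batch")));
        worker.start();
        worker.join();
        return instance.awaitTermination(5000);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("terminated: " + demo());
    }
}
```

   In this sketch the worker thread never throws into the instance thread directly; it only hands the exception to the context, which matches the proposal's intent of letting code outside the function instance thread report an unrecoverable failure.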



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
