Hi Flinkers,
Wanted to check if anyone else has faced this issue before:
When a Python UDF (used inside Flink SQL) raises an exception, its
metrics are lost and never reported. I am seeing this with both Flink 1.16.2
and Flink 1.17.1 (Python 3.9).
If no exception is raised, the metrics show up as expected.
The Flink documentation does not mention that UDFs should not throw
exceptions. Is that the case?
Or is it a known issue/bug?
Thank you.
=======================
FlinkSQL script content:
=======================
-- Source table: the 'datagen' connector generates one row per second,
-- each carrying a single DOUBLE column named `price`.
CREATE TABLE input_table (
    price DOUBLE
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

-- Sink table: takes its columns from input_table via LIKE and writes
-- through the 'print' connector.
CREATE TABLE output_table WITH ('connector' = 'print')
LIKE input_table (EXCLUDING ALL);

-- Register the Python UDF object `divide_udf` from module `custom_udf`.
CREATE FUNCTION myDivide AS 'custom_udf.divide_udf'
LANGUAGE PYTHON;

-- Fail scenario: ZeroDivisionError: division by zero
-- The only column declared on input_table is `price`, so that is what
-- is passed as the dividend.
INSERT INTO output_table (SELECT myDivide(price, 0) FROM input_table);
=======================
Python UDF content:
=======================
from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes
import logging
class DivideUDF(ScalarFunction):
    """Scalar UDF that divides ``x`` by ``y`` and counts outcomes.

    Two counters are registered on the function's metric group in
    :meth:`open`: ``flinksql_custom_udf_success_metric`` is incremented
    for every successful division and ``flinksql_custom_udf_fail_metric``
    for every division that raises.
    """

    def __init__(self):
        # The counters need the function context, so they are created in
        # open() rather than here.
        self.success_counter = None
        self.fail_counter = None

    def open(self, function_context):
        """Create the success/failure counters on the UDF's metric group.

        Args:
            function_context: context object supplying ``get_metric_group()``.
        """
        metric_group = function_context.get_metric_group()
        self.success_counter = metric_group.counter(
            "flinksql_custom_udf_success_metric")
        self.fail_counter = metric_group.counter(
            "flinksql_custom_udf_fail_metric")

    def eval(self, x, y):
        """Return ``x / y``, updating the success or failure counter.

        Args:
            x: dividend.
            y: divisor.

        Returns:
            The quotient ``x / y``.

        Raises:
            Exception: whatever the division raises (for example
                ``ZeroDivisionError``) is re-raised unchanged after the
                failure counter has been incremented.
        """
        logging.info('executing custom udf with logging and metric example...')
        # Keep the try body to the division alone so that a problem while
        # updating the success counter is not miscounted as a UDF failure.
        try:
            result = x / y
        except Exception:
            self.fail_counter.inc()
            # Bare raise propagates the original exception and traceback.
            # NOTE(review): the report accompanying this code observes that
            # metrics do not show up once this exception propagates --
            # confirm how the runtime reports metrics for a failed call.
            raise
        self.success_counter.inc()
        return result
# Module-level UDF object: a DivideUDF instance wrapped by udf() with a
# DOUBLE result type. The SQL script refers to it as 'custom_udf.divide_udf'.
divide_udf = udf(DivideUDF(), result_type=DataTypes.DOUBLE())
=======================
Exception stack trace:
=======================
2023-07-26 18:17:20
org.apache.flink.runtime.taskmanager.AsynchronousException: Caught
exception while processing timer.
at
org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1575)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1550)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1704)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1693)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:838)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:787)
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: TimerException{java.lang.RuntimeException: Error while waiting
for BeamPythonFunctionRunner flush}
... 15 more
Caused by: java.lang.RuntimeException: Error while waiting for
BeamPythonFunctionRunner flush
at
org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107)
at
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:300)
at
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:118)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1702)
... 14 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:423)
at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:407)
at
org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:86)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: java.util.concurrent.ExecutionException:
java.lang.RuntimeException: Error received from SDK harness for instruction
3: Traceback (most recent call last):
File
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 267, in _execute
response = task()
File
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 340, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 580, in do_instruction
return getattr(self, request_type)(
File
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 618, in process_bundle
bundle_processor.process_bundle(instruction_id))
File
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 995, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File
"/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 221, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 346, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 348, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 215, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 196, in
pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
File
"/usr/local/lib/python3.9/site-packages/pyflink/fn_execution/table/operations.py",
line 101, in process_element
return self.func(value)
File "<string>", line 1, in <lambda>
File
"/tmp/python-dist-9b10e17c-99f8-4a8a-adb0-90faca859ca5/python-files/blob_p-e59b6f256efca9ae3fc37635498b82f316f8ac65-34f486b7351b4e2c73282ab4069c075e/custom_udf.py",
line 27, in eval
raise e
File
"/tmp/python-dist-9b10e17c-99f8-4a8a-adb0-90faca859ca5/python-files/blob_p-e59b6f256efca9ae3fc37635498b82f316f8ac65-34f486b7351b4e2c73282ab4069c075e/custom_udf.py",
line 22, in eval
result = x/y
ZeroDivisionError: float division by zero
at
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
at
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:318)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:301)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at
org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
... 3 more