Elkhan Dadashov created FLINK-32728:
---------------------------------------

             Summary: Metrics are not reported in Python UDF (used inside FlinkSQL) when exception is raised
                 Key: FLINK-32728
                 URL: https://issues.apache.org/jira/browse/FLINK-32728
             Project: Flink
          Issue Type: New Feature
          Components: API / Python, Table SQL / Runtime
    Affects Versions: 1.17.1, 1.16.2
         Environment: Flink 1.16.2 and Flink 1.17.1 (Python 3.9)
            Reporter: Elkhan Dadashov


When a Python UDF used inside Flink SQL raises an exception, the metrics it has recorded are lost and never reported. This happens on both Flink 1.16.2 and Flink 1.17.1 (Python 3.9). If no exception is raised, the metrics show up as expected.

The Flink documentation does not mention that Python UDFs must not throw exceptions.
=======================
FlinkSQL script content:
=======================

CREATE TABLE input_table (
    price DOUBLE
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

CREATE TABLE output_table WITH ('connector' = 'print')
LIKE input_table (EXCLUDING ALL);

CREATE FUNCTION myDivide AS 'custom_udf.divide_udf'
LANGUAGE PYTHON;

-- Fail scenario: ZeroDivisionError: division by zero
INSERT INTO output_table (SELECT myDivide(price, 0) FROM input_table);
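
For reference, a minimal sketch of one way to reproduce the same pipeline from the Python Table API instead of a SQL script. This is only a sketch: it assumes custom_udf.py sits in the working directory and is shipped to the cluster via the 'python.files' option (adjust paths as needed).

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Assumption: custom_udf.py is in the current directory and is shipped via
# 'python.files' so that 'custom_udf.divide_udf' resolves on the workers.
t_env.get_config().set("python.files", "custom_udf.py")

t_env.execute_sql(
    "CREATE TABLE input_table (price DOUBLE) "
    "WITH ('connector' = 'datagen', 'rows-per-second' = '1')")
t_env.execute_sql(
    "CREATE TABLE output_table WITH ('connector' = 'print') "
    "LIKE input_table (EXCLUDING ALL)")
t_env.execute_sql(
    "CREATE FUNCTION myDivide AS 'custom_udf.divide_udf' LANGUAGE PYTHON")

# Fail scenario: the job hits ZeroDivisionError and the UDF counters never show up.
t_env.execute_sql(
    "INSERT INTO output_table SELECT myDivide(price, 0) FROM input_table").wait()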

=======================
Python UDF content:
=======================

from pyflink.table.udf import ScalarFunction, udf
from pyflink.table import DataTypes

import logging

class DivideUDF(ScalarFunction):
    def __init__(self):
        self.success_counter = None
        self.fail_counter = None

    def open(self, function_context):
        self.success_counter = function_context.get_metric_group().counter("flinksql_custom_udf_success_metric")
        self.fail_counter = function_context.get_metric_group().counter("flinksql_custom_udf_fail_metric")

    def eval(self, x, y):
        logging.info('executing custom udf with logging and metric example...')
        try:
            result = x/y
            self.success_counter.inc()
            return result
        except Exception as e:
            self.fail_counter.inc()
            raise e

divide_udf = udf(DivideUDF(), result_type=DataTypes.DOUBLE())
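
As noted above, the counters are reported as long as no exception escapes eval(). Below is a hypothetical workaround sketch (SafeDivideUDF is not part of the job above and reuses the imports from custom_udf.py): it increments the failure counter and returns NULL instead of re-raising, trading the job failure for a NULL result so that the metric updates still get flushed.

class SafeDivideUDF(ScalarFunction):
    def open(self, function_context):
        self.success_counter = function_context.get_metric_group().counter("flinksql_custom_udf_success_metric")
        self.fail_counter = function_context.get_metric_group().counter("flinksql_custom_udf_fail_metric")

    def eval(self, x, y):
        try:
            result = x / y
            self.success_counter.inc()
            return result
        except Exception:
            # Count the failure but do not re-raise, so the bundle finishes
            # and the metric updates reach the metric reporters.
            self.fail_counter.inc()
            return None

safe_divide_udf = udf(SafeDivideUDF(), result_type=DataTypes.DOUBLE())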
 
=======================
Exception stack trace:
=======================
 
2023-07-26 18:17:20
org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1575)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1550)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1704)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1693)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:838)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:787)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: TimerException{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush}
    ... 15 more
Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
    at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:300)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:118)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1702)
    ... 14 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:423)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:407)
    at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:86)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 3: Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
    response = task()
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 340, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 580, in do_instruction
    return getattr(self, request_type)(
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 995, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 196, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "/usr/local/lib/python3.9/site-packages/pyflink/fn_execution/table/operations.py", line 101, in process_element
    return self.func(value)
  File "<string>", line 1, in <lambda>
  File "/tmp/python-dist-9b10e17c-99f8-4a8a-adb0-90faca859ca5/python-files/blob_p-e59b6f256efca9ae3fc37635498b82f316f8ac65-34f486b7351b4e2c73282ab4069c075e/custom_udf.py", line 27, in eval
    raise e
  File "/tmp/python-dist-9b10e17c-99f8-4a8a-adb0-90faca859ca5/python-files/blob_p-e59b6f256efca9ae3fc37635498b82f316f8ac65-34f486b7351b4e2c73282ab4069c075e/custom_udf.py", line 22, in eval
    result = x/y
ZeroDivisionError: float division by zero

    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:318)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:301)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    ... 3 more


