Hi Elkhan,

I have one question: which metrics are not reported?
Best,
Ron

Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote on Fri, Jul 28, 2023 at 05:46:

> Hi Flinkers,
>
> Wanted to check if anyone else has faced this issue before:
>
> When a Python UDF (used inside FlinkSQL) raises an exception, its
> metrics get lost and are not reported. I am facing this issue in both
> Flink 1.16.2 and Flink 1.17.1 (Python 3.9).
> If an exception is not raised, the metrics show up.
>
> The Flink documentation does not mention that UDFs should not throw an
> exception. Is this the case?
>
> Or is it a known issue/bug?
>
> Thank you.
>
> =======================
> FlinkSQL script content:
> =======================
>
> CREATE TABLE input_table (
>   price DOUBLE
> ) WITH (
>   'connector' = 'datagen',
>   'rows-per-second' = '1'
> );
>
> CREATE TABLE output_table WITH ('connector' = 'print')
> LIKE input_table (EXCLUDING ALL);
>
> CREATE FUNCTION myDivide AS 'custom_udf.divide_udf'
> LANGUAGE PYTHON;
>
> -- Fail scenario: ZeroDivisionError: division by zero
> INSERT into output_table (select myDivide(price, 0) from input_table);
>
> =======================
> Python UDF content:
> =======================
>
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes
>
> import logging
>
> class DivideUDF(ScalarFunction):
>     def __init__(self):
>         self.success_counter = None
>         self.fail_counter = None
>     def open(self, function_context):
>         self.success_counter = function_context.get_metric_group().counter("flinksql_custom_udf_success_metric")
>         self.fail_counter = function_context.get_metric_group().counter("flinksql_custom_udf_fail_metric")
>     def eval(self, x, y):
>         logging.info('executing custom udf with logging and metric example...')
>         try:
>             result = x/y
>             self.success_counter.inc()
>             return result
>         except Exception as e:
>             self.fail_counter.inc()
>             raise e
>
> divide_udf = udf(DivideUDF(), result_type=DataTypes.DOUBLE())
>
> =======================
> Exception stack trace:
> =======================
>
> 2023-07-26 18:17:20
> org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1575)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1550)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1704)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1693)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:838)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:787)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: TimerException{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush}
>     ... 15 more
> Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
>     at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:300)
>     at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:118)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1702)
>     ... 14 more
> Caused by: java.lang.RuntimeException: Failed to close remote bundle
>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:423)
>     at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:407)
>     at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:86)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     ... 1 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 3: Traceback (most recent call last):
>   File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
>     response = task()
>   File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 340, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 580, in do_instruction
>     return getattr(self, request_type)(
>   File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 995, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 196, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File "/usr/local/lib/python3.9/site-packages/pyflink/fn_execution/table/operations.py", line 101, in process_element
>     return self.func(value)
>   File "<string>", line 1, in <lambda>
>   File "/tmp/python-dist-9b10e17c-99f8-4a8a-adb0-90faca859ca5/python-files/blob_p-e59b6f256efca9ae3fc37635498b82f316f8ac65-34f486b7351b4e2c73282ab4069c075e/custom_udf.py", line 27, in eval
>     raise e
>   File "/tmp/python-dist-9b10e17c-99f8-4a8a-adb0-90faca859ca5/python-files/blob_p-e59b6f256efca9ae3fc37635498b82f316f8ac65-34f486b7351b4e2c73282ab4069c075e/custom_udf.py", line 22, in eval
>     result = x/y
> ZeroDivisionError: float division by zero
>
>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:318)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:301)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>     at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
>     ... 3 more
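
A possible workaround, offered as a sketch and not a confirmed fix: the trace shows the ZeroDivisionError surfacing as "Failed to close remote bundle". If the counters only reach the reporter when a bundle closes successfully (an assumption; nothing in this thread confirms it), then re-raising inside eval would fail the bundle and drop them. Catching the error, incrementing the fail counter, and returning None (SQL NULL) keeps the bundle alive. The Counter and SafeDivide names below are hypothetical stand-ins so the snippet runs without PyFlink. In a real job the class would subclass ScalarFunction and take its counters from function_context.get_metric_group(), as in the UDF quoted above.

```python
import logging


class Counter:
    """Hypothetical stand-in for the counter a PyFlink metric group returns."""

    def __init__(self):
        self.count = 0

    def inc(self, n=1):
        self.count += n


class SafeDivide:
    """Same open/eval shape as the quoted DivideUDF, but eval never raises.

    Assumption: in a real job this would subclass
    pyflink.table.udf.ScalarFunction.
    """

    def __init__(self):
        self.success_counter = None
        self.fail_counter = None

    def open(self, function_context=None):
        # Stand-in counters; PyFlink would supply them via the metric group.
        self.success_counter = Counter()
        self.fail_counter = Counter()

    def eval(self, x, y):
        try:
            result = x / y
        except ZeroDivisionError:
            # Count the failure and return None instead of re-raising,
            # so the surrounding bundle is not failed by this row.
            self.fail_counter.inc()
            logging.warning("division by zero for x=%s, y=%s", x, y)
            return None
        self.success_counter.inc()
        return result


divider = SafeDivide()
divider.open()
print(divider.eval(6.0, 3), divider.eval(1.0, 0))
print(divider.success_counter.count, divider.fail_counter.count)
```

Whether returning NULL is acceptable depends on the job: bad rows then flow downstream as NULLs and need to be filtered or routed elsewhere.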