Hi Elkhan,

I have one question: which metrics are not reported?
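
In the meantime, one thing that might be worth trying (a guess on my side, I have not verified it on 1.16.2 or 1.17.1): count the failure and return None instead of re-raising, so that the exception does not fail the bundle before the counters are reported. Below is a minimal plain-Python sketch of that eval logic. `Counter` and `SafeDivide` are hypothetical stand-ins so that it runs without PyFlink; in the real UDF the counters would still come from function_context.get_metric_group().counter(...) in open(), as in your code.

```python
class Counter:
    """Hypothetical stand-in for the PyFlink metric counter (only inc() is mimicked)."""

    def __init__(self):
        self.count = 0

    def inc(self, n=1):
        self.count += n


class SafeDivide:
    """Plain-Python sketch of the DivideUDF eval logic, without re-raising."""

    def __init__(self, success_counter, fail_counter):
        # In the real UDF these would be created in open() from the metric group.
        self.success_counter = success_counter
        self.fail_counter = fail_counter

    def eval(self, x, y):
        try:
            result = x / y
        except ZeroDivisionError:
            # Count the failure but do not propagate it: the assumption here is
            # that the propagated exception is what prevents metric reporting.
            self.fail_counter.inc()
            return None
        self.success_counter.inc()
        return result


divide = SafeDivide(Counter(), Counter())
print(divide.eval(1.0, 0), divide.fail_counter.count)
print(divide.eval(4.0, 2.0), divide.success_counter.count)
```

With this pattern a division by zero should surface as NULL in output_table rather than as a job failure, which may or may not be acceptable for your use case.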

Best,
Ron

On Fri, Jul 28, 2023 at 05:46, Elkhan Dadashov <elkhan.dadas...@gmail.com> wrote:

> Hi Flinkers,
>
> Wanted to check if anyone else has faced this issue before:
>
> When a Python UDF (used inside Flink SQL) raises an exception, its
> metrics are lost and not reported. I see this in both Flink 1.16.2
> and Flink 1.17.1 (Python 3.9).
> If no exception is raised, the metrics show up.
>
> The Flink documentation does not mention that UDFs should not throw
> exceptions. Is that the case?
>
> Or is it a known issue/bug?
>
> Thank you.
>
> =======================
> FlinkSQL script content:
> =======================
>
> CREATE TABLE input_table (
> price DOUBLE
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second' = '1'
> );
>
> CREATE TABLE output_table WITH ('connector' = 'print')
> LIKE input_table (EXCLUDING ALL);
>
> CREATE FUNCTION myDivide AS 'custom_udf.divide_udf'
> LANGUAGE PYTHON;
>
> -- Fail scenario: ZeroDivisionError: division by zero
> INSERT INTO output_table (SELECT myDivide(price, 0) FROM input_table);
>
> =======================
> Python UDF content:
> =======================
>
> from pyflink.table.udf import ScalarFunction, udf
> from pyflink.table import DataTypes
>
> import logging
>
> class DivideUDF(ScalarFunction):
>     def __init__(self):
>         self.success_counter = None
>         self.fail_counter = None
>     def open(self, function_context):
>         metric_group = function_context.get_metric_group()
>         self.success_counter = metric_group.counter("flinksql_custom_udf_success_metric")
>         self.fail_counter = metric_group.counter("flinksql_custom_udf_fail_metric")
>     def eval(self, x, y):
>         logging.info('executing custom udf with logging and metric example...')
>         try:
>             result = x/y
>             self.success_counter.inc()
>             return result
>         except Exception as e:
>             self.fail_counter.inc()
>             raise e
>
> divide_udf = udf(DivideUDF(), result_type=DataTypes.DOUBLE())
>
> =======================
> Exception stack trace:
> =======================
>
> 2023-07-26 18:17:20
> org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
> at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1575)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1550)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1704)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1693)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:838)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:787)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: TimerException{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush}
> ... 15 more
> Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
> at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:107)
> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:300)
> at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:118)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1702)
> ... 14 more
> Caused by: java.lang.RuntimeException: Failed to close remote bundle
> at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:423)
> at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:407)
> at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:86)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ... 1 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 3: Traceback (most recent call last):
>   File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 267, in _execute
>     response = task()
>   File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 340, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 580, in do_instruction
>     return getattr(self, request_type)(
>   File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 618, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 995, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File "/usr/local/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 221, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 196, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File "/usr/local/lib/python3.9/site-packages/pyflink/fn_execution/table/operations.py", line 101, in process_element
>     return self.func(value)
>   File "<string>", line 1, in <lambda>
>   File "/tmp/python-dist-9b10e17c-99f8-4a8a-adb0-90faca859ca5/python-files/blob_p-e59b6f256efca9ae3fc37635498b82f316f8ac65-34f486b7351b4e2c73282ab4069c075e/custom_udf.py", line 27, in eval
>     raise e
>   File "/tmp/python-dist-9b10e17c-99f8-4a8a-adb0-90faca859ca5/python-files/blob_p-e59b6f256efca9ae3fc37635498b82f316f8ac65-34f486b7351b4e2c73282ab4069c075e/custom_udf.py", line 22, in eval
>     result = x/y
> ZeroDivisionError: float division by zero
>
> at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
> at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:318)
> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:301)
> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
> ... 3 more
>
