# 定义计算逻辑函数 @udf(input_types=DataTypes.DECIMAL(38,18,True), result_type=DataTypes.DECIMAL(38,18,True), udf_type="pandas")
def multi_production(yldrate): yldrate_1 = yldrate + 1 return np.prod(yldrate_1) - 1 调用:env.sql_query('SELECT multi_production(YLDRATE) FROM query_result') 由于官网并未找到再详细的例子,pandas类型的udf 内部,可以遵循pandas风格处理数据么? 【报错信息】: at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:345) at org.apache.flink.python.AbstractPythonFunctionRunner.finishBundle(AbstractPythonFunctionRunner.java:230) ... 17 more Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last): File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 167, in _execute response = task() File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 223, in <lambda> lambda: self.create_worker().do_instruction(request), request) File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 352, in do_instruction request.instruction_id) File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 386, in process_bundle bundle_processor.process_bundle(instruction_id)) File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 812, in process_bundle data.transform_id].process_encoded(data.data) File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 205, in process_encoded self.output(decoded_value) File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\operations.py", line 304, in output cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\worker\operations.py", line 178, in receive self.consumer.process(windowed_value) File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\operations.py", line 92, in process self._value_coder_impl.encode_to_stream(self.func(o.value), output_stream, True) File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\coder_impl.py", line 467, in encode_to_stream self._value_coder.encode_to_stream(value, out, nested) File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\fn_execution\coder_impl.py", line 438, in encode_to_stream pandas_to_arrow(self._schema, self._timezone, self._field_types, cols)) File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py", line 35, in pandas_to_arrow schema.types[i]) for i in range(0, len(schema))] File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py", line 35, in <listcomp> schema.types[i]) for i in range(0, len(schema))] File "C:\Users\18242\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\utils.py", line 27, in create_array return pa.Array.from_pandas(s, mask=s.isnull(), type=t) AttributeError: 'decimal.Decimal' object has no attribute 'isnull' at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160) at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140) at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249) at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297) at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738) at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more