When running a PyFlink job that uses a Python UDF, the job fails with the following error. What is the cause, and how can it be fixed?
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 8: Traceback (most recent call last):
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
    response = task()
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
    element.data)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 73, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 625, in decode_from_stream
    yield self._decode_one_batch_from_stream(in_stream, in_stream.read_var_int64())
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 636, in _decode_one_batch_from_stream
    return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 629, in _load_from_stream
    reader = pa.ipc.open_stream(stream)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyarrow/ipc.py", line 146, in open_stream
    return RecordBatchStreamReader(source)
  File "/root/anaconda3/envs/bigdata/lib/python3.6/site-packages/pyarrow/ipc.py", line 62, in __init__
    self._open(source)
  File "pyarrow/ipc.pxi", line 360, in pyarrow.lib._RecordBatchStreamReader._open
  File "pyarrow/error.pxi", line 123, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
OSError: Expected IPC message of type schema but got record batch
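The failing frames (arrow_to_pandas, pa.ipc.open_stream in beam_coder_impl_slow.py) belong to the Arrow decode path that PyFlink uses for vectorized (Pandas) UDFs, so the error is raised while the Python worker tries to read the Arrow record-batch stream sent by the operator. For reference, a minimal sketch of the kind of job that exercises this path, assuming a PyFlink 1.12-style API; the function name, columns, and sample data here are hypothetical, not taken from the failing job:

import pandas as pd
from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
from pyflink.table.expressions import col
from pyflink.table.udf import udf


# Hypothetical Pandas UDF: func_type="pandas" makes Flink ship each batch of rows
# to the Python worker as Arrow record batches, which is the stream that
# beam_coder_impl_slow.py fails to decode in the traceback above.
# (On PyFlink 1.11 the keyword is udf_type="pandas" and input_types is required.)
@udf(result_type=DataTypes.BIGINT(), func_type="pandas")
def add(a: pd.Series, b: pd.Series) -> pd.Series:
    return a + b


env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

t = t_env.from_elements([(1, 2), (3, 4)], ['a', 'b'])
# Applying the Pandas UDF triggers the Arrow (de)serialization path shown in the error.
t.select(add(col('a'), col('b'))).execute().print()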
