Thanks a lot~

> On Apr 28, 2021, at 8:25 AM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
> 
> Hi Dian,
> 
> I followed up with this PR: https://github.com/apache/flink/pull/15790
> 
> On Tue, Apr 27, 2021 at 11:03 PM Dian Fu <dian0511...@gmail.com> wrote:
> Hi Yik San,
> 
> Makes sense to me. :)
> 
> Regards,
> Dian
> 
>> On Apr 27, 2021, at 9:50 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>> 
>> Hi Dian,
>> 
>> Wow, this is unexpected 😮 How about adding a note about this to the Python 
>> UDF documentation? Again, it can be time-consuming to figure this out. Maybe:
>> 
>> To run Python UDFs in any non-local mode, it is recommended to ship the 
>> files containing your UDF definitions via the -pyfs config option if your 
>> UDFs live outside the file where the main() function is defined.
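>> 
>> Perhaps together with a short example command (the file names here are just 
>> placeholders):
>> 
>> ```
>> flink run -py my_job.py -pyfs my_udfs.py
>> ```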
>> 
>> What do you think?
>> 
>> Best,
>> Yik San
>> 
>> On Tue, Apr 27, 2021 at 9:24 PM Dian Fu <dian0511...@gmail.com> wrote:
>> I guess this is the magic of cloudpickle. PyFlink depends on cloudpickle 
>> to serialize the Python UDF.
>> 
>> In the latter case (the function defined inline in the main file), I guess 
>> the whole Python UDF implementation is serialized. However, in the former 
>> case (the class defined in a separate module), only the path of the class 
>> is serialized.
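>> 
>> A minimal sketch of that behavior (assuming cloudpickle is installed and 
>> decrypt_fun.py is importable locally):
>> 
>> ```python
>> import cloudpickle
>> 
>> # Defined in the main module: cloudpickle serializes the function
>> # by value, so workers can deserialize it without any extra files.
>> def inline_udf(s):
>>     return s.upper()
>> 
>> payload = cloudpickle.dumps(inline_udf)
>> 
>> # Imported from a separate module: cloudpickle serializes it by
>> # reference, so unpickling re-imports decrypt_fun, which must
>> # therefore be available on the worker (hence -pyfs).
>> from decrypt_fun import Decrypt
>> payload2 = cloudpickle.dumps(Decrypt())
>> ```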
>> 
>> Regards,
>> Dian
>> 
>>> On Apr 27, 2021, at 8:52 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>> 
>>> Hi Dian,
>>> 
>>> Thanks! Adding -pyfs definitely helps.
>>> 
>>> However, I am curious: if I define my UDF this way:
>>> 
>>> ```python
>>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>>> def decrypt(s):
>>>     import pandas as pd
>>>     d = pd.read_csv('resources.zip/resources/crypt.csv', header=None,
>>>                     index_col=0, squeeze=True).to_dict()
>>>     return d.get(s, "unknown")
>>> ```
>>> 
>>> I can `flink run` without having to specify the -pyfs option. The code can 
>>> also be found in this commit: 
>>> https://github.com/YikSanChan/pyflink-quickstart/commit/cd003ca7d36583999dbb5ffd45958762e4323607
>>> I wonder why?
>>> 
>>> Best,
>>> Yik San
>>> 
>>> On Tue, Apr 27, 2021 at 8:13 PM Dian Fu <dian0511...@gmail.com> wrote:
>>> Hi Yik San,
>>> 
>>> From the exception message, it’s clear that the module `decrypt_fun` could 
>>> not be found during execution.
>>> 
>>> You need to specify the file `decrypt_fun.py` as a dependency when 
>>> submitting the job, e.g. via the -pyfs command-line option. Otherwise, this 
>>> file will not be available during execution.
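>>> 
>>> For example, something along these lines (other cluster options omitted):
>>> 
>>> ```
>>> flink run -py udf_use_resource.py -pyfs decrypt_fun.py
>>> ```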
>>> 
>>> Regards,
>>> Dian
>>> 
>>>> On Apr 27, 2021, at 8:01 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Here's the reproducible code sample: 
>>>> https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3
>>>> 
>>>> I implement my Python UDF by extending the ScalarFunction base class in a 
>>>> separate file named decrypt_fun.py, and import the UDF into my main 
>>>> Python file named udf_use_resource.py.
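>>>> 
>>>> Roughly like the following (a simplified sketch; the full code is in the 
>>>> repo linked above):
>>>> 
>>>> ```python
>>>> # decrypt_fun.py
>>>> from pyflink.table import DataTypes
>>>> from pyflink.table.udf import ScalarFunction, udf
>>>> 
>>>> class Decrypt(ScalarFunction):
>>>>     def eval(self, s):
>>>>         ...  # lookup logic elided here
>>>> 
>>>> decrypt = udf(Decrypt(), input_types=[DataTypes.STRING()],
>>>>               result_type=DataTypes.STRING())
>>>> ```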
>>>> 
>>>> However, after I `flink run` the job, I find the following error in the 
>>>> TaskManager log:
>>>> 
>>>> ```
>>>> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get
>>>>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
>>>> IndexError: pop from empty list
>>>> 
>>>> During handling of the above exception, another exception occurred:
>>>> 
>>>> Traceback (most recent call last):
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>>>>     response = task()
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>>>>     lambda: self.create_worker().do_instruction(request), request)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
>>>>     getattr(request, request_type), request.instruction_id)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle
>>>>     instruction_id, request.process_bundle_descriptor_id)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get
>>>>     self.data_channel_factory)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__
>>>>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree
>>>>     descriptor.transforms, key=topological_height, reverse=True)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp>
>>>>     (transform_id, get_operation(transform_id)) for transform_id in sorted(
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
>>>>     result = cache[args] = func(*args)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation
>>>>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp>
>>>>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp>
>>>>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
>>>>     result = cache[args] = func(*args)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation
>>>>     transform_id, transform_consumers)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation
>>>>     return creator(self, transform_id, transform_proto, payload, consumers)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function
>>>>     operations.ScalarFunctionOperation)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation
>>>>     internal_operation_cls)
>>>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>>>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>>>>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__
>>>>     super(ScalarFunctionOperation, self).__init__(spec)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__
>>>>     self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func
>>>>     [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp>
>>>>     [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function
>>>>     user_defined_func = pickle.loads(user_defined_function_proto.payload)
>>>>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
>>>>     return cloudpickle.loads(payload)
>>>> ModuleNotFoundError: No module named 'decrypt_fun'
>>>> 
>>>>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_282]
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_282]
>>>>     ... 1 more
>>>> ```
>>>> 
>>>> I wonder why? If I move the Decrypt class into udf_use_resource.py, 
>>>> everything works just fine.
>>>> 
>>>> Thank you!
>>>> 
>>>> Best,
>>>> Yik San
>>> 
>> 
> 
