Hi Yik San,

From the exception message, it's clear that the module `decrypt_fun` could not be found during execution.
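Here is why moving `Decrypt` into `udf_use_resource.py` makes the error go away. The traceback shows the worker restoring the UDF with `cloudpickle.loads(payload)`. As far as I understand cloudpickle, a class defined in an importable module is pickled by reference (module name plus class name), so the worker must be able to import `decrypt_fun` itself. A class defined in the entry-point script (`__main__`) is pickled by value instead and needs no extra file.

The sketch below uses the standard-library `pickle` as a stand-in, because it stores module-level classes by reference in the same way. The contents of `decrypt_fun.py` (a string-reversing `Decrypt.eval`) are hypothetical. The temporary-directory setup is also hypothetical: it only simulates the client side and the worker side inside one process.

```python
import importlib
import os
import pickle
import sys
import tempfile

# Hypothetical stand-in for the user's decrypt_fun.py (the real file is not shown here);
# the "decryption" is just a string reversal for illustration.
MODULE_SOURCE = """
class Decrypt:
    def eval(self, s):
        return s[::-1]
"""


def demo():
    """Pickle a Decrypt instance, then unpickle it without and with the module file."""
    with tempfile.TemporaryDirectory() as workdir:
        with open(os.path.join(workdir, "decrypt_fun.py"), "w") as f:
            f.write(MODULE_SOURCE)
        importlib.invalidate_caches()

        # "Client": decrypt_fun is importable, so the class is stored by reference.
        sys.path.insert(0, workdir)
        import decrypt_fun
        payload = pickle.dumps(decrypt_fun.Decrypt())

        # "Worker" without the file: the module is neither loaded nor on sys.path.
        sys.path.remove(workdir)
        del sys.modules["decrypt_fun"]
        try:
            pickle.loads(payload)
            error = None
        except ModuleNotFoundError as exc:
            error = str(exc)

        # "Worker" with the file shipped alongside the job (the role -pyfs plays).
        sys.path.insert(0, workdir)
        try:
            result = pickle.loads(payload).eval("olleh")
        finally:
            sys.path.remove(workdir)
            sys.modules.pop("decrypt_fun", None)
    return error, result


error, result = demo()
print(error)   # No module named 'decrypt_fun'
print(result)  # hello
```

Putting the directory back on `sys.path` stands in for what `-pyfs` does for a real job: it makes `decrypt_fun.py` importable in the process that unpickles the payload.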
You need to specify the file `decrypt_fun.py` as a dependency when submitting the job, e.g. via the `-pyfs` command-line option (`flink run -py udf_use_resource.py -pyfs decrypt_fun.py`). Otherwise, this file will not be available to the Python UDF workers during execution.

Regards,
Dian

> On Apr 27, 2021, at 8:01 PM, Yik San Chan <[email protected]> wrote:
>
> Hi,
>
> Here's a reproducible code sample:
> https://github.com/YikSanChan/pyflink-quickstart/tree/83526abca832f9ed5b8ce20be52fd506c45044d3
>
> I implemented my Python UDF by extending the `ScalarFunction` base class in a separate file named `decrypt_fun.py`, and tried to import the UDF into my main Python file, `udf_use_resource.py`.
>
> However, after running `flink run`, I found the following error in the TaskManager log:
>
> ```
> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 376, in get
>     processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
> IndexError: pop from empty list
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>     response = task()
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 509, in process_bundle
>     instruction_id, request.process_bundle_descriptor_id)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 382, in get
>     self.data_channel_factory)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 847, in __init__
>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 902, in create_execution_tree
>     descriptor.transforms, key=topological_height, reverse=True)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 901, in <listcomp>
>     (transform_id, get_operation(transform_id)) for transform_id in sorted(
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in get_operation
>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 885, in <dictcomp>
>     pcoll_id in descriptor.transforms[transform_id].outputs.items()
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 883, in <listcomp>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 791, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 888, in get_operation
>     transform_id, transform_consumers)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1174, in create_operation
>     return creator(self, transform_id, transform_proto, payload, consumers)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 39, in create_scalar_function
>     operations.ScalarFunctionOperation)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 166, in _create_user_defined_function_operation
>     internal_operation_cls)
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 110, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 49, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 114, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 91, in __init__
>     super(ScalarFunctionOperation, self).__init__(spec)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 56, in __init__
>     self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in generate_func
>     [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operations.py", line 105, in <listcomp>
>     [operation_utils.extract_user_defined_function(udf) for udf in serialized_fn.udfs])
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/operation_utils.py", line 86, in extract_user_defined_function
>     user_defined_func = pickle.loads(user_defined_function_proto.payload)
>   File "/usr/local/anaconda3/envs/featflow-ml-env/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
>     return cloudpickle.loads(payload)
> ModuleNotFoundError: No module named 'decrypt_fun'
>
> 	at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
> 	at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) ~[blob_p-c18fee26bdebc8cb6523e7161974631be9f3b3d0-8f27cc9e92a718bc9d3d138d1d2d49ca:1.12.0]
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_282]
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_282]
> 	... 1 more
> ```
>
> Why does this happen? If I move the `Decrypt` class into `udf_use_resource.py`, everything works just fine.
>
> Thank you!
>
> Best,
> Yik San
