Do you still have the same issue? I tried to follow your setup.sh to reproduce this but somehow I am stuck at the word_len step. I saw you also tried to use `print(kafka_kv)` to debug it. I am not sure about your current status.
On Fri, May 10, 2024 at 9:18 AM Jaehyeon Kim <dott...@gmail.com> wrote: > Hello, > > I'm playing with deploying a python pipeline to a flink cluster on > kubernetes via flink kubernetes operator. The pipeline simply calculates > average word lengths in a fixed time window of 5 seconds and it works with > the embedded flink cluster. > > First, I created a k8s cluster (v1.25.3) on minikube and a docker image > named beam-python-example:1.17 created using the following docker file - > the full details can be checked in > https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/Dockerfile > > The java sdk is used for the sdk harness of the kafka io's expansion > service while the job server is used to execute the python pipeline in the > flink operator. > > FROM flink:1.17 > ... > ## add java SDK and job server > COPY --from=apache/beam_java8_sdk:2.56.0 /opt/apache/beam/ > /opt/apache/beam/ > > COPY --from=apache/beam_flink1.17_job_server:2.56.0 \ > /opt/apache/beam/jars/beam-runners-flink-job-server.jar > /opt/apache/beam/jars/beam-runners-flink-job-server.jar > > RUN chown -R flink:flink /opt/apache/beam > > ## install python 3.10.13 > RUN apt-get update -y && \ > apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev > libffi-dev liblzma-dev && \ > wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python- > ${PYTHON_VERSION}.tgz && \ > ... > ## install apache beam 2.56.0 > RUN pip3 install apache-beam==${BEAM_VERSION} > > ## copy pipeline source > RUN mkdir /opt/flink/app > COPY word_len.py /opt/flink/app/ > > Then the pipeline is deployed using the following manifest - the full > details can be found in > https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.yml > > apiVersion: flink.apache.org/v1beta1 > kind: FlinkDeployment > metadata: > name: beam-word-len > spec: > image: beam-python-example:1.17 > imagePullPolicy: Never > flinkVersion: v1_17 > flinkConfiguration: > taskmanager.numberOfTaskSlots: "5" > serviceAccount: flink > podTemplate: > spec: > containers: > - name: flink-main-container > env: > - name: BOOTSTRAP_SERVERS > value: demo-cluster-kafka-bootstrap:9092 > ... > jobManager: > resource: > memory: "2048m" > cpu: 1 > taskManager: > replicas: 2 > resource: > memory: "2048m" > cpu: 1 > podTemplate: > spec: > containers: > - name: python-worker-harness > image: apache/beam_python3.10_sdk:2.56.0 > imagePullPolicy: Never > args: ["--worker_pool"] > ports: > - containerPort: 50000 > > job: > jarURI: > local:///opt/apache/beam/jars/beam-runners-flink-job-server.jar > entryClass: > org.apache.beam.runners.flink.FlinkPortableClientEntryPoint > args: > - "--driver-cmd" > - "python /opt/flink/app/word_len.py --deploy" > parallelism: 3 > upgradeMode: stateless > > Here is the pipeline source - the full details can be found in > https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.py > > When I add the --deploy flag, the python sdk harness is set to EXTERNAL > and its config is set to localhost:50000 - I believe it'll point to the > side car container of the task manager. For the kafka io, the expansion > service's sdk harness is configured as PROCESS and the command points to > the java sdk that is added in the beam-python-example:1.17 image. > > ... > def run(args=None): > parser = argparse.ArgumentParser(description="Beam pipeline arguments" > ) > parser.add_argument("--runner", default="FlinkRunner", help="Apache > Beam runner") > parser.add_argument( > "--deploy", > action="store_true", > default="Flag to indicate whether to use an own local cluster", > ) > opts, _ = parser.parse_known_args(args) > > pipeline_opts = { > "runner": opts.runner, > "job_name": "avg-word-length-beam", > "streaming": True, > "environment_type": "EXTERNAL" if opts.deploy is True else > "LOOPBACK", > "checkpointing_interval": "60000", > } > > expansion_service = None > if pipeline_opts["environment_type"] == "EXTERNAL": > pipeline_opts = { > **pipeline_opts, > **{ > "environment_config": "localhost:50000", > "flink_submit_uber_jar": True, > }, > } > expansion_service = kafka.default_io_expansion_service( > append_args=[ > "--defaultEnvironmentType=PROCESS", > > '--defaultEnvironmentConfig={"command":"/opt/apache/beam/boot"}', > "--experiments=use_deprecated_read", # > https://github.com/apache/beam/issues/20979 > ] > ) > print(pipeline_opts) > options = PipelineOptions([], **pipeline_opts) > # Required, else it will complain that when importing worker functions > options.view_as(SetupOptions).save_main_session = True > > with beam.Pipeline(options=options) as p: > ( > p > | "ReadWordsFromKafka" > >> ReadWordsFromKafka( > bootstrap_servers=os.getenv( > "BOOTSTRAP_SERVERS", > "host.docker.internal:29092", > ), > topics=[os.getenv("INPUT_TOPIC", "input-topic")], > group_id=os.getenv("GROUP_ID", "beam-word-len"), > expansion_service=expansion_service, > ) > | "CalculateAvgWordLen" >> CalculateAvgWordLen() > | "WriteWordLenToKafka" > >> WriteWordLenToKafka( > bootstrap_servers=os.getenv( > "BOOTSTRAP_SERVERS", > "host.docker.internal:29092", > ), > topic=os.getenv("OUTPUT_TOPIC", "output-topic-beam"), > expansion_service=expansion_service, > ) > ) > > logging.getLogger().setLevel(logging.INFO) > logging.info("Building pipeline ...") > > When I run the pipeline, I see the following error and it fails to be > submitted to the job manager. It looks like the pipeline DAG is not created > but I'm not sure what makes the error. > > T4: <class 'apache_beam.transforms.core.CallableWrapperDoFn'> > INFO:dill:T4: <class 'apache_beam.transforms.core.CallableWrapperDoFn'> > # T4 > INFO:dill:# T4 > D2: <dict object at 0x7f6c918c1d00> > INFO:dill:D2: <dict object at 0x7f6c918c1d00> > F1: <function Map.<locals>.<lambda> at 0x7f6c918bb640> > INFO:dill:F1: <function Map.<locals>.<lambda> at 0x7f6c918bb640> > F2: <function _create_function at 0x7f6cb03de4d0> > INFO:dill:F2: <function _create_function at 0x7f6cb03de4d0> > # F2 > INFO:dill:# F2 > T1: <class 'code'> > INFO:dill:T1: <class 'code'> > F2: <function _load_type at 0x7f6cb03de3b0> > INFO:dill:F2: <function _load_type at 0x7f6cb03de3b0> > # F2 > INFO:dill:# F2 > # T1 > INFO:dill:# T1 > B1: <built-in function getattr> > INFO:dill:B1: <built-in function getattr> > F2: <function _get_attr at 0x7f6cb03deef0> > INFO:dill:F2: <function _get_attr at 0x7f6cb03deef0> > # F2 > INFO:dill:# F2 > # B1 > INFO:dill:# B1 > M2: <module 'apache_beam.transforms.core' from > '/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py'> > INFO:dill:M2: <module 'apache_beam.transforms.core' from > '/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py'> > F2: <function _import_module at 0x7f6cb03df010> > INFO:dill:F2: <function _import_module at 0x7f6cb03df010> > # F2 > INFO:dill:# F2 > # M2 > INFO:dill:# M2 > Ce: <cell at 0x7f6c91862c50: function object at 0x7f6c91857b50> > INFO:dill:Ce: <cell at 0x7f6c91862c50: function object at 0x7f6c91857b50> > F2: <function _create_cell at 0x7f6cb03de8c0> > INFO:dill:F2: <function _create_cell at 0x7f6cb03de8c0> > # F2 > INFO:dill:# F2 > F1: <function ReadWordsFromKafka.expand.<locals>.decode_message at > 0x7f6c91857b50> > INFO:dill:F1: <function ReadWordsFromKafka.expand.<locals>.decode_message > at 0x7f6c91857b50> > D1: <dict object at 0x7f6cb0a5a040> > INFO:dill:D1: <dict object at 0x7f6cb0a5a040> > # D1 > INFO:dill:# D1 > Ce: <cell at 0x7f6c918636a0: ReadWordsFromKafka object at 0x7f6c918639a0> > INFO:dill:Ce: <cell at 0x7f6c918636a0: ReadWordsFromKafka object at > 0x7f6c918639a0> > T2: <class '__main__.ReadWordsFromKafka'> > INFO:dill:T2: <class '__main__.ReadWordsFromKafka'> > F2: <function _create_type at 0x7f6cb03de440> > INFO:dill:F2: <function _create_type at 0x7f6cb03de440> > # F2 > INFO:dill:# F2 > T1: <class 'type'> > INFO:dill:T1: <class 'type'> > # T1 > INFO:dill:# T1 > T4: <class 'apache_beam.transforms.ptransform.PTransform'> > INFO:dill:T4: <class 'apache_beam.transforms.ptransform.PTransform'> > # T4 > INFO:dill:# T4 > D2: <dict object at 0x7f6c922f44c0> > INFO:dill:D2: <dict object at 0x7f6c922f44c0> > F1: <function ReadWordsFromKafka.__init__ at 0x7f6c922d3b50> > INFO:dill:F1: <function ReadWordsFromKafka.__init__ at 0x7f6c922d3b50> > D1: <dict object at 0x7f6cb0a5a040> > INFO:dill:D1: <dict object at 0x7f6cb0a5a040> > # D1 > INFO:dill:# D1 > Ce: <cell at 0x7f6cb0ac35e0: type object at 0x7f6c93424410> > INFO:dill:Ce: <cell at 0x7f6cb0ac35e0: type object at 0x7f6c93424410> > T5: <class '__main__.ReadWordsFromKafka'> > INFO:dill:T5: <class '__main__.ReadWordsFromKafka'> > # T5 > INFO:dill:# T5 > # Ce > INFO:dill:# Ce > D2: <dict object at 0x7f6c9186a740> > INFO:dill:D2: <dict object at 0x7f6c9186a740> > # D2 > INFO:dill:# D2 > # F1 > INFO:dill:# F1 > F1: <function ReadWordsFromKafka.expand at 0x7f6c922d3be0> > INFO:dill:F1: <function ReadWordsFromKafka.expand at 0x7f6c922d3be0> > D1: <dict object at 0x7f6cb0a5a040> > INFO:dill:D1: <dict object at 0x7f6cb0a5a040> > # D1 > INFO:dill:# D1 > D2: <dict object at 0x7f6c9186a5c0> > INFO:dill:D2: <dict object at 0x7f6c9186a5c0> > # D2 > INFO:dill:# D2 > # F1 > INFO:dill:# F1 > T6: <class 'apache_beam.typehints.decorators.IOTypeHints'> > INFO:dill:T6: <class 'apache_beam.typehints.decorators.IOTypeHints'> > F2: <function _create_namedtuple at 0x7f6cb03dedd0> > INFO:dill:F2: <function _create_namedtuple at 0x7f6cb03dedd0> > # F2 > INFO:dill:# F2 > # T6 > INFO:dill:# T6 > # D2 > INFO:dill:# D2 > # T2 > INFO:dill:# T2 > D2: <dict object at 0x7f6c9184be80> > INFO:dill:D2: <dict object at 0x7f6c9184be80> > T4: <class 'apache_beam.transforms.external.BeamJarExpansionService'> > INFO:dill:T4: <class > 'apache_beam.transforms.external.BeamJarExpansionService'> > # T4 > INFO:dill:# T4 > D2: <dict object at 0x7f6c922d7c00> > INFO:dill:D2: <dict object at 0x7f6c922d7c00> > T4: <class 'apache_beam.utils.subprocess_server.JavaJarServer'> > INFO:dill:T4: <class 'apache_beam.utils.subprocess_server.JavaJarServer'> > # T4 > INFO:dill:# T4 > D2: <dict object at 0x7f6c9184aa80> > INFO:dill:D2: <dict object at 0x7f6c9184aa80> > T4: <class > 'apache_beam.transforms.external.ExpansionAndArtifactRetrievalStub'> > INFO:dill:T4: <class > 'apache_beam.transforms.external.ExpansionAndArtifactRetrievalStub'> > # T4 > INFO:dill:# T4 > # D2 > INFO:dill:# D2 > D2: <dict object at 0x7f6c918b78c0> > INFO:dill:D2: <dict object at 0x7f6c918b78c0> > T4: <class 'grpc._channel.Channel'> > INFO:dill:T4: <class 'grpc._channel.Channel'> > # T4 > INFO:dill:# T4 > D2: <dict object at 0x7f6c91868440> > INFO:dill:D2: <dict object at 0x7f6c91868440> > {'runner': 'FlinkRunner', 'job_name': 'avg-word-length-beam', 'streaming': > True, 'environment_type': 'EXTERNAL', 'checkpointing_interval': '60000', > 'environment_config': 'localhost:50000', 'flink_submit_uber_jar': True} > Traceback (most recent call last): > File > "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py", > line 379, in dumps > s = dill.dumps(o, byref=settings['dill_byref']) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 265, > in dumps > dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 259, > in dump > Pickler(file, protocol, **_kwds).dump(obj) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 445, > in dump > StockPickler.dump(self, obj) > File "/usr/local/lib/python3.10/pickle.py", line 487, in dump > self.save(obj) > File "/usr/local/lib/python3.10/pickle.py", line 603, in save > self.save_reduce(obj=obj, *rv) > File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce > save(state) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File > "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py", > line 349, in new_save_module_dict > return old_save_module_dict(pickler, obj) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912, > in save_module_dict > StockPickler.save_dict(pickler, obj) > File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict > self._batch_setitems(obj.items()) > File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems > save(v) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410, > in save_function > pickler.save_reduce(_create_function, (obj.__code__, > File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce > save(args) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple > save(element) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple > save(element) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147, > in save_cell > pickler.save_reduce(_create_cell, (f,), obj=obj) > File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce > save(args) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple > save(element) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410, > in save_function > pickler.save_reduce(_create_function, (obj.__code__, > File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce > save(args) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple > save(element) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple > save(element) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147, > in save_cell > pickler.save_reduce(_create_cell, (f,), obj=obj) > File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce > save(args) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple > save(element) > File "/usr/local/lib/python3.10/pickle.py", line 603, in save > self.save_reduce(obj=obj, *rv) > File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce > save(state) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File > "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py", > line 349, in new_save_module_dict > return old_save_module_dict(pickler, obj) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912, > in save_module_dict > StockPickler.save_dict(pickler, obj) > File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict > self._batch_setitems(obj.items()) > File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems > save(v) > File "/usr/local/lib/python3.10/pickle.py", line 603, in save > self.save_reduce(obj=obj, *rv) > File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce > save(state) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File > "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py", > line 349, in new_save_module_dict > return old_save_module_dict(pickler, obj) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912, > in save_module_dict > StockPickler.save_dict(pickler, obj) > File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict > self._batch_setitems(obj.items()) > File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems > save(v) > File "/usr/local/lib/python3.10/pickle.py", line 603, in save > self.save_reduce(obj=obj, *rv) > File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce > save(state) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File > "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py", > line 349, in new_save_module_dict > return old_save_module_dict(pickler, obj) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912, > in save_module_dict > StockPickler.save_dict(pickler, obj) > File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict > self._batch_setitems(obj.items()) > File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems > save(v) > File "/usr/local/lib/python3.10/pickle.py", line 603, in save > self.save_reduce(obj=obj, *rv) > File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce > save(state) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File > "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py", > line 349, in new_save_module_dict > return old_save_module_dict(pickler, obj) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912, > in save_module_dict > StockPickler.save_dict(pickler, obj) > File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict > self._batch_setitems(obj.items()) > File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems > save(v) > File "/usr/local/lib/python3.10/pickle.py", line 578, in save > rv = reduce(self.proto) > File "stringsource", line 2, in > grpc._cython.cygrpc.Channel.__reduce_cython__ > TypeError: no default __reduce__ due to non-trivial __cinit__ > > During handling of the above exception, another exception occurred: > > Traceback (most recent call last): > File "/opt/flink/app/word_len.py", line 212, in <module> > run() > File "/opt/flink/app/word_len.py", line 184, in run > p > File > "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py", > line 1110, in __ror__ > return self.transform.__ror__(pvalueish, self.label) > File > "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py", > line 623, in __ror__ > result = p.apply(self, pvalueish, label) > File "/usr/local/lib/python3.10/site-packages/apache_beam/pipeline.py", > line 679, in apply > return self.apply(transform, pvalueish) > File "/usr/local/lib/python3.10/site-packages/apache_beam/pipeline.py", > line 732, in apply > pvalueish_result = self.runner.apply(transform, pvalueish, > self._options) > File > "/usr/local/lib/python3.10/site-packages/apache_beam/runners/runner.py", > line 203, in apply > return self.apply_PTransform(transform, input, options) > File > "/usr/local/lib/python3.10/site-packages/apache_beam/runners/runner.py", > line 207, in apply_PTransform > return transform.expand(input) > File "/opt/flink/app/word_len.py", line 94, in expand > | "DecodeMessage" >> beam.Map(decode_message) > File > "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py", > line 1994, in Map > pardo = FlatMap(wrapper, *args, **kwargs) > File > "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py", > line 1937, in FlatMap > pardo = ParDo(CallableWrapperDoFn(fn), *args, **kwargs) > File > "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py", > line 1473, in __init__ > super().__init__(fn, *args, **kwargs) > File > "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py", > line 870, in __init__ > self.fn = pickler.loads(pickler.dumps(self.fn)) > File > "/usr/local/lib/python3.10/site-packages/apache_beam/internal/pickler.py", > line 44, in dumps > return desired_pickle_lib.dumps( > File > "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py", > line 383, in dumps > s = dill.dumps(o, byref=settings['dill_byref']) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 265, > in dumps > dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 259, > in dump > Pickler(file, protocol, **_kwds).dump(obj) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 445, > in dump > StockPickler.dump(self, obj) > File "/usr/local/lib/python3.10/pickle.py", line 487, in dump > self.save(obj) > File "/usr/local/lib/python3.10/pickle.py", line 603, in save > self.save_reduce(obj=obj, *rv) > File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce > save(state) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File > "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py", > line 349, in new_save_module_dict > return old_save_module_dict(pickler, obj) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912, > in save_module_dict > StockPickler.save_dict(pickler, obj) > File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict > self._batch_setitems(obj.items()) > File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems > save(v) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410, > in save_function > pickler.save_reduce(_create_function, (obj.__code__, > File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce > save(args) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple > save(element) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple > save(element) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147, > in save_cell > pickler.save_reduce(_create_cell, (f,), obj=obj) > File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce > save(args) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple > save(element) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410, > in save_function > pickler.save_reduce(_create_function, (obj.__code__, > File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce > save(args) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple > save(element) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple > save(element) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147, > in save_cell > pickler.save_reduce(_create_cell, (f,), obj=obj) > File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce > save(args) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple > save(element) > File "/usr/local/lib/python3.10/pickle.py", line 603, in save > self.save_reduce(obj=obj, *rv) > File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce > save(state) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File > "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py", > line 349, in new_save_module_dict > return old_save_module_dict(pickler, obj) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912, > in save_module_dict > StockPickler.save_dict(pickler, obj) > File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict > self._batch_setitems(obj.items()) > File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems > save(v) > File "/usr/local/lib/python3.10/pickle.py", line 603, in save > self.save_reduce(obj=obj, *rv) > File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce > save(state) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File > "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py", > line 349, in new_save_module_dict > return old_save_module_dict(pickler, obj) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912, > in save_module_dict > StockPickler.save_dict(pickler, obj) > File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict > self._batch_setitems(obj.items()) > File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems > save(v) > File "/usr/local/lib/python3.10/pickle.py", line 603, in save > self.save_reduce(obj=obj, *rv) > File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce > save(state) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File > "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py", > line 349, in new_save_module_dict > return old_save_module_dict(pickler, obj) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912, > in save_module_dict > StockPickler.save_dict(pickler, obj) > File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict > self._batch_setitems(obj.items()) > File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems > save(v) > File "/usr/local/lib/python3.10/pickle.py", line 603, in save > self.save_reduce(obj=obj, *rv) > File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce > save(state) > File "/usr/local/lib/python3.10/pickle.py", line 560, in save > f(self, obj) # Call unbound method with explicit self > File > "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py", > line 349, in new_save_module_dict > return old_save_module_dict(pickler, obj) > File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912, > in save_module_dict > StockPickler.save_dict(pickler, obj) > File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict > self._batch_setitems(obj.items()) > File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems > save(v) > File "/usr/local/lib/python3.10/pickle.py", line 578, in save > rv = reduce(self.proto) > File "stringsource", line 2, in > grpc._cython.cygrpc.Channel.__reduce_cython__ > TypeError: no default __reduce__ due to non-trivial __cinit__ > > at > org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram > (FlinkPortableClientEntryPoint.java:192) ~[?:?] > at > org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main( > FlinkPortableClientEntryPoint.java:100) ~[?:?] > at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) ~[?:?] > at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown > Source) ~[?:?] > at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown > Source) ~[?:?] > at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?] > at org.apache.flink.client.program.PackagedProgram.callMainMethod( > PackagedProgram.java:355) ~[flink-dist-1.17.2.jar:1.17.2] > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution > (PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2] > at org.apache.flink.client.ClientUtils.executeProgram( > ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2] > at > org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint > (ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2] > ... 13 more > Caused by: java.util.concurrent.TimeoutException: Timeout of 30 seconds > waiting for job submission. > at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint$ > DetachedJobInvokerFactory.executeDetachedJob( > FlinkPortableClientEntryPoint.java:256) ~[?:?] > at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint$ > DetachedJobInvokerFactory.access$300(FlinkPortableClientEntryPoint.java: > 206) ~[?:?] > at > org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram > (FlinkPortableClientEntryPoint.java:180) ~[?:?] > at > org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main( > FlinkPortableClientEntryPoint.java:100) ~[?:?] > at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) ~[?:?] > at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown > Source) ~[?:?] > at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown > Source) ~[?:?] > at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?] > at org.apache.flink.client.program.PackagedProgram.callMainMethod( > PackagedProgram.java:355) ~[flink-dist-1.17.2.jar:1.17.2] > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution > (PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2] > at org.apache.flink.client.ClientUtils.executeProgram( > ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2] > at > org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint > (ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2] > ... 13 more > 2024-05-10 13:11:41,217 ERROR > org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal > error occurred in the cluster entrypoint. > java.util.concurrent.CompletionException: > org.apache.flink.client.deployment.application.ApplicationExecutionException: > Could not execute application. > at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown > Source) ~[?:?] > at java.util.concurrent.CompletableFuture.completeThrowable(Unknown > Source) ~[?:?] > at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown > Source) ~[?:?] > at java.util.concurrent.CompletableFuture.postComplete(Unknown > Source) ~[?:?] > at > java.util.concurrent.CompletableFuture.completeExceptionally(Unknown > Source) ~[?:?] > at > org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint > (ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.17.2.jar:1.17.2] > at > org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda > $runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[ > flink-dist-1.17.2.jar:1.17.2] > at java.util.concurrent.Executors$RunnableAdapter.call(Unknown > Source) ~[?:?] > at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?] > at > org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter > $ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[ > flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2] > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader > (ClassLoadingUtils.java:68) ~[ > flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2] > at > org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda > $withContextClassLoader$0(ClassLoadingUtils.java:41) ~[ > flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2] > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [ > flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2] > at akka.dispatch.ForkJoinExecutorConfigurator$ > AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [ > flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2] > at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?] > at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown > Source) [?:?] > at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?] > at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) > [?:?] > at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) > [?:?] > Caused by: > org.apache.flink.client.deployment.application.ApplicationExecutionException: > Could not execute application. > ... 14 more > Caused by: org.apache.flink.client.program.ProgramInvocationException: > The main method caused an error: Job python /opt/flink/app/word_len.py > --deploy failed. > at org.apache.flink.client.program.PackagedProgram.callMainMethod( > PackagedProgram.java:372) ~[flink-dist-1.17.2.jar:1.17.2] > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution > (PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2] > at org.apache.flink.client.ClientUtils.executeProgram( > ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2] > at > org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint > (ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2] > ... 13 more > Caused by: java.lang.RuntimeException: Job python /opt/flink/app/ > word_len.py --deploy failed. > at > org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main( > FlinkPortableClientEntryPoint.java:102) ~[?:?] > at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) ~[?:?] > at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown > Source) ~[?:?] > at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown > Source) ~[?:?] > at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?] > at org.apache.flink.client.program.PackagedProgram.callMainMethod( > PackagedProgram.java:355) ~[flink-dist-1.17.2.jar:1.17.2] > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution > (PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2] > at org.apache.flink.client.ClientUtils.executeProgram( > ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2] > at > org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint > (ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2] > ... 13 more > Caused by: java.lang.RuntimeException: Failed to start job with driver > program: bash [-c, python /opt/flink/app/word_len.py --deploy > --job_endpoint=localhost:37015] output: WARNING:root:Waiting for grpc > channel to be ready at localhost:32895. > > Can you please inform me how to fix this issue? > > Cheers, > Jaehyeon >