Do you still have the same issue? I tried to follow your setup.sh to
reproduce this, but I got stuck at the word_len step. I saw you also
tried `print(kafka_kv)` to debug it. Could you let me know your current
status?
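
By the way, one thing stands out in your traceback. The root failure is dill
trying to pickle a grpc._channel.Channel ("TypeError: no default __reduce__
due to non-trivial __cinit__"), and the dill trace above it walks from
ReadWordsFromKafka.expand.<locals>.decode_message through a cell holding the
ReadWordsFromKafka instance into BeamJarExpansionService,
ExpansionAndArtifactRetrievalStub and finally the grpc channel. So when
`beam.Map(decode_message)` pickles the function, the closure appears to drag
the whole transform - expansion service and live grpc channel included - into
the pickle. If that is what's happening, defining decode_message at module
level (or at least not referencing `self` inside it) should keep the channel
out of the pickle. A rough, untested sketch against your word_len.py:

import typing

import apache_beam as beam


def decode_message(kafka_kv: typing.Tuple[bytes, bytes]) -> str:
    # Module-level, so pickling the DoFn does not capture the enclosing
    # ReadWordsFromKafka instance - and with it the expansion service's
    # grpc channel, which cannot be pickled.
    print(kafka_kv)
    return kafka_kv[1].decode("utf-8")


class ReadWordsFromKafka(beam.PTransform):
    # __init__ and the ReadFromKafka step stay as you have them.
    def expand(self, pcoll):
        return (
            pcoll
            # ... ReadFromKafka as before ...
            | "DecodeMessage" >> beam.Map(decode_message)
        )

Once the helpers are module-level, it's also worth checking whether you still
need save_main_session=True, since saving the main session would try to pickle
the module-scope expansion_service (and its channel) as well.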

On Fri, May 10, 2024 at 9:18 AM Jaehyeon Kim <dott...@gmail.com> wrote:

> Hello,
>
> I'm playing with deploying a python pipeline to a flink cluster on
> kubernetes via the flink kubernetes operator. The pipeline simply calculates
> the average word length in a fixed time window of 5 seconds, and it works
> with the embedded flink cluster.
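>
> The core calculation is roughly as follows - a simplified sketch, while
> the word_len.py linked below has the actual implementation:
>
> import apache_beam as beam
> from apache_beam.transforms.window import FixedWindows
>
> class CalculateAvgWordLen(beam.PTransform):
>     def expand(self, pcoll):
>         return (
>             pcoll
>             | "Windowing" >> beam.WindowInto(FixedWindows(size=5))
>             | "ExtractWords" >> beam.FlatMap(lambda line: line.split())
>             | "WordLengths" >> beam.Map(len)
>             | "AverageWordLength"
>             >> beam.CombineGlobally(
>                 beam.combiners.MeanCombineFn()
>             ).without_defaults()
>         )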
>
> First, I created a k8s cluster (v1.25.3) on minikube and a docker image
> named beam-python-example:1.17 using the following Dockerfile - the full
> details can be checked in
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/Dockerfile
>
> The java sdk is used for the sdk harness of the kafka io's expansion
> service, while the job server is used to execute the python pipeline in the
> flink operator.
>
> FROM flink:1.17
> ...
> ## add java SDK and job server
> COPY --from=apache/beam_java8_sdk:2.56.0 /opt/apache/beam/ /opt/apache/beam/
>
> COPY --from=apache/beam_flink1.17_job_server:2.56.0 \
>   /opt/apache/beam/jars/beam-runners-flink-job-server.jar \
>   /opt/apache/beam/jars/beam-runners-flink-job-server.jar
>
> RUN chown -R flink:flink /opt/apache/beam
>
> ## install python 3.10.13
> RUN apt-get update -y && \
>   apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev liblzma-dev && \
>   wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tgz && \
> ...
> ## install apache beam 2.56.0
> RUN pip3 install apache-beam==${BEAM_VERSION}
>
> ## copy pipeline source
> RUN mkdir /opt/flink/app
> COPY word_len.py /opt/flink/app/
>
> Then the pipeline is deployed using the following manifest - the full
> details can be found in
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.yml
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: beam-word-len
> spec:
>   image: beam-python-example:1.17
>   imagePullPolicy: Never
>   flinkVersion: v1_17
>   flinkConfiguration:
>     taskmanager.numberOfTaskSlots: "5"
>   serviceAccount: flink
>   podTemplate:
>     spec:
>       containers:
>         - name: flink-main-container
>           env:
>             - name: BOOTSTRAP_SERVERS
>               value: demo-cluster-kafka-bootstrap:9092
> ...
>   jobManager:
>     resource:
>       memory: "2048m"
>       cpu: 1
>   taskManager:
>     replicas: 2
>     resource:
>       memory: "2048m"
>       cpu: 1
>     podTemplate:
>       spec:
>         containers:
>           - name: python-worker-harness
>             image: apache/beam_python3.10_sdk:2.56.0
>             imagePullPolicy: Never
>             args: ["--worker_pool"]
>             ports:
>               - containerPort: 50000
>
>   job:
>     jarURI: local:///opt/apache/beam/jars/beam-runners-flink-job-server.jar
>     entryClass: org.apache.beam.runners.flink.FlinkPortableClientEntryPoint
>     args:
>       - "--driver-cmd"
>       - "python /opt/flink/app/word_len.py --deploy"
>     parallelism: 3
>     upgradeMode: stateless
>
> Here is the pipeline source - the full details can be found in
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.py
>
> When I add the --deploy flag, the python sdk harness is set to EXTERNAL
> and its config is set to localhost:50000 - I believe it'll point to the
> sidecar container of the task manager. For the kafka io, the expansion
> service's sdk harness is configured as PROCESS, and the command points to
> the java sdk that is added to the beam-python-example:1.17 image.
>
> ...
> def run(args=None):
>     parser = argparse.ArgumentParser(description="Beam pipeline arguments")
>     parser.add_argument("--runner", default="FlinkRunner", help="Apache Beam runner")
>     parser.add_argument(
>         "--deploy",
>         action="store_true",
>         default=False,
>         help="Flag to indicate whether to use an own local cluster",
>     )
>     opts, _ = parser.parse_known_args(args)
>
>     pipeline_opts = {
>         "runner": opts.runner,
>         "job_name": "avg-word-length-beam",
>         "streaming": True,
>         "environment_type": "EXTERNAL" if opts.deploy is True else
> "LOOPBACK",
>         "checkpointing_interval": "60000",
>     }
>
>     expansion_service = None
>     if pipeline_opts["environment_type"] == "EXTERNAL":
>         pipeline_opts = {
>             **pipeline_opts,
>             **{
>                 "environment_config": "localhost:50000",
>                 "flink_submit_uber_jar": True,
>             },
>         }
>         expansion_service = kafka.default_io_expansion_service(
>             append_args=[
>                 "--defaultEnvironmentType=PROCESS",
>                 '--defaultEnvironmentConfig={"command":"/opt/apache/beam/boot"}',
>                 "--experiments=use_deprecated_read",  # https://github.com/apache/beam/issues/20979
>             ]
>         )
>     print(pipeline_opts)
>     options = PipelineOptions([], **pipeline_opts)
>     # Required; otherwise it complains when importing worker functions
>     options.view_as(SetupOptions).save_main_session = True
>
>     with beam.Pipeline(options=options) as p:
>         (
>             p
>             | "ReadWordsFromKafka"
>             >> ReadWordsFromKafka(
>                 bootstrap_servers=os.getenv(
>                     "BOOTSTRAP_SERVERS",
>                     "host.docker.internal:29092",
>                 ),
>                 topics=[os.getenv("INPUT_TOPIC", "input-topic")],
>                 group_id=os.getenv("GROUP_ID", "beam-word-len"),
>                 expansion_service=expansion_service,
>             )
>             | "CalculateAvgWordLen" >> CalculateAvgWordLen()
>             | "WriteWordLenToKafka"
>             >> WriteWordLenToKafka(
>                 bootstrap_servers=os.getenv(
>                     "BOOTSTRAP_SERVERS",
>                     "host.docker.internal:29092",
>                 ),
>                 topic=os.getenv("OUTPUT_TOPIC", "output-topic-beam"),
>                 expansion_service=expansion_service,
>             )
>         )
>
>         logging.getLogger().setLevel(logging.INFO)
>         logging.info("Building pipeline ...")
>
> When I run the pipeline, I see the following error, and the job fails to be
> submitted to the job manager. It looks like the pipeline DAG is not created,
> but I'm not sure what causes the error.
>
> T4: <class 'apache_beam.transforms.core.CallableWrapperDoFn'>
> INFO:dill:T4: <class 'apache_beam.transforms.core.CallableWrapperDoFn'>
> # T4
> INFO:dill:# T4
> D2: <dict object at 0x7f6c918c1d00>
> INFO:dill:D2: <dict object at 0x7f6c918c1d00>
> F1: <function Map.<locals>.<lambda> at 0x7f6c918bb640>
> INFO:dill:F1: <function Map.<locals>.<lambda> at 0x7f6c918bb640>
> F2: <function _create_function at 0x7f6cb03de4d0>
> INFO:dill:F2: <function _create_function at 0x7f6cb03de4d0>
> # F2
> INFO:dill:# F2
> T1: <class 'code'>
> INFO:dill:T1: <class 'code'>
> F2: <function _load_type at 0x7f6cb03de3b0>
> INFO:dill:F2: <function _load_type at 0x7f6cb03de3b0>
> # F2
> INFO:dill:# F2
> # T1
> INFO:dill:# T1
> B1: <built-in function getattr>
> INFO:dill:B1: <built-in function getattr>
> F2: <function _get_attr at 0x7f6cb03deef0>
> INFO:dill:F2: <function _get_attr at 0x7f6cb03deef0>
> # F2
> INFO:dill:# F2
> # B1
> INFO:dill:# B1
> M2: <module 'apache_beam.transforms.core' from
> '/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py'>
> INFO:dill:M2: <module 'apache_beam.transforms.core' from
> '/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py'>
> F2: <function _import_module at 0x7f6cb03df010>
> INFO:dill:F2: <function _import_module at 0x7f6cb03df010>
> # F2
> INFO:dill:# F2
> # M2
> INFO:dill:# M2
> Ce: <cell at 0x7f6c91862c50: function object at 0x7f6c91857b50>
> INFO:dill:Ce: <cell at 0x7f6c91862c50: function object at 0x7f6c91857b50>
> F2: <function _create_cell at 0x7f6cb03de8c0>
> INFO:dill:F2: <function _create_cell at 0x7f6cb03de8c0>
> # F2
> INFO:dill:# F2
> F1: <function ReadWordsFromKafka.expand.<locals>.decode_message at
> 0x7f6c91857b50>
> INFO:dill:F1: <function ReadWordsFromKafka.expand.<locals>.decode_message
> at 0x7f6c91857b50>
> D1: <dict object at 0x7f6cb0a5a040>
> INFO:dill:D1: <dict object at 0x7f6cb0a5a040>
> # D1
> INFO:dill:# D1
> Ce: <cell at 0x7f6c918636a0: ReadWordsFromKafka object at 0x7f6c918639a0>
> INFO:dill:Ce: <cell at 0x7f6c918636a0: ReadWordsFromKafka object at
> 0x7f6c918639a0>
> T2: <class '__main__.ReadWordsFromKafka'>
> INFO:dill:T2: <class '__main__.ReadWordsFromKafka'>
> F2: <function _create_type at 0x7f6cb03de440>
> INFO:dill:F2: <function _create_type at 0x7f6cb03de440>
> # F2
> INFO:dill:# F2
> T1: <class 'type'>
> INFO:dill:T1: <class 'type'>
> # T1
> INFO:dill:# T1
> T4: <class 'apache_beam.transforms.ptransform.PTransform'>
> INFO:dill:T4: <class 'apache_beam.transforms.ptransform.PTransform'>
> # T4
> INFO:dill:# T4
> D2: <dict object at 0x7f6c922f44c0>
> INFO:dill:D2: <dict object at 0x7f6c922f44c0>
> F1: <function ReadWordsFromKafka.__init__ at 0x7f6c922d3b50>
> INFO:dill:F1: <function ReadWordsFromKafka.__init__ at 0x7f6c922d3b50>
> D1: <dict object at 0x7f6cb0a5a040>
> INFO:dill:D1: <dict object at 0x7f6cb0a5a040>
> # D1
> INFO:dill:# D1
> Ce: <cell at 0x7f6cb0ac35e0: type object at 0x7f6c93424410>
> INFO:dill:Ce: <cell at 0x7f6cb0ac35e0: type object at 0x7f6c93424410>
> T5: <class '__main__.ReadWordsFromKafka'>
> INFO:dill:T5: <class '__main__.ReadWordsFromKafka'>
> # T5
> INFO:dill:# T5
> # Ce
> INFO:dill:# Ce
> D2: <dict object at 0x7f6c9186a740>
> INFO:dill:D2: <dict object at 0x7f6c9186a740>
> # D2
> INFO:dill:# D2
> # F1
> INFO:dill:# F1
> F1: <function ReadWordsFromKafka.expand at 0x7f6c922d3be0>
> INFO:dill:F1: <function ReadWordsFromKafka.expand at 0x7f6c922d3be0>
> D1: <dict object at 0x7f6cb0a5a040>
> INFO:dill:D1: <dict object at 0x7f6cb0a5a040>
> # D1
> INFO:dill:# D1
> D2: <dict object at 0x7f6c9186a5c0>
> INFO:dill:D2: <dict object at 0x7f6c9186a5c0>
> # D2
> INFO:dill:# D2
> # F1
> INFO:dill:# F1
> T6: <class 'apache_beam.typehints.decorators.IOTypeHints'>
> INFO:dill:T6: <class 'apache_beam.typehints.decorators.IOTypeHints'>
> F2: <function _create_namedtuple at 0x7f6cb03dedd0>
> INFO:dill:F2: <function _create_namedtuple at 0x7f6cb03dedd0>
> # F2
> INFO:dill:# F2
> # T6
> INFO:dill:# T6
> # D2
> INFO:dill:# D2
> # T2
> INFO:dill:# T2
> D2: <dict object at 0x7f6c9184be80>
> INFO:dill:D2: <dict object at 0x7f6c9184be80>
> T4: <class 'apache_beam.transforms.external.BeamJarExpansionService'>
> INFO:dill:T4: <class
> 'apache_beam.transforms.external.BeamJarExpansionService'>
> # T4
> INFO:dill:# T4
> D2: <dict object at 0x7f6c922d7c00>
> INFO:dill:D2: <dict object at 0x7f6c922d7c00>
> T4: <class 'apache_beam.utils.subprocess_server.JavaJarServer'>
> INFO:dill:T4: <class 'apache_beam.utils.subprocess_server.JavaJarServer'>
> # T4
> INFO:dill:# T4
> D2: <dict object at 0x7f6c9184aa80>
> INFO:dill:D2: <dict object at 0x7f6c9184aa80>
> T4: <class
> 'apache_beam.transforms.external.ExpansionAndArtifactRetrievalStub'>
> INFO:dill:T4: <class
> 'apache_beam.transforms.external.ExpansionAndArtifactRetrievalStub'>
> # T4
> INFO:dill:# T4
> # D2
> INFO:dill:# D2
> D2: <dict object at 0x7f6c918b78c0>
> INFO:dill:D2: <dict object at 0x7f6c918b78c0>
> T4: <class 'grpc._channel.Channel'>
> INFO:dill:T4: <class 'grpc._channel.Channel'>
> # T4
> INFO:dill:# T4
> D2: <dict object at 0x7f6c91868440>
> INFO:dill:D2: <dict object at 0x7f6c91868440>
> {'runner': 'FlinkRunner', 'job_name': 'avg-word-length-beam', 'streaming':
> True, 'environment_type': 'EXTERNAL', 'checkpointing_interval': '60000',
> 'environment_config': 'localhost:50000', 'flink_submit_uber_jar': True}
> Traceback (most recent call last):
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
> line 379, in dumps
>     s = dill.dumps(o, byref=settings['dill_byref'])
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 265,
> in dumps
>     dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio)
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 259,
> in dump
>     Pickler(file, protocol, **_kwds).dump(obj)
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 445,
> in dump
>     StockPickler.dump(self, obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 487, in dump
>     self.save(obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>     self.save_reduce(obj=obj, *rv)
>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>     save(state)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
> line 349, in new_save_module_dict
>     return old_save_module_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
> in save_module_dict
>     StockPickler.save_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>     self._batch_setitems(obj.items())
>   File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
>     save(v)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410,
> in save_function
>     pickler.save_reduce(_create_function, (obj.__code__,
>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>     save(args)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple
>     save(element)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>     save(element)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147,
> in save_cell
>     pickler.save_reduce(_create_cell, (f,), obj=obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>     save(args)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>     save(element)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410,
> in save_function
>     pickler.save_reduce(_create_function, (obj.__code__,
>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>     save(args)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple
>     save(element)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>     save(element)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147,
> in save_cell
>     pickler.save_reduce(_create_cell, (f,), obj=obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>     save(args)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>     save(element)
>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>     self.save_reduce(obj=obj, *rv)
>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>     save(state)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
> line 349, in new_save_module_dict
>     return old_save_module_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
> in save_module_dict
>     StockPickler.save_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>     self._batch_setitems(obj.items())
>   File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
>     save(v)
>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>     self.save_reduce(obj=obj, *rv)
>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>     save(state)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
> line 349, in new_save_module_dict
>     return old_save_module_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
> in save_module_dict
>     StockPickler.save_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>     self._batch_setitems(obj.items())
>   File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
>     save(v)
>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>     self.save_reduce(obj=obj, *rv)
>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>     save(state)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
> line 349, in new_save_module_dict
>     return old_save_module_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
> in save_module_dict
>     StockPickler.save_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>     self._batch_setitems(obj.items())
>   File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
>     save(v)
>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>     self.save_reduce(obj=obj, *rv)
>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>     save(state)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
> line 349, in new_save_module_dict
>     return old_save_module_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
> in save_module_dict
>     StockPickler.save_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>     self._batch_setitems(obj.items())
>   File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
>     save(v)
>   File "/usr/local/lib/python3.10/pickle.py", line 578, in save
>     rv = reduce(self.proto)
>   File "stringsource", line 2, in
> grpc._cython.cygrpc.Channel.__reduce_cython__
> TypeError: no default __reduce__ due to non-trivial __cinit__
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File "/opt/flink/app/word_len.py", line 212, in <module>
>     run()
>   File "/opt/flink/app/word_len.py", line 184, in run
>     p
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py",
> line 1110, in __ror__
>     return self.transform.__ror__(pvalueish, self.label)
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py",
> line 623, in __ror__
>     result = p.apply(self, pvalueish, label)
>   File "/usr/local/lib/python3.10/site-packages/apache_beam/pipeline.py",
> line 679, in apply
>     return self.apply(transform, pvalueish)
>   File "/usr/local/lib/python3.10/site-packages/apache_beam/pipeline.py",
> line 732, in apply
>     pvalueish_result = self.runner.apply(transform, pvalueish,
> self._options)
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/runners/runner.py",
> line 203, in apply
>     return self.apply_PTransform(transform, input, options)
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/runners/runner.py",
> line 207, in apply_PTransform
>     return transform.expand(input)
>   File "/opt/flink/app/word_len.py", line 94, in expand
>     | "DecodeMessage" >> beam.Map(decode_message)
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py",
> line 1994, in Map
>     pardo = FlatMap(wrapper, *args, **kwargs)
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py",
> line 1937, in FlatMap
>     pardo = ParDo(CallableWrapperDoFn(fn), *args, **kwargs)
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py",
> line 1473, in __init__
>     super().__init__(fn, *args, **kwargs)
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py",
> line 870, in __init__
>     self.fn = pickler.loads(pickler.dumps(self.fn))
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/pickler.py",
> line 44, in dumps
>     return desired_pickle_lib.dumps(
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
> line 383, in dumps
>     s = dill.dumps(o, byref=settings['dill_byref'])
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 265,
> in dumps
>     dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio)
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 259,
> in dump
>     Pickler(file, protocol, **_kwds).dump(obj)
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 445,
> in dump
>     StockPickler.dump(self, obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 487, in dump
>     self.save(obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>     self.save_reduce(obj=obj, *rv)
>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>     save(state)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
> line 349, in new_save_module_dict
>     return old_save_module_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
> in save_module_dict
>     StockPickler.save_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>     self._batch_setitems(obj.items())
>   File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
>     save(v)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410,
> in save_function
>     pickler.save_reduce(_create_function, (obj.__code__,
>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>     save(args)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple
>     save(element)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>     save(element)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147,
> in save_cell
>     pickler.save_reduce(_create_cell, (f,), obj=obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>     save(args)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>     save(element)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410,
> in save_function
>     pickler.save_reduce(_create_function, (obj.__code__,
>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>     save(args)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple
>     save(element)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>     save(element)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147,
> in save_cell
>     pickler.save_reduce(_create_cell, (f,), obj=obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>     save(args)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>     save(element)
>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>     self.save_reduce(obj=obj, *rv)
>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>     save(state)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
> line 349, in new_save_module_dict
>     return old_save_module_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
> in save_module_dict
>     StockPickler.save_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>     self._batch_setitems(obj.items())
>   File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
>     save(v)
>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>     self.save_reduce(obj=obj, *rv)
>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>     save(state)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
> line 349, in new_save_module_dict
>     return old_save_module_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
> in save_module_dict
>     StockPickler.save_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>     self._batch_setitems(obj.items())
>   File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
>     save(v)
>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>     self.save_reduce(obj=obj, *rv)
>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>     save(state)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
> line 349, in new_save_module_dict
>     return old_save_module_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
> in save_module_dict
>     StockPickler.save_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>     self._batch_setitems(obj.items())
>   File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
>     save(v)
>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>     self.save_reduce(obj=obj, *rv)
>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>     save(state)
>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>     f(self, obj)  # Call unbound method with explicit self
>   File
> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
> line 349, in new_save_module_dict
>     return old_save_module_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
> in save_module_dict
>     StockPickler.save_dict(pickler, obj)
>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>     self._batch_setitems(obj.items())
>   File "/usr/local/lib/python3.10/pickle.py", line 998, in _batch_setitems
>     save(v)
>   File "/usr/local/lib/python3.10/pickle.py", line 578, in save
>     rv = reduce(self.proto)
>   File "stringsource", line 2, in
> grpc._cython.cygrpc.Channel.__reduce_cython__
> TypeError: no default __reduce__ due to non-trivial __cinit__
>
>         at
> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram
> (FlinkPortableClientEntryPoint.java:192) ~[?:?]
>         at
> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(
> FlinkPortableClientEntryPoint.java:100) ~[?:?]
>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method) ~[?:?]
>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source) ~[?:?]
>         at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source) ~[?:?]
>         at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:355) ~[flink-dist-1.17.2.jar:1.17.2]
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution
> (PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2]
>         at org.apache.flink.client.ClientUtils.executeProgram(
> ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2]
>         at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
> (ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2]
>         ... 13 more
> Caused by: java.util.concurrent.TimeoutException: Timeout of 30 seconds
> waiting for job submission.
>         at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint$
> DetachedJobInvokerFactory.executeDetachedJob(
> FlinkPortableClientEntryPoint.java:256) ~[?:?]
>         at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint$
> DetachedJobInvokerFactory.access$300(FlinkPortableClientEntryPoint.java:
> 206) ~[?:?]
>         at
> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram
> (FlinkPortableClientEntryPoint.java:180) ~[?:?]
>         at
> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(
> FlinkPortableClientEntryPoint.java:100) ~[?:?]
>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method) ~[?:?]
>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source) ~[?:?]
>         at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source) ~[?:?]
>         at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:355) ~[flink-dist-1.17.2.jar:1.17.2]
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution
> (PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2]
>         at org.apache.flink.client.ClientUtils.executeProgram(
> ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2]
>         at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
> (ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2]
>         ... 13 more
> 2024-05-10 13:11:41,217 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal
> error occurred in the cluster entrypoint.
> java.util.concurrent.CompletionException:
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> Could not execute application.
>         at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
> Source) ~[?:?]
>         at java.util.concurrent.CompletableFuture.completeThrowable(Unknown
> Source) ~[?:?]
>         at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown
> Source) ~[?:?]
>         at java.util.concurrent.CompletableFuture.postComplete(Unknown
> Source) ~[?:?]
>         at 
> java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
> Source) ~[?:?]
>         at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
> (ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.17.2.jar:1.17.2]
>         at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda
> $runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[
> flink-dist-1.17.2.jar:1.17.2]
>         at java.util.concurrent.Executors$RunnableAdapter.call(Unknown
> Source) ~[?:?]
>         at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>         at
> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter
> $ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[
> flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
>         at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader
> (ClassLoadingUtils.java:68) ~[
> flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
>         at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda
> $withContextClassLoader$0(ClassLoadingUtils.java:41) ~[
> flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [
> flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
>         at akka.dispatch.ForkJoinExecutorConfigurator$
> AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [
> flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
>         at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
>         at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown
> Source) [?:?]
>         at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
>         at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
> [?:?]
>         at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
> [?:?]
> Caused by:
> org.apache.flink.client.deployment.application.ApplicationExecutionException:
> Could not execute application.
>         ... 14 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException:
> The main method caused an error: Job python /opt/flink/app/word_len.py
> --deploy failed.
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:372) ~[flink-dist-1.17.2.jar:1.17.2]
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution
> (PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2]
>         at org.apache.flink.client.ClientUtils.executeProgram(
> ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2]
>         at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
> (ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2]
>         ... 13 more
> Caused by: java.lang.RuntimeException: Job python /opt/flink/app/
> word_len.py --deploy failed.
>         at
> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(
> FlinkPortableClientEntryPoint.java:102) ~[?:?]
>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method) ~[?:?]
>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source) ~[?:?]
>         at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source) ~[?:?]
>         at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(
> PackagedProgram.java:355) ~[flink-dist-1.17.2.jar:1.17.2]
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution
> (PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2]
>         at org.apache.flink.client.ClientUtils.executeProgram(
> ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2]
>         at
> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
> (ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2]
>         ... 13 more
> Caused by: java.lang.RuntimeException: Failed to start job with driver
> program: bash [-c, python /opt/flink/app/word_len.py --deploy
> --job_endpoint=localhost:37015] output: WARNING:root:Waiting for grpc
> channel to be ready at localhost:32895.
>
> Can you please inform me how to fix this issue?
>
> Cheers,
> Jaehyeon
>
