Hi XQ,

I haven't changed anything, and the issue persists on my end. The print
statement is only called when self.verbose is True, and it is False by default.
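
For reference, the guard is essentially the following (a simplified sketch;
the hypothetical Decoder class just stands in for the transform in
word_len.py that owns the flag, and the decode step is approximate):

class Decoder:
    def __init__(self, verbose=False):
        self.verbose = verbose  # defaults to False, so nothing is printed

    def decode(self, kafka_kv):
        if self.verbose:
            print(kafka_kv)
        return kafka_kv[1].decode("utf-8")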

BTW, do you have any idea about the error message? I haven't seen such an
error before.

Cheers,
Jaehyeon

On Sun, 12 May 2024, 12:15 am XQ Hu via user, <user@beam.apache.org> wrote:

> Do you still have the same issue? I tried to follow your setup.sh to
> reproduce this, but somehow I am stuck at the word_len step. I saw that you
> also tried to use `print(kafka_kv)` to debug it. I am not sure about your
> current status.
>
> On Fri, May 10, 2024 at 9:18 AM Jaehyeon Kim <dott...@gmail.com> wrote:
>
>> Hello,
>>
>> I'm experimenting with deploying a Python pipeline to a Flink cluster on
>> Kubernetes via the Flink Kubernetes operator. The pipeline simply calculates
>> average word lengths in a fixed time window of 5 seconds, and it works with
>> the embedded Flink cluster.
>>
>> First, I created a k8s cluster (v1.25.3) on minikube and built a Docker image
>> named beam-python-example:1.17 using the following Dockerfile - the full
>> details can be found at
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/Dockerfile
>>
>> The Java SDK is used for the SDK harness of the Kafka IO's expansion
>> service, while the job server is used to execute the Python pipeline in the
>> Flink operator.
>>
>> FROM flink:1.17
>> ...
>> ## add java SDK and job server
>> COPY --from=apache/beam_java8_sdk:2.56.0 /opt/apache/beam/ /opt/apache/beam/
>>
>> COPY --from=apache/beam_flink1.17_job_server:2.56.0 \
>>   /opt/apache/beam/jars/beam-runners-flink-job-server.jar /opt/apache/beam/jars/beam-runners-flink-job-server.jar
>>
>> RUN chown -R flink:flink /opt/apache/beam
>>
>> ## install python 3.10.13
>> RUN apt-get update -y && \
>>   apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev liblzma-dev && \
>>   wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tgz && \
>> ...
>> ## install apache beam 2.56.0
>> RUN pip3 install apache-beam==${BEAM_VERSION}
>>
>> ## copy pipeline source
>> RUN mkdir /opt/flink/app
>> COPY word_len.py /opt/flink/app/
>>
>> Then the pipeline is deployed using the following manifest - the full
>> details can be found in
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.yml
>>
>> apiVersion: flink.apache.org/v1beta1
>> kind: FlinkDeployment
>> metadata:
>>   name: beam-word-len
>> spec:
>>   image: beam-python-example:1.17
>>   imagePullPolicy: Never
>>   flinkVersion: v1_17
>>   flinkConfiguration:
>>     taskmanager.numberOfTaskSlots: "5"
>>   serviceAccount: flink
>>   podTemplate:
>>     spec:
>>       containers:
>>         - name: flink-main-container
>>           env:
>>             - name: BOOTSTRAP_SERVERS
>>               value: demo-cluster-kafka-bootstrap:9092
>> ...
>>   jobManager:
>>     resource:
>>       memory: "2048m"
>>       cpu: 1
>>   taskManager:
>>     replicas: 2
>>     resource:
>>       memory: "2048m"
>>       cpu: 1
>>     podTemplate:
>>       spec:
>>         containers:
>>           - name: python-worker-harness
>>             image: apache/beam_python3.10_sdk:2.56.0
>>             imagePullPolicy: Never
>>             args: ["--worker_pool"]
>>             ports:
>>               - containerPort: 50000
>>
>>   job:
>>     jarURI: local:///opt/apache/beam/jars/beam-runners-flink-job-server.jar
>>     entryClass: org.apache.beam.runners.flink.FlinkPortableClientEntryPoint
>>     args:
>>       - "--driver-cmd"
>>       - "python /opt/flink/app/word_len.py --deploy"
>>     parallelism: 3
>>     upgradeMode: stateless
>>
>> Here is the pipeline source - the full details can be found in
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.py
>>
>> When I add the --deploy flag, the Python SDK harness is set to EXTERNAL
>> and its config is set to localhost:50000 - I believe it points to the
>> sidecar container of the task manager. For the Kafka IO, the expansion
>> service's SDK harness is configured as PROCESS, and the command points to
>> the Java SDK that is added to the beam-python-example:1.17 image.
>>
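>> The traceback below points at decode_message, which is defined inside
>> ReadWordsFromKafka.expand and reads self.verbose. Roughly, that transform
>> looks like the following simplified sketch (the real class in the repo
>> takes more parameters and also builds the Kafka read, so treat the
>> signature here as approximate):
>>
>> import apache_beam as beam
>>
>> class ReadWordsFromKafka(beam.PTransform):
>>     def __init__(self, expansion_service=None, verbose=False):
>>         super().__init__()
>>         self.expansion_service = expansion_service
>>         self.verbose = verbose
>>
>>     def expand(self, pcoll):
>>         def decode_message(kafka_kv):
>>             if self.verbose:  # the closure captures self via this reference
>>                 print(kafka_kv)
>>             return kafka_kv[1].decode("utf-8")
>>
>>         return pcoll | "DecodeMessage" >> beam.Map(decode_message)
>>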
>> ...
>> def run(args=None):
>>     parser = argparse.ArgumentParser(description="Beam pipeline arguments")
>>     parser.add_argument("--runner", default="FlinkRunner", help="Apache Beam runner")
>>     parser.add_argument(
>>         "--deploy",
>>         action="store_true",
>>         help="Flag to indicate whether to use one's own local cluster",
>>     )
>>     opts, _ = parser.parse_known_args(args)
>>
>>     pipeline_opts = {
>>         "runner": opts.runner,
>>         "job_name": "avg-word-length-beam",
>>         "streaming": True,
>>         "environment_type": "EXTERNAL" if opts.deploy else "LOOPBACK",
>>         "checkpointing_interval": "60000",
>>     }
>>
>>     expansion_service = None
>>     if pipeline_opts["environment_type"] == "EXTERNAL":
>>         pipeline_opts = {
>>             **pipeline_opts,
>>             **{
>>                 "environment_config": "localhost:50000",
>>                 "flink_submit_uber_jar": True,
>>             },
>>         }
>>         expansion_service = kafka.default_io_expansion_service(
>>             append_args=[
>>                 "--defaultEnvironmentType=PROCESS",
>>                 '--defaultEnvironmentConfig={"command":"/opt/apache/beam/boot"}',
>>                 "--experiments=use_deprecated_read",  # https://github.com/apache/beam/issues/20979
>>             ]
>>         )
>>     print(pipeline_opts)
>>     options = PipelineOptions([], **pipeline_opts)
>>     # Required; otherwise it complains that it cannot import worker functions
>>     options.view_as(SetupOptions).save_main_session = True
>>
>>     with beam.Pipeline(options=options) as p:
>>         (
>>             p
>>             | "ReadWordsFromKafka"
>>             >> ReadWordsFromKafka(
>>                 bootstrap_servers=os.getenv(
>>                     "BOOTSTRAP_SERVERS",
>>                     "host.docker.internal:29092",
>>                 ),
>>                 topics=[os.getenv("INPUT_TOPIC", "input-topic")],
>>                 group_id=os.getenv("GROUP_ID", "beam-word-len"),
>>                 expansion_service=expansion_service,
>>             )
>>             | "CalculateAvgWordLen" >> CalculateAvgWordLen()
>>             | "WriteWordLenToKafka"
>>             >> WriteWordLenToKafka(
>>                 bootstrap_servers=os.getenv(
>>                     "BOOTSTRAP_SERVERS",
>>                     "host.docker.internal:29092",
>>                 ),
>>                 topic=os.getenv("OUTPUT_TOPIC", "output-topic-beam"),
>>                 expansion_service=expansion_service,
>>             )
>>         )
>>
>>         logging.getLogger().setLevel(logging.INFO)
>>         logging.info("Building pipeline ...")
>>
>> When I run the pipeline, I see the following error, and it fails to be
>> submitted to the job manager. It looks like the pipeline DAG is not created,
>> but I'm not sure what causes the error.
>>
>> T4: <class 'apache_beam.transforms.core.CallableWrapperDoFn'>
>> INFO:dill:T4: <class 'apache_beam.transforms.core.CallableWrapperDoFn'>
>> # T4
>> INFO:dill:# T4
>> D2: <dict object at 0x7f6c918c1d00>
>> INFO:dill:D2: <dict object at 0x7f6c918c1d00>
>> F1: <function Map.<locals>.<lambda> at 0x7f6c918bb640>
>> INFO:dill:F1: <function Map.<locals>.<lambda> at 0x7f6c918bb640>
>> F2: <function _create_function at 0x7f6cb03de4d0>
>> INFO:dill:F2: <function _create_function at 0x7f6cb03de4d0>
>> # F2
>> INFO:dill:# F2
>> T1: <class 'code'>
>> INFO:dill:T1: <class 'code'>
>> F2: <function _load_type at 0x7f6cb03de3b0>
>> INFO:dill:F2: <function _load_type at 0x7f6cb03de3b0>
>> # F2
>> INFO:dill:# F2
>> # T1
>> INFO:dill:# T1
>> B1: <built-in function getattr>
>> INFO:dill:B1: <built-in function getattr>
>> F2: <function _get_attr at 0x7f6cb03deef0>
>> INFO:dill:F2: <function _get_attr at 0x7f6cb03deef0>
>> # F2
>> INFO:dill:# F2
>> # B1
>> INFO:dill:# B1
>> M2: <module 'apache_beam.transforms.core' from
>> '/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py'>
>> INFO:dill:M2: <module 'apache_beam.transforms.core' from
>> '/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py'>
>> F2: <function _import_module at 0x7f6cb03df010>
>> INFO:dill:F2: <function _import_module at 0x7f6cb03df010>
>> # F2
>> INFO:dill:# F2
>> # M2
>> INFO:dill:# M2
>> Ce: <cell at 0x7f6c91862c50: function object at 0x7f6c91857b50>
>> INFO:dill:Ce: <cell at 0x7f6c91862c50: function object at 0x7f6c91857b50>
>> F2: <function _create_cell at 0x7f6cb03de8c0>
>> INFO:dill:F2: <function _create_cell at 0x7f6cb03de8c0>
>> # F2
>> INFO:dill:# F2
>> F1: <function ReadWordsFromKafka.expand.<locals>.decode_message at
>> 0x7f6c91857b50>
>> INFO:dill:F1: <function ReadWordsFromKafka.expand.<locals>.decode_message
>> at 0x7f6c91857b50>
>> D1: <dict object at 0x7f6cb0a5a040>
>> INFO:dill:D1: <dict object at 0x7f6cb0a5a040>
>> # D1
>> INFO:dill:# D1
>> Ce: <cell at 0x7f6c918636a0: ReadWordsFromKafka object at 0x7f6c918639a0>
>> INFO:dill:Ce: <cell at 0x7f6c918636a0: ReadWordsFromKafka object at
>> 0x7f6c918639a0>
>> T2: <class '__main__.ReadWordsFromKafka'>
>> INFO:dill:T2: <class '__main__.ReadWordsFromKafka'>
>> F2: <function _create_type at 0x7f6cb03de440>
>> INFO:dill:F2: <function _create_type at 0x7f6cb03de440>
>> # F2
>> INFO:dill:# F2
>> T1: <class 'type'>
>> INFO:dill:T1: <class 'type'>
>> # T1
>> INFO:dill:# T1
>> T4: <class 'apache_beam.transforms.ptransform.PTransform'>
>> INFO:dill:T4: <class 'apache_beam.transforms.ptransform.PTransform'>
>> # T4
>> INFO:dill:# T4
>> D2: <dict object at 0x7f6c922f44c0>
>> INFO:dill:D2: <dict object at 0x7f6c922f44c0>
>> F1: <function ReadWordsFromKafka.__init__ at 0x7f6c922d3b50>
>> INFO:dill:F1: <function ReadWordsFromKafka.__init__ at 0x7f6c922d3b50>
>> D1: <dict object at 0x7f6cb0a5a040>
>> INFO:dill:D1: <dict object at 0x7f6cb0a5a040>
>> # D1
>> INFO:dill:# D1
>> Ce: <cell at 0x7f6cb0ac35e0: type object at 0x7f6c93424410>
>> INFO:dill:Ce: <cell at 0x7f6cb0ac35e0: type object at 0x7f6c93424410>
>> T5: <class '__main__.ReadWordsFromKafka'>
>> INFO:dill:T5: <class '__main__.ReadWordsFromKafka'>
>> # T5
>> INFO:dill:# T5
>> # Ce
>> INFO:dill:# Ce
>> D2: <dict object at 0x7f6c9186a740>
>> INFO:dill:D2: <dict object at 0x7f6c9186a740>
>> # D2
>> INFO:dill:# D2
>> # F1
>> INFO:dill:# F1
>> F1: <function ReadWordsFromKafka.expand at 0x7f6c922d3be0>
>> INFO:dill:F1: <function ReadWordsFromKafka.expand at 0x7f6c922d3be0>
>> D1: <dict object at 0x7f6cb0a5a040>
>> INFO:dill:D1: <dict object at 0x7f6cb0a5a040>
>> # D1
>> INFO:dill:# D1
>> D2: <dict object at 0x7f6c9186a5c0>
>> INFO:dill:D2: <dict object at 0x7f6c9186a5c0>
>> # D2
>> INFO:dill:# D2
>> # F1
>> INFO:dill:# F1
>> T6: <class 'apache_beam.typehints.decorators.IOTypeHints'>
>> INFO:dill:T6: <class 'apache_beam.typehints.decorators.IOTypeHints'>
>> F2: <function _create_namedtuple at 0x7f6cb03dedd0>
>> INFO:dill:F2: <function _create_namedtuple at 0x7f6cb03dedd0>
>> # F2
>> INFO:dill:# F2
>> # T6
>> INFO:dill:# T6
>> # D2
>> INFO:dill:# D2
>> # T2
>> INFO:dill:# T2
>> D2: <dict object at 0x7f6c9184be80>
>> INFO:dill:D2: <dict object at 0x7f6c9184be80>
>> T4: <class 'apache_beam.transforms.external.BeamJarExpansionService'>
>> INFO:dill:T4: <class
>> 'apache_beam.transforms.external.BeamJarExpansionService'>
>> # T4
>> INFO:dill:# T4
>> D2: <dict object at 0x7f6c922d7c00>
>> INFO:dill:D2: <dict object at 0x7f6c922d7c00>
>> T4: <class 'apache_beam.utils.subprocess_server.JavaJarServer'>
>> INFO:dill:T4: <class 'apache_beam.utils.subprocess_server.JavaJarServer'>
>> # T4
>> INFO:dill:# T4
>> D2: <dict object at 0x7f6c9184aa80>
>> INFO:dill:D2: <dict object at 0x7f6c9184aa80>
>> T4: <class
>> 'apache_beam.transforms.external.ExpansionAndArtifactRetrievalStub'>
>> INFO:dill:T4: <class
>> 'apache_beam.transforms.external.ExpansionAndArtifactRetrievalStub'>
>> # T4
>> INFO:dill:# T4
>> # D2
>> INFO:dill:# D2
>> D2: <dict object at 0x7f6c918b78c0>
>> INFO:dill:D2: <dict object at 0x7f6c918b78c0>
>> T4: <class 'grpc._channel.Channel'>
>> INFO:dill:T4: <class 'grpc._channel.Channel'>
>> # T4
>> INFO:dill:# T4
>> D2: <dict object at 0x7f6c91868440>
>> INFO:dill:D2: <dict object at 0x7f6c91868440>
>> {'runner': 'FlinkRunner', 'job_name': 'avg-word-length-beam', 'streaming':
>> True, 'environment_type': 'EXTERNAL', 'checkpointing_interval': '60000',
>> 'environment_config': 'localhost:50000', 'flink_submit_uber_jar': True}
>> Traceback (most recent call last):
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
>> line 379, in dumps
>>     s = dill.dumps(o, byref=settings['dill_byref'])
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 265,
>> in dumps
>>     dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio)
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 259,
>> in dump
>>     Pickler(file, protocol, **_kwds).dump(obj)
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 445,
>> in dump
>>     StockPickler.dump(self, obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 487, in dump
>>     self.save(obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>>     self.save_reduce(obj=obj, *rv)
>>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>>     save(state)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
>> line 349, in new_save_module_dict
>>     return old_save_module_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
>> in save_module_dict
>>     StockPickler.save_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>>     self._batch_setitems(obj.items())
>>   File "/usr/local/lib/python3.10/pickle.py", line 998, in
>> _batch_setitems
>>     save(v)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410,
>> in save_function
>>     pickler.save_reduce(_create_function, (obj.__code__,
>>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>>     save(args)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple
>>     save(element)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>>     save(element)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147,
>> in save_cell
>>     pickler.save_reduce(_create_cell, (f,), obj=obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>>     save(args)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>>     save(element)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410,
>> in save_function
>>     pickler.save_reduce(_create_function, (obj.__code__,
>>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>>     save(args)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple
>>     save(element)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>>     save(element)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147,
>> in save_cell
>>     pickler.save_reduce(_create_cell, (f,), obj=obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>>     save(args)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>>     save(element)
>>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>>     self.save_reduce(obj=obj, *rv)
>>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>>     save(state)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
>> line 349, in new_save_module_dict
>>     return old_save_module_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
>> in save_module_dict
>>     StockPickler.save_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>>     self._batch_setitems(obj.items())
>>   File "/usr/local/lib/python3.10/pickle.py", line 998, in
>> _batch_setitems
>>     save(v)
>>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>>     self.save_reduce(obj=obj, *rv)
>>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>>     save(state)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
>> line 349, in new_save_module_dict
>>     return old_save_module_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
>> in save_module_dict
>>     StockPickler.save_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>>     self._batch_setitems(obj.items())
>>   File "/usr/local/lib/python3.10/pickle.py", line 998, in
>> _batch_setitems
>>     save(v)
>>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>>     self.save_reduce(obj=obj, *rv)
>>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>>     save(state)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
>> line 349, in new_save_module_dict
>>     return old_save_module_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
>> in save_module_dict
>>     StockPickler.save_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>>     self._batch_setitems(obj.items())
>>   File "/usr/local/lib/python3.10/pickle.py", line 998, in
>> _batch_setitems
>>     save(v)
>>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>>     self.save_reduce(obj=obj, *rv)
>>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>>     save(state)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
>> line 349, in new_save_module_dict
>>     return old_save_module_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
>> in save_module_dict
>>     StockPickler.save_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>>     self._batch_setitems(obj.items())
>>   File "/usr/local/lib/python3.10/pickle.py", line 998, in
>> _batch_setitems
>>     save(v)
>>   File "/usr/local/lib/python3.10/pickle.py", line 578, in save
>>     rv = reduce(self.proto)
>>   File "stringsource", line 2, in
>> grpc._cython.cygrpc.Channel.__reduce_cython__
>> TypeError: no default __reduce__ due to non-trivial __cinit__
>>
>> During handling of the above exception, another exception occurred:
>>
>> Traceback (most recent call last):
>>   File "/opt/flink/app/word_len.py", line 212, in <module>
>>     run()
>>   File "/opt/flink/app/word_len.py", line 184, in run
>>     p
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py",
>> line 1110, in __ror__
>>     return self.transform.__ror__(pvalueish, self.label)
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py",
>> line 623, in __ror__
>>     result = p.apply(self, pvalueish, label)
>>   File "/usr/local/lib/python3.10/site-packages/apache_beam/pipeline.py",
>> line 679, in apply
>>     return self.apply(transform, pvalueish)
>>   File "/usr/local/lib/python3.10/site-packages/apache_beam/pipeline.py",
>> line 732, in apply
>>     pvalueish_result = self.runner.apply(transform, pvalueish,
>> self._options)
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/runners/runner.py",
>> line 203, in apply
>>     return self.apply_PTransform(transform, input, options)
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/runners/runner.py",
>> line 207, in apply_PTransform
>>     return transform.expand(input)
>>   File "/opt/flink/app/word_len.py", line 94, in expand
>>     | "DecodeMessage" >> beam.Map(decode_message)
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py",
>> line 1994, in Map
>>     pardo = FlatMap(wrapper, *args, **kwargs)
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py",
>> line 1937, in FlatMap
>>     pardo = ParDo(CallableWrapperDoFn(fn), *args, **kwargs)
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/core.py",
>> line 1473, in __init__
>>     super().__init__(fn, *args, **kwargs)
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/transforms/ptransform.py",
>> line 870, in __init__
>>     self.fn = pickler.loads(pickler.dumps(self.fn))
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/pickler.py",
>> line 44, in dumps
>>     return desired_pickle_lib.dumps(
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
>> line 383, in dumps
>>     s = dill.dumps(o, byref=settings['dill_byref'])
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 265,
>> in dumps
>>     dump(obj, file, protocol, byref, fmode, recurse, **kwds)#, strictio)
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 259,
>> in dump
>>     Pickler(file, protocol, **_kwds).dump(obj)
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 445,
>> in dump
>>     StockPickler.dump(self, obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 487, in dump
>>     self.save(obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>>     self.save_reduce(obj=obj, *rv)
>>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>>     save(state)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
>> line 349, in new_save_module_dict
>>     return old_save_module_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
>> in save_module_dict
>>     StockPickler.save_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>>     self._batch_setitems(obj.items())
>>   File "/usr/local/lib/python3.10/pickle.py", line 998, in
>> _batch_setitems
>>     save(v)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410,
>> in save_function
>>     pickler.save_reduce(_create_function, (obj.__code__,
>>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>>     save(args)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple
>>     save(element)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>>     save(element)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147,
>> in save_cell
>>     pickler.save_reduce(_create_cell, (f,), obj=obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>>     save(args)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>>     save(element)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1410,
>> in save_function
>>     pickler.save_reduce(_create_function, (obj.__code__,
>>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>>     save(args)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/pickle.py", line 902, in save_tuple
>>     save(element)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>>     save(element)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 1147,
>> in save_cell
>>     pickler.save_reduce(_create_cell, (f,), obj=obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 692, in save_reduce
>>     save(args)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File "/usr/local/lib/python3.10/pickle.py", line 887, in save_tuple
>>     save(element)
>>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>>     self.save_reduce(obj=obj, *rv)
>>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>>     save(state)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
>> line 349, in new_save_module_dict
>>     return old_save_module_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
>> in save_module_dict
>>     StockPickler.save_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>>     self._batch_setitems(obj.items())
>>   File "/usr/local/lib/python3.10/pickle.py", line 998, in
>> _batch_setitems
>>     save(v)
>>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>>     self.save_reduce(obj=obj, *rv)
>>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>>     save(state)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
>> line 349, in new_save_module_dict
>>     return old_save_module_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
>> in save_module_dict
>>     StockPickler.save_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>>     self._batch_setitems(obj.items())
>>   File "/usr/local/lib/python3.10/pickle.py", line 998, in
>> _batch_setitems
>>     save(v)
>>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>>     self.save_reduce(obj=obj, *rv)
>>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>>     save(state)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
>> line 349, in new_save_module_dict
>>     return old_save_module_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
>> in save_module_dict
>>     StockPickler.save_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>>     self._batch_setitems(obj.items())
>>   File "/usr/local/lib/python3.10/pickle.py", line 998, in
>> _batch_setitems
>>     save(v)
>>   File "/usr/local/lib/python3.10/pickle.py", line 603, in save
>>     self.save_reduce(obj=obj, *rv)
>>   File "/usr/local/lib/python3.10/pickle.py", line 717, in save_reduce
>>     save(state)
>>   File "/usr/local/lib/python3.10/pickle.py", line 560, in save
>>     f(self, obj)  # Call unbound method with explicit self
>>   File
>> "/usr/local/lib/python3.10/site-packages/apache_beam/internal/dill_pickler.py",
>> line 349, in new_save_module_dict
>>     return old_save_module_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/site-packages/dill/_dill.py", line 912,
>> in save_module_dict
>>     StockPickler.save_dict(pickler, obj)
>>   File "/usr/local/lib/python3.10/pickle.py", line 972, in save_dict
>>     self._batch_setitems(obj.items())
>>   File "/usr/local/lib/python3.10/pickle.py", line 998, in
>> _batch_setitems
>>     save(v)
>>   File "/usr/local/lib/python3.10/pickle.py", line 578, in save
>>     rv = reduce(self.proto)
>>   File "stringsource", line 2, in
>> grpc._cython.cygrpc.Channel.__reduce_cython__
>> TypeError: no default __reduce__ due to non-trivial __cinit__
>>
>>         at
>> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram
>> (FlinkPortableClientEntryPoint.java:192) ~[?:?]
>>         at
>> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(
>> FlinkPortableClientEntryPoint.java:100) ~[?:?]
>>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method) ~[?:?]
>>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
>> Source) ~[?:?]
>>         at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
>> Source) ~[?:?]
>>         at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod
>> (PackagedProgram.java:355) ~[flink-dist-1.17.2.jar:1.17.2]
>>         at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution
>> (PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2]
>>         at org.apache.flink.client.ClientUtils.executeProgram(
>> ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2]
>>         at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
>> (ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2]
>>         ... 13 more
>> Caused by: java.util.concurrent.TimeoutException: Timeout of 30 seconds
>> waiting for job submission.
>>         at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint$
>> DetachedJobInvokerFactory.executeDetachedJob(
>> FlinkPortableClientEntryPoint.java:256) ~[?:?]
>>         at org.apache.beam.runners.flink.FlinkPortableClientEntryPoint$
>> DetachedJobInvokerFactory.access$300(FlinkPortableClientEntryPoint.java:
>> 206) ~[?:?]
>>         at
>> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.runDriverProgram
>> (FlinkPortableClientEntryPoint.java:180) ~[?:?]
>>         at
>> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(
>> FlinkPortableClientEntryPoint.java:100) ~[?:?]
>>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method) ~[?:?]
>>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
>> Source) ~[?:?]
>>         at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
>> Source) ~[?:?]
>>         at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod
>> (PackagedProgram.java:355) ~[flink-dist-1.17.2.jar:1.17.2]
>>         at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution
>> (PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2]
>>         at org.apache.flink.client.ClientUtils.executeProgram(
>> ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2]
>>         at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
>> (ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2]
>>         ... 13 more
>> 2024-05-10 13:11:41,217 ERROR
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal
>> error occurred in the cluster entrypoint.
>> java.util.concurrent.CompletionException:
>> org.apache.flink.client.deployment.application.ApplicationExecutionException:
>> Could not execute application.
>>         at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown
>> Source) ~[?:?]
>>         at java.util.concurrent.CompletableFuture.completeThrowable(Unknown
>> Source) ~[?:?]
>>         at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown
>> Source) ~[?:?]
>>         at java.util.concurrent.CompletableFuture.postComplete(Unknown
>> Source) ~[?:?]
>>         at 
>> java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
>> Source) ~[?:?]
>>         at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
>> (ApplicationDispatcherBootstrap.java:337) ~[flink-dist-1.17.2.jar:1.17.2]
>>         at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda
>> $runApplicationAsync$2(ApplicationDispatcherBootstrap.java:254) ~[
>> flink-dist-1.17.2.jar:1.17.2]
>>         at java.util.concurrent.Executors$RunnableAdapter.call(Unknown
>> Source) ~[?:?]
>>         at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>>         at
>> org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter
>> $ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[
>> flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
>>         at
>> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader
>> (ClassLoadingUtils.java:68) ~[
>> flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
>>         at
>> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda
>> $withContextClassLoader$0(ClassLoadingUtils.java:41) ~[
>> flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
>>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
>> [flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
>>         at akka.dispatch.ForkJoinExecutorConfigurator$
>> AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [
>> flink-rpc-akka_974f9cb1-cb21-497a-9825-4e660c5425bc.jar:1.17.2]
>>         at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
>>         at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown
>> Source) [?:?]
>>         at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
>>         at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
>> [?:?]
>>         at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
>> [?:?]
>> Caused by:
>> org.apache.flink.client.deployment.application.ApplicationExecutionException:
>> Could not execute application.
>>         ... 14 more
>> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>> The main method caused an error: Job python /opt/flink/app/word_len.py
>> --deploy failed.
>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod
>> (PackagedProgram.java:372) ~[flink-dist-1.17.2.jar:1.17.2]
>>         at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution
>> (PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2]
>>         at org.apache.flink.client.ClientUtils.executeProgram(
>> ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2]
>>         at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
>> (ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2]
>>         ... 13 more
>> Caused by: java.lang.RuntimeException: Job python /opt/flink/app/
>> word_len.py --deploy failed.
>>         at
>> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint.main(
>> FlinkPortableClientEntryPoint.java:102) ~[?:?]
>>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method) ~[?:?]
>>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
>> Source) ~[?:?]
>>         at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
>> Source) ~[?:?]
>>         at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>>         at org.apache.flink.client.program.PackagedProgram.callMainMethod
>> (PackagedProgram.java:355) ~[flink-dist-1.17.2.jar:1.17.2]
>>         at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution
>> (PackagedProgram.java:222) ~[flink-dist-1.17.2.jar:1.17.2]
>>         at org.apache.flink.client.ClientUtils.executeProgram(
>> ClientUtils.java:105) ~[flink-dist-1.17.2.jar:1.17.2]
>>         at
>> org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint
>> (ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.2.jar:1.17.2]
>>         ... 13 more
>> Caused by: java.lang.RuntimeException: Failed to start job with driver
>> program: bash [-c, python /opt/flink/app/word_len.py --deploy
>> --job_endpoint=localhost:37015] output: WARNING:root:Waiting for grpc
>> channel to be ready at localhost:32895.
>>
>> Can you please let me know how to fix this issue?
>>
>> Cheers,
>> Jaehyeon
>>
>
