Hi Luan,

Your error occurs because the Python interpreter versions used on the
client side and on the cluster side are inconsistent. cloudpickle pickles
with the highest protocol the interpreter supports, which is 4 before
Python 3.8 and 5 from Python 3.8 onward. If the two versions do not match,
deserialization fails with exactly the error shown in your stack trace:
`return cloudpickle.loads(payload)` raising `TypeError: an integer is
required (got type bytes)`. Note that your command passes `-pyclientexec
python3.7 -pyexec python3.9`, which is exactly such a mismatch. I have
previously created https://issues.apache.org/jira/browse/FLINK-22517 to
document this issue. At present, the best way to solve this problem is to
use the same Python interpreter version on the client side and on the
cluster side, e.g. by passing the same interpreter to both `-pyclientexec`
and `-pyexec`.
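
If you want to double check which protocol each side uses, you can run the
following snippet (a minimal sketch) with both interpreters, i.e. the one
passed to `-pyclientexec` and the one passed to `-pyexec`:

    import pickle
    import sys

    # If the two interpreters report different HIGHEST_PROTOCOL values,
    # a payload pickled on one side may fail to unpickle on the other.
    print(sys.version_info[:3], pickle.HIGHEST_PROTOCOL)

If you prefer to pin the interpreters from the job itself rather than via
the CLI flags, something like the sketch below should also work (the
interpreter path is only an example, adjust it to your environment):

    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(
        EnvironmentSettings.new_instance().in_streaming_mode().build())

    # Point both the client side and the cluster side at the same
    # interpreter.
    config = t_env.get_config().get_configuration()
    config.set_string("python.client.executable", "/usr/bin/python3.9")
    config.set_string("python.executable", "/usr/bin/python3.9")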

Best,
Xingbo

On Wed, Apr 6, 2022 at 2:26 PM Luan Cooper <gc.su...@gmail.com> wrote:

> actually I had built/compiled
> - pyarrow==2.0.0 (test skipped)
> - apache-beam==2.27.0 (test skipped)
> on python 3.9, and tested with an example python job (bin/flink run
> -pyclientexec python3.7 -pyexec python3.9 -py
> examples/python/table/word_count.py)
> but got the following exceptions
>
> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
>     response = task()
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
>     return getattr(self, request_type)(
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 637, in process_bundle
>     bundle_processor = self.bundle_processor_cache.get(
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 463, in get
>     processor = bundle_processor.BundleProcessor(
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 921, in create_execution_tree
>     return collections.OrderedDict([(
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
>     get_operation(transform_id))) for transform_id in sorted(
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in get_operation
>     transform_consumers = {
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <dictcomp>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 908, in get_operation
>     return transform_factory.create_operation(
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
>     return creator(self, transform_id, transform_proto, payload, consumers)
>   File "/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/beam/beam_operations.py", line 55, in create_table_function
>     return _create_user_defined_function_operation(
>   File "/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/beam/beam_operations.py", line 199, in _create_user_defined_function_operation
>     return beam_operation_cls(
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>   File "/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/table/operations.py", line 124, in __init__
>     super(TableFunctionOperation, self).__init__(spec)
>   File "/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/table/operations.py", line 81, in __init__
>     self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn)
>   File "/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/table/operations.py", line 134, in generate_func
>     operation_utils.extract_user_defined_function(serialized_fn.udfs[0])
>   File "/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/utils/operation_utils.py", line 137, in extract_user_defined_function
>     user_defined_func = pickle.loads(user_defined_function_proto.payload)
>   File "/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/pickle.py", line 29, in loads
>     return cloudpickle.loads(payload)
> TypeError: an integer is required (got type bytes)
>
>         at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
>         at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
>         at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
>         at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
>         at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
>         at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:375)
>         ... 7 more
> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
>     response = task()
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 606, in do_instruction
>     return getattr(self, request_type)(
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 637, in process_bundle
>     bundle_processor = self.bundle_processor_cache.get(
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 463, in get
>     processor = bundle_processor.BundleProcessor(
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
>     self.ops = self.create_execution_tree(self.process_bundle_descriptor)
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 921, in create_execution_tree
>     return collections.OrderedDict([(
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
>     get_operation(transform_id))) for transform_id in sorted(
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 903, in get_operation
>     transform_consumers = {
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <dictcomp>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
>     tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
>     result = cache[args] = func(*args)
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 908, in get_operation
>     return transform_factory.create_operation(
>   File "/usr/local/lib/python3.9/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
>     return creator(self, transform_id, transform_proto, payload, consumers)
>   File "/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/beam/beam_operations.py", line 55, in create_table_function
>     return _create_user_defined_function_operation(
>   File "/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/beam/beam_operations.py", line 199, in _create_user_defined_function_operation
>     return beam_operation_cls(
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
>   File "/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/table/operations.py", line 124, in __init__
>     super(TableFunctionOperation, self).__init__(spec)
>   File "/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/table/operations.py", line 81, in __init__
>     self.func, self.user_defined_funcs = self.generate_func(self.spec.serialized_fn)
>   File "/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/table/operations.py", line 134, in generate_func
>     operation_utils.extract_user_defined_function(serialized_fn.udfs[0])
>   File "/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/utils/operation_utils.py", line 137, in extract_user_defined_function
>     user_defined_func = pickle.loads(user_defined_function_proto.payload)
>   File "/usr/local/lib/python3.9/dist-packages/pyflink/fn_execution/pickle.py", line 29, in loads
>     return cloudpickle.loads(payload)
> TypeError: an integer is required (got type bytes)
>
>         at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
>         at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>         at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
>         ... 3 more
>
> 08:14:10.118 [main] ERROR org.apache.flink.client.python.PythonDriver - Run python process failed
> java.lang.RuntimeException: Python process exits with code: 1
>         at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130) ~[flink-python_2.11-1.14.3.jar:1.14.3]
>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
>         at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
>         at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) [flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) [flink-dist_2.11-1.14.3.jar:1.14.3]
> 08:14:10.129 [main] ERROR org.apache.flink.client.cli.CliFrontend - Fatal error while running command line interface.
> org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
>         at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140) ~[?:?]
>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>         at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
>         at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
>         at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) [flink-dist_2.11-1.14.3.jar:1.14.3]
> Caused by: java.lang.RuntimeException: Python process exits with code: 1
>         at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130) ~[?:?]
>         ... 13 more
> org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
>         at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:140)
>         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>         at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>         at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>         at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>         at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>         at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>         at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: java.lang.RuntimeException: Python process exits with code: 1
>         at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:130)
>         ... 13 more
>
>
> It seems I should follow https://issues.apache.org/jira/browse/FLINK-25188
> and test on Python 3.9
>
>
> On Wed, Apr 6, 2022 at 10:27 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>
> > Hi Martijn and Luan,
> >
> > As of now, the main reason why PyFlink has not declared support for
> > Python 3.9 is that the apache-beam it depends on, and the versions of
> > numpy and pyarrow that apache-beam depends on, do not provide
> > corresponding whl packages for Python 3.9. Users need to install from
> > source, but source installation is really difficult to get to succeed,
> > especially for pyarrow.
> > If you can successfully install these dependencies on python 3.9 from
> > source, then as far as I know you can successfully run the pyflink
> > job. We are also upgrading the versions of these dependencies [1],
> > after which we can easily provide support for Python 3.9 and Mac M1.
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-25188
> >
> > Best,
> > Xingbo
> >
> > On Tue, Apr 5, 2022 at 6:50 PM Martijn Visser <martijnvis...@apache.org> wrote:
> >
> > > Hi Luan,
> > >
> > > According to the documentation, Python 3.9 is indeed not currently
> > > supported. I briefly checked the Jira tickets and also couldn't find
> > > one about adding support for this, so I've created
> > > https://issues.apache.org/jira/browse/FLINK-27058 for that.
> > >
> > > @dian0511...@gmail.com @hxbks...@gmail.com can you let us know your
> > > thoughts on this? Especially if it's "just" a matter of upgrading
> > > dependencies, since we also have another PyFlink ticket for that
> > > because it currently can't compile on Mac M1 [1]
> > >
> > > Best regards,
> > >
> > > Martijn Visser
> > > https://twitter.com/MartijnVisser82
> > > https://github.com/MartijnVisser
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-25188
> > >
> > >
> > > On Tue, 5 Apr 2022 at 11:40, Luan Cooper <gc.su...@gmail.com> wrote:
> > >
> > >> Hi
> > >>
> > >> Currently I need to run pyflink udfs on python 3.9, which is not
> > >> supported right now.
> > >>
> > >> I tried building
> > >> - pyarrow==2.0.0
> > >> - apache-beam==2.27.0
> > >> on python 3.9 and testing example python jobs, but failed.
> > >>
> > >> Have there been any discussions or git branches about python 3.9
> > >> before, so I can continue that work to support 3.9? (I didn't find
> > >> any in this dev list)
> > >>
> > >
> >
>
