[
https://issues.apache.org/jira/browse/FLINK-38105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dian Fu closed FLINK-38105.
---------------------------
Fix Version/s: 2.1.0
Resolution: Fixed
Fixed in:
- master via 352c9d2498f15c8c21a83db6bfad7077e5807800
- release-2.1 via a61a42050d8cd9dbe705bbe72e58f658ac56ea77
> AttributeError: 'pyflink.fn_execution.table.window_aggregate_fast.S' object
> has no attribute 'get_accumulators'
> ---------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38105
> URL: https://issues.apache.org/jira/browse/FLINK-38105
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.1.0
>
>
> The job fails over with the following exception:
> {code}
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: Error received from SDK harness for instruction
> 1: Traceback (most recent call last):
> File
> "/usr/local/python3/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 295, in _execute
> response = task()
> File
> "/usr/local/python3/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 370, in <lambda>
> lambda: self.create_worker().do_instruction(request), request)
> File
> "/usr/local/python3/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 629, in do_instruction
> return getattr(self, request_type)(
> File
> "/usr/local/python3/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 667, in process_bundle
> bundle_processor.process_bundle(instruction_id))
> File
> "/usr/local/python3/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1061, in process_bundle
> input_op_by_transform_id[element.transform_id].process_encoded(
> File
> "/usr/local/python3/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 231, in process_encoded
> self.output(decoded_value)
> File "apache_beam/runners/worker/operations.py", line 526, in
> apache_beam.runners.worker.operations.Operation.output
> File "apache_beam/runners/worker/operations.py", line 528, in
> apache_beam.runners.worker.operations.Operation.output
> File "apache_beam/runners/worker/operations.py", line 237, in
> apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
> File "apache_beam/runners/worker/operations.py", line 240, in
> apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 194, in
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
> File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 92, in
> pyflink.fn_execution.beam.beam_operations_fast.NetworkOutputProcessor.process_outputs
> File "pyflink/fn_execution/beam/beam_coder_impl_fast.pyx", line 101, in
> pyflink.fn_execution.beam.beam_coder_impl_fast.FlinkLengthPrefixCoderBeamWrapper.encode_to_stream
> File "pyflink/fn_execution/coder_impl_fast.pyx", line 272, in
> pyflink.fn_execution.coder_impl_fast.IterableCoderImpl.encode_to_stream
> File
> "/usr/local/python3/lib/python3.9/site-packages/pyflink/fn_execution/table/operations.py",
> line 529, in process_element_or_timer
> result_datas = self.group_agg_function.process_element(input_row)
> File "pyflink/fn_execution/table/window_aggregate_fast.pyx", line 374, in
> pyflink.fn_execution.table.window_aggregate_fast.GroupWindowAggFunctionBase.process_element
> File "pyflink/fn_execution/table/window_aggregate_fast.pyx", line 394, in
> pyflink.fn_execution.table.window_aggregate_fast.GroupWindowAggFunctionBase.process_element
> File
> "/usr/local/python3/lib/python3.9/site-packages/pyflink/fn_execution/table/window_process_function.py",
> line 256, in assign_state_namespace
> actual_window = self._add_window(window)
> File
> "/usr/local/python3/lib/python3.9/site-packages/pyflink/fn_execution/table/window_process_function.py",
> line 346, in _add_window
> self._merge(
> File
> "/usr/local/python3/lib/python3.9/site-packages/pyflink/fn_execution/table/window_process_function.py",
> line 378, in _merge
> target_acc = self._window_aggregator.get_accumulators()
> AttributeError: 'pyflink.fn_execution.table.window_aggregate_fast.S' object
> has no attribute 'get_accumulators'
> at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
> ~[?:?]
> at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005) ~[?:?]
> at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:61) ~[?:?]
> at
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
> ~[?:?]
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:607)
> ~[?:?]
> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:421)
> ~[?:?]
> at
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:407)
> ~[?:?]
> at
> org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:86)
> ~[?:?]
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
> at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> ~[?:?]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> ~[?:?]
> ... 1 more
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)