[ 
https://issues.apache.org/jira/browse/FLINK-38105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-38105:
----------------------------
    Attachment:     (was: image-2025-07-15-11-43-31-134.png)

> AttributeError: 'pyflink.fn_execution.table.window_aggregate_fast.S' object 
> has no attribute 'get_accumulators'
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38105
>                 URL: https://issues.apache.org/jira/browse/FLINK-38105
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>            Reporter: Dian Fu
>            Assignee: Dian Fu
>            Priority: Major
>
> The job fails over with the following exception:
> {code}
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.RuntimeException: Error received from SDK harness for instruction 
> 1: Traceback (most recent call last):
>   File 
> "/usr/local/python3/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 295, in _execute
>     response = task()
>   File 
> "/usr/local/python3/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 370, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File 
> "/usr/local/python3/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 629, in do_instruction
>     return getattr(self, request_type)(
>   File 
> "/usr/local/python3/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py",
>  line 667, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "/usr/local/python3/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 1061, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File 
> "/usr/local/python3/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py",
>  line 231, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 526, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 528, in 
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 237, in 
> apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 240, in 
> apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in 
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 194, in 
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 92, in 
> pyflink.fn_execution.beam.beam_operations_fast.NetworkOutputProcessor.process_outputs
>   File "pyflink/fn_execution/beam/beam_coder_impl_fast.pyx", line 101, in 
> pyflink.fn_execution.beam.beam_coder_impl_fast.FlinkLengthPrefixCoderBeamWrapper.encode_to_stream
>   File "pyflink/fn_execution/coder_impl_fast.pyx", line 272, in 
> pyflink.fn_execution.coder_impl_fast.IterableCoderImpl.encode_to_stream
>   File 
> "/usr/local/python3/lib/python3.9/site-packages/pyflink/fn_execution/table/operations.py",
>  line 529, in process_element_or_timer
>     result_datas = self.group_agg_function.process_element(input_row)
>   File "pyflink/fn_execution/table/window_aggregate_fast.pyx", line 374, in 
> pyflink.fn_execution.table.window_aggregate_fast.GroupWindowAggFunctionBase.process_element
>   File "pyflink/fn_execution/table/window_aggregate_fast.pyx", line 394, in 
> pyflink.fn_execution.table.window_aggregate_fast.GroupWindowAggFunctionBase.process_element
>   File 
> "/usr/local/python3/lib/python3.9/site-packages/pyflink/fn_execution/table/window_process_function.py",
>  line 256, in assign_state_namespace
>     actual_window = self._add_window(window)
>   File 
> "/usr/local/python3/lib/python3.9/site-packages/pyflink/fn_execution/table/window_process_function.py",
>  line 346, in _add_window
>     self._merge(
>   File 
> "/usr/local/python3/lib/python3.9/site-packages/pyflink/fn_execution/table/window_process_function.py",
>  line 378, in _merge
>     target_acc = self._window_aggregator.get_accumulators()
> AttributeError: 'pyflink.fn_execution.table.window_aggregate_fast.S' object 
> has no attribute 'get_accumulators'
>       at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
> ~[?:?]
>       at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2005) ~[?:?]
>       at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:61) ~[?:?]
>       at 
> org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
>  ~[?:?]
>       at 
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:607)
>  ~[?:?]
>       at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:421)
>  ~[?:?]
>       at 
> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:407)
>  ~[?:?]
>       at 
> org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:86)
>  ~[?:?]
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>       at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  ~[?:?]
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  ~[?:?]
>       ... 1 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to