Hi Pawel, could you tell us which version of the Beam Python SDK you are
using?

For the record, this looks like a known issue:
https://issues.apache.org/jira/browse/BEAM-6860
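
For readers following the thread, the aggregation described in the quoted
message (sum values per key within a predefined window) can be sketched in
plain Python. This is only an illustration of the idea, not Beam code and
not the pipeline from the report; the function name, the sample records and
the 60-second window size are assumptions made up for this sketch.

```python
from collections import defaultdict


def sum_by_key_in_fixed_windows(records, window_size):
    """Sum values per (key, window start) for fixed-size windows.

    records: iterable of (key, value, timestamp_in_seconds) tuples.
    window_size: window length in seconds (hypothetical parameter).
    """
    totals = defaultdict(int)
    for key, value, timestamp in records:
        # Assign each record to the fixed window that contains its timestamp.
        window_start = (timestamp // window_size) * window_size
        totals[(key, window_start)] += value
    return dict(totals)


# Hypothetical sample data: (key, value, timestamp in seconds).
records = [("a", 1, 0), ("a", 2, 30), ("b", 5, 45), ("a", 4, 70)]
result = sum_by_key_in_fixed_windows(records, window_size=60)
```

In Beam itself the window assignment and the per-key sum are separate
transforms, so the sketch above only mirrors the intended result, not the
SDK's execution model.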

Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com


On Wed, Sep 11, 2019 at 6:33 AM Paweł Kordek <pawel.kor...@farfetch.com>
wrote:

> Hi
>
> I am developing a simple pipeline that aggregates records by key and
> sums values over a predefined window. I was getting some errors and,
> after checking, I get exactly the same errors when running the Wikipedia
> example from the Beam repo. The output is as follows:
> -------------------------------------------
> INFO:root:Missing pipeline option (runner). Executing pipeline using the
> default runner: DirectRunner.
> INFO:root:==================== <function annotate_downstream_side_inputs
> at 0x7f333fc1fe60> ====================
> INFO:root:==================== <function fix_side_input_pcoll_coders at
> 0x7f333fc1ff80> ====================
> INFO:root:==================== <function lift_combiners at 0x7f333fc1d050>
> ====================
> INFO:root:==================== <function expand_sdf at 0x7f333fc1d0e0>
> ====================
> INFO:root:==================== <function expand_gbk at 0x7f333fc1d170>
> ====================
> INFO:root:==================== <function sink_flattens at 0x7f333fc1d290>
> ====================
> INFO:root:==================== <function greedily_fuse at 0x7f333fc1d320>
> ====================
> INFO:root:==================== <function read_to_impulse at
> 0x7f333fc1d3b0> ====================
> INFO:root:==================== <function impulse_to_input at
> 0x7f333fc1d440> ====================
> INFO:root:==================== <function inject_timer_pcollections at
> 0x7f333fc1d5f0> ====================
> INFO:root:==================== <function sort_stages at 0x7f333fc1d680>
> ====================
> INFO:root:==================== <function window_pcollection_coders at
> 0x7f333fc1d710> ====================
> INFO:root:Running
> ((((((ref_AppliedPTransform_ReadFromText/Read_3)+(ref_AppliedPTransform_ComputeTopSessions/ExtractUserAndTimestamp_5))+(ref_AppliedPTransform_ComputeTopSessions/Filter(<lambda
> at
> top_wikipedia_sessions.py:127>)_6))+(ref_AppliedPTransform_ComputeTopSessions/ComputeSessions/ComputeSessionsWindow_8))+(ref_AppliedPTransform_ComputeTopSessions/ComputeSessions/PerElement/PerElement:PairWithVoid_10))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Precombine))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Group/Write)
> INFO:root:Running
> (((((((ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Group/Read)+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Merge))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/ExtractOutputs))+(ref_AppliedPTransform_ComputeTopSessions/SessionsToStrings_18))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/TopPerMonthWindow_20))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/Top/KeyWithVoid_22))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Precombine))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Group/Write)
> INFO:root:Running
> (((ref_AppliedPTransform_WriteToText/Write/WriteImpl/DoOnce/Read_36)+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/InitializeWrite_37))+(ref_PCollection_PCollection_19/Write))+(ref_PCollection_PCollection_20/Write)
> INFO:root:Running
> ((((((((ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Group/Read)+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Merge))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/ExtractOutputs))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/Top/UnKey_30))+(ref_AppliedPTransform_ComputeTopSessions/FormatOutput_31))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/WriteBundles_38))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/Pair_39))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/WindowInto(WindowIntoFn)_40))+(WriteToText/Write/WriteImpl/GroupByKey/Write)
> Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 829, in
> apache_beam.runners.common.DoFnRunner._invoke_bundle_method
>   File "apache_beam/runners/common.py", line 403, in
> apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
>   File "apache_beam/runners/common.py", line 406, in
> apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
>   File "apache_beam/runners/common.py", line 982, in
> apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
>   File "apache_beam/runners/worker/operations.py", line 142, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 122, in
> apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 196, in
> apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 214, in
> apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1014, in
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1030, in
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 814, in
> apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 828, in
> apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 145, in
> apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 494, in
> apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
> TypeError: Cannot convert GlobalWindow to
> apache_beam.utils.windowed_value._IntervalWindowBase
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>   File "top_wikipedia_sessions.py", line 171, in <module>
>     run()
>   File "top_wikipedia_sessions.py", line 166, in run
>     | WriteToText(known_args.output))
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 426, in __exit__
>     self.run().wait_until_finish()
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 406, in run
>     self._options).run(False)
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
> line 419, in run
>     return self.runner.run_pipeline(self, self._options)
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py",
> line 129, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
> line 366, in run_pipeline
>     default_environment=self._default_environment))
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
> line 373, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
> line 455, in run_stages
>     stage_context.safe_coders)
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
> line 733, in _run_stage
>     result, splits = bundle_manager.process_bundle(data_input, data_output)
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
> line 1688, in process_bundle
>     part, expected_outputs), part_inputs):
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/_base.py",
> line 598, in result_iterator
>     yield fs.pop().result()
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/_base.py",
> line 435, in result
>     return self.__get_result()
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/_base.py",
> line 384, in __get_result
>     raise self._exception
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/thread.py",
> line 57, in run
>     result = self.fn(*self.args, **self.kwargs)
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
> line 1688, in <lambda>
>     part, expected_outputs), part_inputs):
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
> line 1626, in process_bundle
>     result_future =
> self._worker_handler.control_conn.push(process_bundle_req)
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
> line 1080, in push
>     response = self.worker.do_instruction(request)
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 343, in do_instruction
>     request.instruction_id)
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 369, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 598, in process_bundle
>     op.finish()
>   File "apache_beam/runners/worker/operations.py", line 611, in
> apache_beam.runners.worker.operations.DoOperation.finish
>   File "apache_beam/runners/worker/operations.py", line 612, in
> apache_beam.runners.worker.operations.DoOperation.finish
>   File "apache_beam/runners/worker/operations.py", line 613, in
> apache_beam.runners.worker.operations.DoOperation.finish
>   File "apache_beam/runners/common.py", line 847, in
> apache_beam.runners.common.DoFnRunner.finish
>   File "apache_beam/runners/common.py", line 831, in
> apache_beam.runners.common.DoFnRunner._invoke_bundle_method
>   File "apache_beam/runners/common.py", line 872, in
> apache_beam.runners.common.DoFnRunner._reraise_augmented
>   File
> "/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/future/utils/__init__.py",
> line 421, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "apache_beam/runners/common.py", line 829, in
> apache_beam.runners.common.DoFnRunner._invoke_bundle_method
>   File "apache_beam/runners/common.py", line 403, in
> apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
>   File "apache_beam/runners/common.py", line 406, in
> apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
>   File "apache_beam/runners/common.py", line 982, in
> apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
>   File "apache_beam/runners/worker/operations.py", line 142, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 122, in
> apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
>   File "apache_beam/runners/worker/opcounters.py", line 196, in
> apache_beam.runners.worker.opcounters.OperationCounters.update_from
>   File "apache_beam/runners/worker/opcounters.py", line 214, in
> apache_beam.runners.worker.opcounters.OperationCounters.do_sample
>   File "apache_beam/coders/coder_impl.py", line 1014, in
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 1030, in
> apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 814, in
> apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
>   File "apache_beam/coders/coder_impl.py", line 828, in
> apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 145, in
> apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
>   File "apache_beam/coders/coder_impl.py", line 494, in
> apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
> TypeError: Cannot convert GlobalWindow to
> apache_beam.utils.windowed_value._IntervalWindowBase [while running
> 'WriteToText/Write/WriteImpl/WriteBundles']
> -------------------------------------------------------
>
> To run it, I downloaded a single JSON file with Wiki data and ran it as
> follows (from BEAM_REPO/sdks/python/apache_beam/examples/complete):
>
> *python top_wikipedia_sessions.py --input
> /data/wiki/wiki_data-000000000492.json --output /tmp/beam/wiki*
>
> This fails somewhere close to the end; in fact, I can find some results
> (possibly complete) in
> */tmp/beam/beam-temp-wiki-5971176ad49411e9b16448ba4ef75ccc/c18aa9d5-221f-4dc3-bc1a-83c101ee54ba.wiki*
>
> I tried to find the exact cause in the code, but I don't understand Beam's
> codebase well enough and would appreciate some hints or explanations. There
> is a question on SO
> <https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python/54791913#54791913>,
> but that doesn't help much in explaining why this error is observed.
>
> Thanks!
> Pawel
>
