[ 
https://issues.apache.org/jira/browse/BEAM-7981?focusedWorklogId=322103&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-322103
 ]

ASF GitHub Bot logged work on BEAM-7981:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Oct/19 18:01
            Start Date: 02/Oct/19 18:01
    Worklog Time Spent: 10m 
      Work Description: udim commented on issue #9708: [BEAM-7981] Fix double 
iterable stripping
URL: https://github.com/apache/beam/pull/9708#issuecomment-537611027
 
 
   python :docs failure should be fixed in 
https://github.com/apache/beam/pull/9714
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 322103)
    Time Spent: 20m  (was: 10m)

> ParDo function wrapper doesn't support Iterable output types
> ------------------------------------------------------------
>
>                 Key: BEAM-7981
>                 URL: https://issues.apache.org/jira/browse/BEAM-7981
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> I believe the bug is in CallableWrapperDoFn.default_type_hints, which 
> converts Iterable[str] to str.
> This test will be included (commented out) in 
> https://github.com/apache/beam/pull/9283
> {code}
>   def test_typed_callable_iterable_output(self):
>     @typehints.with_input_types(int)
>     @typehints.with_output_types(typehints.Iterable[str])
>     def do_fn(element):
>       return [[str(element)] * 2]
>     result = [1, 2] | beam.ParDo(do_fn)
>     self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
> {code}
> Result:
> {code}
> ======================================================================
> ERROR: test_typed_callable_iterable_output 
> (apache_beam.typehints.typed_pipeline_test.MainInputTest)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/typehints/typed_pipeline_test.py",
>  line 104, in test_typed_callable_iterable_output
>     result = [1, 2] | beam.ParDo(do_fn)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/transforms/ptransform.py",
>  line 519, in __ror__
>     p.run().wait_until_finish()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", 
> line 406, in run
>     self._options).run(False)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", 
> line 419, in run
>     return self.runner.run_pipeline(self, self._options)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
>  line 129, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 366, in run_pipeline
>     default_environment=self._default_environment))
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 373, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 455, in run_stages
>     stage_context.safe_coders)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 733, in _run_stage
>     result, splits = bundle_manager.process_bundle(data_input, data_output)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 1663, in process_bundle
>     part, expected_outputs), part_inputs):
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 586, in 
> result_iterator
>     yield fs.pop().result()
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 432, in result
>     return self.__get_result()
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in 
> __get_result
>     raise self._exception
>   File "/usr/lib/python3.7/concurrent/futures/thread.py", line 57, in run
>     result = self.fn(*self.args, **self.kwargs)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 1663, in <lambda>
>     part, expected_outputs), part_inputs):
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 1601, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 1080, in push
>     response = self.worker.do_instruction(request)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
>  line 343, in do_instruction
>     request.instruction_id)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
>  line 369, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py",
>  line 593, in process_bundle
>     data.ptransform_id].process_encoded(data.data)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py",
>  line 143, in process_encoded
>     self.output(decoded_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 256, in output
>     cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 143, in receive
>     self.consumer.process(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 435, in process
>     self.output(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 256, in output
>     cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 143, in receive
>     self.consumer.process(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 594, in process
>     delayed_application = self.dofn_receiver.receive(o)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py",
>  line 776, in receive
>     self.process(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py",
>  line 782, in process
>     self._reraise_augmented(exn)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py",
>  line 849, in _reraise_augmented
>     raise_with_traceback(new_exn)
>   File 
> "/usr/local/google/home/ehudm/virtualenvs/beam-py37/lib/python3.7/site-packages/future/utils/__init__.py",
>  line 419, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py",
>  line 780, in process
>     return self.do_fn_invoker.invoke_process(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py",
>  line 441, in invoke_process
>     windowed_value, self.process_method(windowed_value.value))
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py",
>  line 919, in process_outputs
>     self.main_receivers.receive(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 142, in receive
>     self.update_counters_start(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 122, in update_counters_start
>     self.opcounter.update_from(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/opcounters.py",
>  line 196, in update_from
>     self.do_sample(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/opcounters.py",
>  line 214, in do_sample
>     self.coder_impl.get_estimated_size_and_observables(windowed_value))
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py",
>  line 1024, in get_estimated_size_and_observables
>     value.value, nested=nested))
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py",
>  line 220, in get_estimated_size_and_observables
>     return self.estimate_size(value, nested), []
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py",
>  line 212, in estimate_size
>     return self._get_nested_size(self._size_estimator(value), nested)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coders.py",
>  line 135, in estimate_size
>     return len(self.encode(value))
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coders.py",
>  line 326, in encode
>     return value.encode('utf-8')
> AttributeError: 'list' object has no attribute 'encode' [while running 
> 'ParDo(CallableWrapperDoFn)']
> -------------------- >> begin captured logging << --------------------
> root: INFO: Generating grammar tables from 
> /usr/lib/python3.7/lib2to3/Grammar.txt
> root: INFO: Generating grammar tables from 
> /usr/lib/python3.7/lib2to3/PatternGrammar.txt
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.Header'
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.magic'
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.sync'
> root: INFO: ==================== <function annotate_downstream_side_inputs at 
> 0x7f3a39918158> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function fix_side_input_pcoll_coders at 
> 0x7f3a39918268> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function lift_combiners at 0x7f3a399182f0> 
> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function expand_sdf at 0x7f3a39918378> 
> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function expand_gbk at 0x7f3a39918400> 
> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function sink_flattens at 0x7f3a39918510> 
> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function greedily_fuse at 0x7f3a39918598> 
> ====================
> root: DEBUG: 1 [3]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> CreatePInput0/Read:beam:transform:read:v1\nParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function read_to_impulse at 0x7f3a39918620> 
> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read/Impulse:beam:transform:impulse:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function impulse_to_input at 
> 0x7f3a399186a8> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function inject_timer_pcollections at 
> 0x7f3a39918840> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function sort_stages at 0x7f3a399188c8> 
> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function window_pcollection_coders at 
> 0x7f3a39918950> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: Running 
> ((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)
> root: DEBUG: start <DoOperation _MaterializeValues0 output_tags=['out'], 
> receivers=[ConsumerSet[_MaterializeValues0.out0, 
> coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
> root: DEBUG: start <DoOperation ParDo(CallableWrapperDoFn) 
> output_tags=['out'], 
> receivers=[SingletonConsumerSet[ParDo(CallableWrapperDoFn).out0, 
> coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
> root: DEBUG: start <ImpulseReadOperation 
> receivers=[SingletonConsumerSet[CreatePInput0/Read.out0, 
> coder=WindowedValueCoder[VarIntCoder], len(consumers)=1]]>
> root: DEBUG: start <DataInputOperation 
> receivers=[SingletonConsumerSet[CreatePInput0/Read/Impulse.out0, 
> coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
> --------------------- >> end captured logging << ---------------------
> ----------------------------------------------------------------------
> Ran 1 test in 0.068s
> FAILED (errors=1)
> Error
> Traceback (most recent call last):
>   File "/usr/lib/python3.7/unittest/case.py", line 59, in testPartExecutor
>     yield
>   File "/usr/lib/python3.7/unittest/case.py", line 615, in run
>     testMethod()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/typehints/typed_pipeline_test.py",
>  line 104, in test_typed_callable_iterable_output
>     result = [1, 2] | beam.ParDo(do_fn)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/transforms/ptransform.py",
>  line 519, in __ror__
>     p.run().wait_until_finish()
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", 
> line 406, in run
>     self._options).run(False)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", 
> line 419, in run
>     return self.runner.run_pipeline(self, self._options)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py",
>  line 129, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 366, in run_pipeline
>     default_environment=self._default_environment))
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 373, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 455, in run_stages
>     stage_context.safe_coders)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 733, in _run_stage
>     result, splits = bundle_manager.process_bundle(data_input, data_output)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 1663, in process_bundle
>     part, expected_outputs), part_inputs):
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 586, in 
> result_iterator
>     yield fs.pop().result()
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 432, in result
>     return self.__get_result()
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in 
> __get_result
>     raise self._exception
>   File "/usr/lib/python3.7/concurrent/futures/thread.py", line 57, in run
>     result = self.fn(*self.args, **self.kwargs)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 1663, in <lambda>
>     part, expected_outputs), part_inputs):
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 1601, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py",
>  line 1080, in push
>     response = self.worker.do_instruction(request)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
>  line 343, in do_instruction
>     request.instruction_id)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py",
>  line 369, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py",
>  line 593, in process_bundle
>     data.ptransform_id].process_encoded(data.data)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py",
>  line 143, in process_encoded
>     self.output(decoded_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 256, in output
>     cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 143, in receive
>     self.consumer.process(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 435, in process
>     self.output(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 256, in output
>     cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 143, in receive
>     self.consumer.process(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 594, in process
>     delayed_application = self.dofn_receiver.receive(o)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py",
>  line 776, in receive
>     self.process(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py",
>  line 782, in process
>     self._reraise_augmented(exn)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py",
>  line 849, in _reraise_augmented
>     raise_with_traceback(new_exn)
>   File 
> "/usr/local/google/home/ehudm/virtualenvs/beam-py37/lib/python3.7/site-packages/future/utils/__init__.py",
>  line 419, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py",
>  line 780, in process
>     return self.do_fn_invoker.invoke_process(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py",
>  line 441, in invoke_process
>     windowed_value, self.process_method(windowed_value.value))
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py",
>  line 919, in process_outputs
>     self.main_receivers.receive(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 142, in receive
>     self.update_counters_start(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py",
>  line 122, in update_counters_start
>     self.opcounter.update_from(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/opcounters.py",
>  line 196, in update_from
>     self.do_sample(windowed_value)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/opcounters.py",
>  line 214, in do_sample
>     self.coder_impl.get_estimated_size_and_observables(windowed_value))
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py",
>  line 1024, in get_estimated_size_and_observables
>     value.value, nested=nested))
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py",
>  line 220, in get_estimated_size_and_observables
>     return self.estimate_size(value, nested), []
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py",
>  line 212, in estimate_size
>     return self._get_nested_size(self._size_estimator(value), nested)
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coders.py",
>  line 135, in estimate_size
>     return len(self.encode(value))
>   File 
> "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coders.py",
>  line 326, in encode
>     return value.encode('utf-8')
> Exception: 'list' object has no attribute 'encode' [while running 
> 'ParDo(CallableWrapperDoFn)']
> -------------------- >> begin captured logging << --------------------
> root: INFO: Generating grammar tables from 
> /usr/lib/python3.7/lib2to3/Grammar.txt
> root: INFO: Generating grammar tables from 
> /usr/lib/python3.7/lib2to3/PatternGrammar.txt
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.Header'
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.magic'
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.sync'
> root: INFO: ==================== <function annotate_downstream_side_inputs at 
> 0x7f3a39918158> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function fix_side_input_pcoll_coders at 
> 0x7f3a39918268> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function lift_combiners at 0x7f3a399182f0> 
> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function expand_sdf at 0x7f3a39918378> 
> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function expand_gbk at 0x7f3a39918400> 
> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function sink_flattens at 0x7f3a39918510> 
> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  
> CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  
> downstream_side_inputs: ', 
> 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  
> _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  
> downstream_side_inputs: ']
> root: INFO: ==================== <function greedily_fuse at 0x7f3a39918598> 
> ====================
> root: DEBUG: 1 [3]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> CreatePInput0/Read:beam:transform:read:v1\nParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function read_to_impulse at 0x7f3a39918620> 
> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read/Impulse:beam:transform:impulse:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function impulse_to_input at 
> 0x7f3a399186a8> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function inject_timer_pcollections at 
> 0x7f3a39918840> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function sort_stages at 0x7f3a399188c8> 
> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function window_pcollection_coders at 
> 0x7f3a39918950> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: 
> ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n
>   
> ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n
>   must follow: \n  downstream_side_inputs: ']
> root: INFO: Running 
> ((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)
> root: DEBUG: start <DoOperation _MaterializeValues0 output_tags=['out'], 
> receivers=[ConsumerSet[_MaterializeValues0.out0, 
> coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
> root: DEBUG: start <DoOperation ParDo(CallableWrapperDoFn) 
> output_tags=['out'], 
> receivers=[SingletonConsumerSet[ParDo(CallableWrapperDoFn).out0, 
> coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
> root: DEBUG: start <ImpulseReadOperation 
> receivers=[SingletonConsumerSet[CreatePInput0/Read.out0, 
> coder=WindowedValueCoder[VarIntCoder], len(consumers)=1]]>
> root: DEBUG: start <DataInputOperation 
> receivers=[SingletonConsumerSet[CreatePInput0/Read/Impulse.out0, 
> coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
> --------------------- >> end captured logging << ---------------------
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to