[ https://issues.apache.org/jira/browse/BEAM-7981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16907813#comment-16907813 ]
Ahmet Altay commented on BEAM-7981: ----------------------------------- Ack. Thank you. > ParDo function wrapper doesn't support Iterable output types > ------------------------------------------------------------ > > Key: BEAM-7981 > URL: https://issues.apache.org/jira/browse/BEAM-7981 > Project: Beam > Issue Type: Bug > Components: sdk-py-core > Reporter: Udi Meiri > Priority: Major > > I believe the bug is in CallableWrapperDoFn.default_type_hints, which > converts Iterable[str] to str. > This test will be included (commented out) in > https://github.com/apache/beam/pull/9283 > {code} > def test_typed_callable_iterable_output(self): > @typehints.with_input_types(int) > @typehints.with_output_types(typehints.Iterable[str]) > def do_fn(element): > return [[str(element)] * 2] > result = [1, 2] | beam.ParDo(do_fn) > self.assertEqual([['1', '1'], ['2', '2']], sorted(result)) > {code} > Result: > {code} > ====================================================================== > ERROR: test_typed_callable_iterable_output > (apache_beam.typehints.typed_pipeline_test.MainInputTest) > ---------------------------------------------------------------------- > Traceback (most recent call last): > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/typehints/typed_pipeline_test.py", > line 104, in test_typed_callable_iterable_output > result = [1, 2] | beam.ParDo(do_fn) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/transforms/ptransform.py", > line 519, in __ror__ > p.run().wait_until_finish() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", > line 406, in run > self._options).run(False) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", > line 419, in run > return self.runner.run_pipeline(self, self._options) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", > line 129, in run_pipeline > return runner.run_pipeline(pipeline, options) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 366, in run_pipeline > default_environment=self._default_environment)) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 373, in run_via_runner_api > return self.run_stages(stage_context, stages) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 455, in run_stages > stage_context.safe_coders) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 733, in _run_stage > result, splits = bundle_manager.process_bundle(data_input, data_output) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 1663, in process_bundle > part, expected_outputs), part_inputs): > File "/usr/lib/python3.7/concurrent/futures/_base.py", line 586, in > result_iterator > yield fs.pop().result() > File "/usr/lib/python3.7/concurrent/futures/_base.py", line 432, in result > return self.__get_result() > File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in > __get_result > raise self._exception > File "/usr/lib/python3.7/concurrent/futures/thread.py", line 57, in run > result = self.fn(*self.args, **self.kwargs) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 1663, in <lambda> > part, expected_outputs), part_inputs): > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 1601, in process_bundle > result_future = self._worker_handler.control_conn.push(process_bundle_req) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 1080, in push > response = self.worker.do_instruction(request) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", > line 343, in do_instruction > request.instruction_id) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", > line 369, in process_bundle > bundle_processor.process_bundle(instruction_id)) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", > line 593, in process_bundle > data.ptransform_id].process_encoded(data.data) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", > line 143, in process_encoded > self.output(decoded_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 256, in output > cython.cast(Receiver, > self.receivers[output_index]).receive(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 143, in receive > self.consumer.process(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 435, in process > self.output(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 256, in output > cython.cast(Receiver, > self.receivers[output_index]).receive(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 143, in receive > self.consumer.process(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 594, in process > delayed_application = self.dofn_receiver.receive(o) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", > line 776, in receive > self.process(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", > line 782, in process > self._reraise_augmented(exn) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", > line 849, in _reraise_augmented > raise_with_traceback(new_exn) > File > "/usr/local/google/home/ehudm/virtualenvs/beam-py37/lib/python3.7/site-packages/future/utils/__init__.py", > line 419, in raise_with_traceback > raise exc.with_traceback(traceback) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", > line 780, in process > return self.do_fn_invoker.invoke_process(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", > line 441, in invoke_process > windowed_value, self.process_method(windowed_value.value)) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", > line 919, in process_outputs > self.main_receivers.receive(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 142, in receive > self.update_counters_start(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 122, in update_counters_start > self.opcounter.update_from(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/opcounters.py", > line 196, in update_from > self.do_sample(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/opcounters.py", > line 214, in do_sample > self.coder_impl.get_estimated_size_and_observables(windowed_value)) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", > line 1024, in get_estimated_size_and_observables > value.value, nested=nested)) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", > line 220, in get_estimated_size_and_observables > return self.estimate_size(value, nested), [] > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", > line 212, in estimate_size > return self._get_nested_size(self._size_estimator(value), nested) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coders.py", > line 135, in estimate_size > return len(self.encode(value)) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coders.py", > line 326, in encode > return value.encode('utf-8') > AttributeError: 'list' object has no attribute 'encode' [while running > 'ParDo(CallableWrapperDoFn)'] > -------------------- >> begin captured logging << -------------------- > root: INFO: Generating grammar tables from > /usr/lib/python3.7/lib2to3/Grammar.txt > root: INFO: Generating grammar tables from > /usr/lib/python3.7/lib2to3/PatternGrammar.txt > avro.schema: Level 5: Register new name for 'org.apache.avro.file.Header' > avro.schema: Level 5: Register new name for 'org.apache.avro.file.magic' > avro.schema: Level 5: Register new name for 'org.apache.avro.file.sync' > root: INFO: ==================== <function annotate_downstream_side_inputs at > 0x7f3a39918158> ==================== > root: DEBUG: 3 [1, 1, 1] > root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n > CreatePInput0/Read:beam:transform:read:v1\n must follow: \n > downstream_side_inputs: ', > 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n > _MaterializeValues0:beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: '] > root: INFO: ==================== <function fix_side_input_pcoll_coders at > 0x7f3a39918268> ==================== > root: DEBUG: 3 [1, 1, 1] > root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n > CreatePInput0/Read:beam:transform:read:v1\n must follow: \n > downstream_side_inputs: ', > 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n > _MaterializeValues0:beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: '] > root: INFO: ==================== <function lift_combiners at 0x7f3a399182f0> > ==================== > root: DEBUG: 3 [1, 1, 1] > root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n > CreatePInput0/Read:beam:transform:read:v1\n must follow: \n > downstream_side_inputs: ', > 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n > _MaterializeValues0:beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: '] > root: INFO: ==================== <function expand_sdf at 0x7f3a39918378> > ==================== > root: DEBUG: 3 [1, 1, 1] > root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n > CreatePInput0/Read:beam:transform:read:v1\n must follow: \n > downstream_side_inputs: ', > 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n > _MaterializeValues0:beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: '] > root: INFO: ==================== <function expand_gbk at 0x7f3a39918400> > ==================== > root: DEBUG: 3 [1, 1, 1] > root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n > CreatePInput0/Read:beam:transform:read:v1\n must follow: \n > downstream_side_inputs: ', > 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n > _MaterializeValues0:beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: '] > root: INFO: ==================== <function sink_flattens at 0x7f3a39918510> > ==================== > root: DEBUG: 3 [1, 1, 1] > root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n > CreatePInput0/Read:beam:transform:read:v1\n must follow: \n > downstream_side_inputs: ', > 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n > _MaterializeValues0:beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: '] > root: INFO: ==================== <function greedily_fuse at 0x7f3a39918598> > ==================== > root: DEBUG: 1 [3] > root: DEBUG: Stages: > ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n > > CreatePInput0/Read:beam:transform:read:v1\nParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\n > must follow: \n downstream_side_inputs: '] > root: INFO: ==================== <function read_to_impulse at 0x7f3a39918620> > ==================== > root: DEBUG: 1 [4] > root: DEBUG: Stages: > ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n > > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read/Impulse:beam:transform:impulse:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\n > must follow: \n downstream_side_inputs: '] > root: INFO: ==================== <function impulse_to_input at > 0x7f3a399186a8> ==================== > root: DEBUG: 1 [4] > root: DEBUG: Stages: > ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n > > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n > must follow: \n downstream_side_inputs: '] > root: INFO: ==================== <function inject_timer_pcollections at > 0x7f3a39918840> ==================== > root: DEBUG: 1 [4] > root: DEBUG: Stages: > ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n > > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n > must follow: \n downstream_side_inputs: '] > root: INFO: ==================== <function sort_stages at 0x7f3a399188c8> > ==================== > root: DEBUG: 1 [4] > root: DEBUG: Stages: > ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n > > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n > must follow: \n downstream_side_inputs: '] > root: INFO: ==================== <function window_pcollection_coders at > 0x7f3a39918950> ==================== > root: DEBUG: 1 [4] > root: DEBUG: Stages: > ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n > > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n > must follow: \n downstream_side_inputs: '] > root: INFO: Running > ((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5) > root: DEBUG: start <DoOperation _MaterializeValues0 output_tags=['out'], > receivers=[ConsumerSet[_MaterializeValues0.out0, > coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]> > root: DEBUG: start <DoOperation ParDo(CallableWrapperDoFn) > output_tags=['out'], > receivers=[SingletonConsumerSet[ParDo(CallableWrapperDoFn).out0, > coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]> > root: DEBUG: start <ImpulseReadOperation > receivers=[SingletonConsumerSet[CreatePInput0/Read.out0, > coder=WindowedValueCoder[VarIntCoder], len(consumers)=1]]> > root: DEBUG: start <DataInputOperation > receivers=[SingletonConsumerSet[CreatePInput0/Read/Impulse.out0, > coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]> > --------------------- >> end captured logging << --------------------- > ---------------------------------------------------------------------- > Ran 1 test in 0.068s > FAILED (errors=1) > Error > Traceback (most recent call last): > File "/usr/lib/python3.7/unittest/case.py", line 59, in testPartExecutor > yield > File "/usr/lib/python3.7/unittest/case.py", line 615, in run > testMethod() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/typehints/typed_pipeline_test.py", > line 104, in test_typed_callable_iterable_output > result = [1, 2] | beam.ParDo(do_fn) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/transforms/ptransform.py", > line 519, in __ror__ > p.run().wait_until_finish() > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", > line 406, in run > self._options).run(False) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", > line 419, in run > return self.runner.run_pipeline(self, self._options) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", > line 129, in run_pipeline > return runner.run_pipeline(pipeline, options) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 366, in run_pipeline > default_environment=self._default_environment)) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 373, in run_via_runner_api > return self.run_stages(stage_context, stages) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 455, in run_stages > stage_context.safe_coders) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 733, in _run_stage > result, splits = bundle_manager.process_bundle(data_input, data_output) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 1663, in process_bundle > part, expected_outputs), part_inputs): > File "/usr/lib/python3.7/concurrent/futures/_base.py", line 586, in > result_iterator > yield fs.pop().result() > File "/usr/lib/python3.7/concurrent/futures/_base.py", line 432, in result > return self.__get_result() > File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in > __get_result > raise self._exception > File "/usr/lib/python3.7/concurrent/futures/thread.py", line 57, in run > result = self.fn(*self.args, **self.kwargs) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 1663, in <lambda> > part, expected_outputs), part_inputs): > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 1601, in process_bundle > result_future = self._worker_handler.control_conn.push(process_bundle_req) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", > line 1080, in push > response = self.worker.do_instruction(request) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", > line 343, in do_instruction > request.instruction_id) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", > line 369, in process_bundle > bundle_processor.process_bundle(instruction_id)) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", > line 593, in process_bundle > data.ptransform_id].process_encoded(data.data) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", > line 143, in process_encoded > self.output(decoded_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 256, in output > cython.cast(Receiver, > self.receivers[output_index]).receive(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 143, in receive > self.consumer.process(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 435, in process > self.output(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 256, in output > cython.cast(Receiver, > self.receivers[output_index]).receive(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 143, in receive > self.consumer.process(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 594, in process > delayed_application = self.dofn_receiver.receive(o) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", > line 776, in receive > self.process(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", > line 782, in process > self._reraise_augmented(exn) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", > line 849, in _reraise_augmented > raise_with_traceback(new_exn) > File > "/usr/local/google/home/ehudm/virtualenvs/beam-py37/lib/python3.7/site-packages/future/utils/__init__.py", > line 419, in raise_with_traceback > raise exc.with_traceback(traceback) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", > line 780, in process > return self.do_fn_invoker.invoke_process(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", > line 441, in invoke_process > windowed_value, self.process_method(windowed_value.value)) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", > line 919, in process_outputs > self.main_receivers.receive(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 142, in receive > self.update_counters_start(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", > line 122, in update_counters_start > self.opcounter.update_from(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/opcounters.py", > line 196, in update_from > self.do_sample(windowed_value) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/opcounters.py", > line 214, in do_sample > self.coder_impl.get_estimated_size_and_observables(windowed_value)) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", > line 1024, in get_estimated_size_and_observables > value.value, nested=nested)) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", > line 220, in get_estimated_size_and_observables > return self.estimate_size(value, nested), [] > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", > line 212, in estimate_size > return self._get_nested_size(self._size_estimator(value), nested) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coders.py", > line 135, in estimate_size > return len(self.encode(value)) > File > "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coders.py", > line 326, in encode > return value.encode('utf-8') > Exception: 'list' object has no attribute 'encode' [while running > 'ParDo(CallableWrapperDoFn)'] > -------------------- >> begin captured logging << -------------------- > root: INFO: Generating grammar tables from > /usr/lib/python3.7/lib2to3/Grammar.txt > root: INFO: Generating grammar tables from > /usr/lib/python3.7/lib2to3/PatternGrammar.txt > avro.schema: Level 5: Register new name for 'org.apache.avro.file.Header' > avro.schema: Level 5: Register new name for 'org.apache.avro.file.magic' > avro.schema: Level 5: Register new name for 'org.apache.avro.file.sync' > root: INFO: ==================== <function annotate_downstream_side_inputs at > 0x7f3a39918158> ==================== > root: DEBUG: 3 [1, 1, 1] > root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n > CreatePInput0/Read:beam:transform:read:v1\n must follow: \n > downstream_side_inputs: ', > 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n > _MaterializeValues0:beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: '] > root: INFO: ==================== <function fix_side_input_pcoll_coders at > 0x7f3a39918268> ==================== > root: DEBUG: 3 [1, 1, 1] > root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n > CreatePInput0/Read:beam:transform:read:v1\n must follow: \n > downstream_side_inputs: ', > 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n > _MaterializeValues0:beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: '] > root: INFO: ==================== <function lift_combiners at 0x7f3a399182f0> > ==================== > root: DEBUG: 3 [1, 1, 1] > root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n > CreatePInput0/Read:beam:transform:read:v1\n must follow: \n > downstream_side_inputs: ', > 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n > _MaterializeValues0:beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: '] > root: INFO: ==================== <function expand_sdf at 0x7f3a39918378> > ==================== > root: DEBUG: 3 [1, 1, 1] > root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n > CreatePInput0/Read:beam:transform:read:v1\n must follow: \n > downstream_side_inputs: ', > 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n > _MaterializeValues0:beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: '] > root: INFO: ==================== <function expand_gbk at 0x7f3a39918400> > ==================== > root: DEBUG: 3 [1, 1, 1] > root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n > CreatePInput0/Read:beam:transform:read:v1\n must follow: \n > downstream_side_inputs: ', > 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n > _MaterializeValues0:beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: '] > root: INFO: ==================== <function sink_flattens at 0x7f3a39918510> > ==================== > root: DEBUG: 3 [1, 1, 1] > root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n > CreatePInput0/Read:beam:transform:read:v1\n must follow: \n > downstream_side_inputs: ', > 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n > _MaterializeValues0:beam:transform:pardo:v1\n must follow: \n > downstream_side_inputs: '] > root: INFO: ==================== <function greedily_fuse at 0x7f3a39918598> > ==================== > root: DEBUG: 1 [3] > root: DEBUG: Stages: > ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n > > CreatePInput0/Read:beam:transform:read:v1\nParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\n > must follow: \n downstream_side_inputs: '] > root: INFO: ==================== <function read_to_impulse at 0x7f3a39918620> > ==================== > root: DEBUG: 1 [4] > root: DEBUG: Stages: > ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n > > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read/Impulse:beam:transform:impulse:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\n > must follow: \n downstream_side_inputs: '] > root: INFO: ==================== <function impulse_to_input at > 0x7f3a399186a8> ==================== > root: DEBUG: 1 [4] > root: DEBUG: Stages: > ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n > > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n > must follow: \n downstream_side_inputs: '] > root: INFO: ==================== <function inject_timer_pcollections at > 0x7f3a39918840> ==================== > root: DEBUG: 1 [4] > root: DEBUG: Stages: > ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n > > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n > must follow: \n downstream_side_inputs: '] > root: INFO: ==================== <function sort_stages at 0x7f3a399188c8> > ==================== > root: DEBUG: 1 [4] > root: DEBUG: Stages: > ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n > > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n > must follow: \n downstream_side_inputs: '] > root: INFO: ==================== <function window_pcollection_coders at > 0x7f3a39918950> ==================== > root: DEBUG: 1 [4] > root: DEBUG: Stages: > ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n > > ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n > must follow: \n downstream_side_inputs: '] > root: INFO: Running > ((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5) > root: DEBUG: start <DoOperation _MaterializeValues0 output_tags=['out'], > receivers=[ConsumerSet[_MaterializeValues0.out0, > coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]> > root: DEBUG: start <DoOperation ParDo(CallableWrapperDoFn) > output_tags=['out'], > receivers=[SingletonConsumerSet[ParDo(CallableWrapperDoFn).out0, > coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]> > root: DEBUG: start <ImpulseReadOperation > receivers=[SingletonConsumerSet[CreatePInput0/Read.out0, > coder=WindowedValueCoder[VarIntCoder], len(consumers)=1]]> > root: DEBUG: start <DataInputOperation > receivers=[SingletonConsumerSet[CreatePInput0/Read/Impulse.out0, > coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]> > --------------------- >> end captured logging << --------------------- > {code} -- This message was sent by Atlassian JIRA (v7.6.14#76016)