[ https://issues.apache.org/jira/browse/BEAM-4805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stephan Hoyer updated BEAM-4805: -------------------------------- Description: Consider the following example: {code:python} import apache_beam as beam def f(*args, **kwargs): return args, kwargs [1, 2, 3] | beam.Map(f) {code} When I run this code using the latest released version of Beam (2.5.0), I see the following error: {noformat} TypeErrorTraceback (most recent call last) <ipython-input-20-9003b3f5887a> in <module>() ----> 1 range(3) | beam.Map(f) /usr/local/lib/python2.7/dist-packages/apache_beam/transforms/ptransform.pyc in __ror__(self, left, label) 491 _allocate_materialized_pipeline(p) 492 materialized_result = _AddMaterializationTransforms().visit(result) --> 493 p.run().wait_until_finish() 494 _release_materialized_pipeline(p) 495 return _FinalizeMaterialization().visit(materialized_result) /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, test_runner_api) 388 if test_runner_api and self._verify_runner_api_compatible(): 389 return Pipeline.from_runner_api( --> 390 self.to_runner_api(), self.runner, self._options).run(False) 391 392 if self._options.view_as(TypeOptions).runtime_type_check: /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, test_runner_api) 401 finally: 402 shutil.rmtree(tmpdir) --> 403 return self.runner.run_pipeline(self) 404 405 def __enter__(self): /usr/local/lib/python2.7/dist-packages/apache_beam/runners/direct/direct_runner.pyc in run_pipeline(self, pipeline) 132 runner = BundleBasedDirectRunner() 133 --> 134 return runner.run_pipeline(pipeline) 135 136 /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_pipeline(self, pipeline) 216 from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner 217 pipeline.visit(DataflowRunner.group_by_key_input_visitor()) --> 218 return self.run_via_runner_api(pipeline.to_runner_api()) 219 220 def run_via_runner_api(self, pipeline_proto): /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_via_runner_api(self, pipeline_proto) 219 220 def run_via_runner_api(self, pipeline_proto): --> 221 return self.run_stages(*self.create_stages(pipeline_proto)) 222 223 def create_stages(self, pipeline_proto): /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_stages(self, pipeline_components, stages, safe_coders) 857 metrics_by_stage[stage.name] = self.run_stage( 858 controller, pipeline_components, stage, --> 859 pcoll_buffers, safe_coders).process_bundle.metrics 860 finally: 861 controller.close() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_stage(self, controller, pipeline_components, stage, pcoll_buffers, safe_coders) 968 return BundleManager( 969 controller, get_buffer, process_bundle_descriptor, --> 970 self._progress_frequency).process_bundle(data_input, data_output) 971 972 # These classes are used to interact with the worker. /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in process_bundle(self, inputs, expected_outputs) 1172 process_bundle=beam_fn_api_pb2.ProcessBundleRequest( 1173 process_bundle_descriptor_reference=self._bundle_descriptor.id)) -> 1174 result_future = self._controller.control_handler.push(process_bundle) 1175 1176 with ProgressRequester( /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in push(self, request) 1052 request.instruction_id = 'control_%s' % self._uid_counter 1053 logging.debug('CONTROL REQUEST %s', request) -> 1054 response = self.worker.do_instruction(request) 1055 logging.debug('CONTROL RESPONSE %s', response) 1056 return ControlFuture(request.instruction_id, response) /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc in do_instruction(self, request) 206 # E.g. if register is set, this will call self.register(request.register)) 207 return getattr(self, request_type)(getattr(request, request_type), --> 208 request.instruction_id) 209 else: 210 raise NotImplementedError /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc in process_bundle(self, request, instruction_id) 228 try: 229 with state_handler.process_instruction_id(instruction_id): --> 230 processor.process_bundle(instruction_id) 231 finally: 232 del self.bundle_processors[instruction_id] /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.pyc in process_bundle(self, instruction_id) 287 for op in reversed(self.ops.values()): 288 logging.info('start %s', op) --> 289 op.start() 290 291 # Inject inputs from data plane. /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.Operation.output() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.receive() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.DoOperation.process() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.DoOperation.process() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.receive() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.process() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner._reraise_augmented() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.process() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.SimpleInvoker.invoke_process() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common._OutputProcessor.process_outputs() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.receive() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/opcounters.so in apache_beam.runners.worker.opcounters.OperationCounters.update_from() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/opcounters.so in apache_beam.runners.worker.opcounters.OperationCounters.do_sample() /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables() /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables() /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables() /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables() /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size() /usr/local/lib/python2.7/dist-packages/apache_beam/coders/stream.pyx in apache_beam.coders.stream.get_varint_size() 220 return (<double*><char*>&as_long)[0] 221 --> 222 cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value): 223 """Returns the size of the given integer value when encode as a VarInt.""" 224 cdef libc.stdint.int64_t varint_size = 0 TypeError: an integer is required [while running 'Map(f)'] {noformat} was: Consider the following example: {code:python} import apache_beam as beam def f(*args, **kwargs): return args, kwargs [1, 2, 3] | beam.Map(f) {code} When I run this code using the latest released version of Beam (2.5.0), I see the following error: {noformat} TypeErrorTraceback (most recent call last) <ipython-input-20-9003b3f5887a> in <module>() ----> 1 range(3) | beam.Map(f) /usr/local/lib/python2.7/dist-packages/apache_beam/transforms/ptransform.pyc in __ror__(self, left, label) 491 _allocate_materialized_pipeline(p) 492 materialized_result = _AddMaterializationTransforms().visit(result) --> 493 p.run().wait_until_finish() 494 _release_materialized_pipeline(p) 495 return _FinalizeMaterialization().visit(materialized_result) /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, test_runner_api) 388 if test_runner_api and self._verify_runner_api_compatible(): 389 return Pipeline.from_runner_api( --> 390 self.to_runner_api(), self.runner, self._options).run(False) 391 392 if self._options.view_as(TypeOptions).runtime_type_check: /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, test_runner_api) 401 finally: 402 shutil.rmtree(tmpdir) --> 403 return self.runner.run_pipeline(self) 404 405 def __enter__(self): /usr/local/lib/python2.7/dist-packages/apache_beam/runners/direct/direct_runner.pyc in run_pipeline(self, pipeline) 132 runner = BundleBasedDirectRunner() 133 --> 134 return runner.run_pipeline(pipeline) 135 136 /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_pipeline(self, pipeline) 216 from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner 217 pipeline.visit(DataflowRunner.group_by_key_input_visitor()) --> 218 return self.run_via_runner_api(pipeline.to_runner_api()) 219 220 def run_via_runner_api(self, pipeline_proto): /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_via_runner_api(self, pipeline_proto) 219 220 def run_via_runner_api(self, pipeline_proto): --> 221 return self.run_stages(*self.create_stages(pipeline_proto)) 222 223 def create_stages(self, pipeline_proto): /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_stages(self, pipeline_components, stages, safe_coders) 857 metrics_by_stage[stage.name] = self.run_stage( 858 controller, pipeline_components, stage, --> 859 pcoll_buffers, safe_coders).process_bundle.metrics 860 finally: 861 controller.close() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_stage(self, controller, pipeline_components, stage, pcoll_buffers, safe_coders) 968 return BundleManager( 969 controller, get_buffer, process_bundle_descriptor, --> 970 self._progress_frequency).process_bundle(data_input, data_output) 971 972 # These classes are used to interact with the worker. /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in process_bundle(self, inputs, expected_outputs) 1172 process_bundle=beam_fn_api_pb2.ProcessBundleRequest( 1173 process_bundle_descriptor_reference=self._bundle_descriptor.id)) -> 1174 result_future = self._controller.control_handler.push(process_bundle) 1175 1176 with ProgressRequester( /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in push(self, request) 1052 request.instruction_id = 'control_%s' % self._uid_counter 1053 logging.debug('CONTROL REQUEST %s', request) -> 1054 response = self.worker.do_instruction(request) 1055 logging.debug('CONTROL RESPONSE %s', response) 1056 return ControlFuture(request.instruction_id, response) /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc in do_instruction(self, request) 206 # E.g. if register is set, this will call self.register(request.register)) 207 return getattr(self, request_type)(getattr(request, request_type), --> 208 request.instruction_id) 209 else: 210 raise NotImplementedError /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc in process_bundle(self, request, instruction_id) 228 try: 229 with state_handler.process_instruction_id(instruction_id): --> 230 processor.process_bundle(instruction_id) 231 finally: 232 del self.bundle_processors[instruction_id] /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.pyc in process_bundle(self, instruction_id) 287 for op in reversed(self.ops.values()): 288 logging.info('start %s', op) --> 289 op.start() 290 291 # Inject inputs from data plane. /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.Operation.output() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.receive() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.DoOperation.process() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.DoOperation.process() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.receive() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.process() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner._reraise_augmented() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.process() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.SimpleInvoker.invoke_process() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common._OutputProcessor.process_outputs() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.receive() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/opcounters.so in apache_beam.runners.worker.opcounters.OperationCounters.update_from() /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/opcounters.so in apache_beam.runners.worker.opcounters.OperationCounters.do_sample() /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables() /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables() /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables() /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables() /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size() /usr/local/lib/python2.7/dist-packages/apache_beam/coders/stream.pyx in apache_beam.coders.stream.get_varint_size() 220 return (<double*><char*>&as_long)[0] 221 --> 222 cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value): 223 """Returns the size of the given integer value when encode as a VarInt.""" 224 cdef libc.stdint.int64_t varint_size = 0 TypeError: an integer is required [while running 'Map(f)'] {noformat} > beam.Map doesn't work on functions defined with *args > ----------------------------------------------------- > > Key: BEAM-4805 > URL: https://issues.apache.org/jira/browse/BEAM-4805 > Project: Beam > Issue Type: Bug > Components: sdk-py-core > Reporter: Stephan Hoyer > Assignee: Ahmet Altay > Priority: Major > > Consider the following example: > {code:python} > import apache_beam as beam > def f(*args, **kwargs): > return args, kwargs > [1, 2, 3] | beam.Map(f) > {code} > When I run this code using the latest released version of Beam (2.5.0), I see > the following error: > {noformat} > TypeErrorTraceback (most recent call last) > <ipython-input-20-9003b3f5887a> in <module>() > ----> 1 range(3) | beam.Map(f) > /usr/local/lib/python2.7/dist-packages/apache_beam/transforms/ptransform.pyc > in __ror__(self, left, label) > 491 _allocate_materialized_pipeline(p) > 492 materialized_result = > _AddMaterializationTransforms().visit(result) > --> 493 p.run().wait_until_finish() > 494 _release_materialized_pipeline(p) > 495 return _FinalizeMaterialization().visit(materialized_result) > /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, > test_runner_api) > 388 if test_runner_api and self._verify_runner_api_compatible(): > 389 return Pipeline.from_runner_api( > --> 390 self.to_runner_api(), self.runner, self._options).run(False) > 391 > 392 if self._options.view_as(TypeOptions).runtime_type_check: > /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, > test_runner_api) > 401 finally: > 402 shutil.rmtree(tmpdir) > --> 403 return self.runner.run_pipeline(self) > 404 > 405 def __enter__(self): > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/direct/direct_runner.pyc > in run_pipeline(self, pipeline) > 132 runner = BundleBasedDirectRunner() > 133 > --> 134 return runner.run_pipeline(pipeline) > 135 > 136 > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc > in run_pipeline(self, pipeline) > 216 from apache_beam.runners.dataflow.dataflow_runner import > DataflowRunner > 217 pipeline.visit(DataflowRunner.group_by_key_input_visitor()) > --> 218 return self.run_via_runner_api(pipeline.to_runner_api()) > 219 > 220 def run_via_runner_api(self, pipeline_proto): > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc > in run_via_runner_api(self, pipeline_proto) > 219 > 220 def run_via_runner_api(self, pipeline_proto): > --> 221 return self.run_stages(*self.create_stages(pipeline_proto)) > 222 > 223 def create_stages(self, pipeline_proto): > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc > in run_stages(self, pipeline_components, stages, safe_coders) > 857 metrics_by_stage[stage.name] = self.run_stage( > 858 controller, pipeline_components, stage, > --> 859 pcoll_buffers, safe_coders).process_bundle.metrics > 860 finally: > 861 controller.close() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc > in run_stage(self, controller, pipeline_components, stage, pcoll_buffers, > safe_coders) > 968 return BundleManager( > 969 controller, get_buffer, process_bundle_descriptor, > --> 970 self._progress_frequency).process_bundle(data_input, > data_output) > 971 > 972 # These classes are used to interact with the worker. > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc > in process_bundle(self, inputs, expected_outputs) > 1172 process_bundle=beam_fn_api_pb2.ProcessBundleRequest( > 1173 > process_bundle_descriptor_reference=self._bundle_descriptor.id)) > -> 1174 result_future = > self._controller.control_handler.push(process_bundle) > 1175 > 1176 with ProgressRequester( > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc > in push(self, request) > 1052 request.instruction_id = 'control_%s' % self._uid_counter > 1053 logging.debug('CONTROL REQUEST %s', request) > -> 1054 response = self.worker.do_instruction(request) > 1055 logging.debug('CONTROL RESPONSE %s', response) > 1056 return ControlFuture(request.instruction_id, response) > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc > in do_instruction(self, request) > 206 # E.g. if register is set, this will call > self.register(request.register)) > 207 return getattr(self, request_type)(getattr(request, > request_type), > --> 208 request.instruction_id) > 209 else: > 210 raise NotImplementedError > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc > in process_bundle(self, request, instruction_id) > 228 try: > 229 with state_handler.process_instruction_id(instruction_id): > --> 230 processor.process_bundle(instruction_id) > 231 finally: > 232 del self.bundle_processors[instruction_id] > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.pyc > in process_bundle(self, instruction_id) > 287 for op in reversed(self.ops.values()): > 288 logging.info('start %s', op) > --> 289 op.start() > 290 > 291 # Inject inputs from data plane. > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so > in apache_beam.runners.worker.operations.ReadOperation.start() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so > in apache_beam.runners.worker.operations.ReadOperation.start() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so > in apache_beam.runners.worker.operations.ReadOperation.start() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so > in apache_beam.runners.worker.operations.Operation.output() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so > in apache_beam.runners.worker.operations.ConsumerSet.receive() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so > in apache_beam.runners.worker.operations.DoOperation.process() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so > in apache_beam.runners.worker.operations.DoOperation.process() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in > apache_beam.runners.common.DoFnRunner.receive() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in > apache_beam.runners.common.DoFnRunner.process() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in > apache_beam.runners.common.DoFnRunner._reraise_augmented() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in > apache_beam.runners.common.DoFnRunner.process() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in > apache_beam.runners.common.SimpleInvoker.invoke_process() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in > apache_beam.runners.common._OutputProcessor.process_outputs() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so > in apache_beam.runners.worker.operations.ConsumerSet.receive() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so > in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/opcounters.so > in apache_beam.runners.worker.opcounters.OperationCounters.update_from() > /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/opcounters.so > in apache_beam.runners.worker.opcounters.OperationCounters.do_sample() > /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in > apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables() > /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in > apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables() > /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in > apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables() > /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in > apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables() > /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in > apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size() > /usr/local/lib/python2.7/dist-packages/apache_beam/coders/stream.pyx in > apache_beam.coders.stream.get_varint_size() > 220 return (<double*><char*>&as_long)[0] > 221 > --> 222 cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value): > 223 """Returns the size of the given integer value when encode as a > VarInt.""" > 224 cdef libc.stdint.int64_t varint_size = 0 > TypeError: an integer is required [while running 'Map(f)'] > {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)