[ 
https://issues.apache.org/jira/browse/BEAM-11807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anonymous updated BEAM-11807:
-----------------------------
    Status: Triage Needed  (was: Resolved)

> SDK Worker with multithreading causes boto3 to raise KeyError('endpoint_resolver')
> ----------------------------------------------------------------------------------
>
>                 Key: BEAM-11807
>                 URL: https://issues.apache.org/jira/browse/BEAM-11807
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-aws, sdk-py-harness
>    Affects Versions: 2.27.0, 2.28.0
>            Reporter: ferryvg
>            Priority: P2
>              Labels: AWS, S3, multi-threading
>             Fix For: 2.29.0
>
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> [https://github.com/boto/botocore/issues/1776]
> {noformat}
> Traceback (most recent call last):
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 
> 289, in _execute
>     response = task()
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 
> 362, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 
> 606, in do_instruction
>     return getattr(self, request_type)(
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 
> 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 
> 1000, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File 
> "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 
> 229, in process_encoded
>     self.output(decoded_value)
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 
> 359, in output
>     cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 
> 221, in receive
>     self.consumer.process(windowed_value)
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 
> 719, in process
>     delayed_application = self.dofn_runner.process(o)
>   File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1241, in 
> process
>     self._reraise_augmented(exn)
>   File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1239, in 
> process
>     return self.do_fn_invoker.invoke_process(windowed_value)
>   File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 587, in 
> invoke_process
>     self.output_processor.process_outputs(
>   File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1401, in 
> process_outputs
>     self.main_receivers.receive(windowed_value)
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 
> 221, in receive
>     self.consumer.process(windowed_value)
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 
> 719, in process
>     delayed_application = self.dofn_runner.process(o)
>   File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1241, in 
> process
>     self._reraise_augmented(exn)
>   File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1321, in 
> _reraise_augmented
>     raise_with_traceback(new_exn)
>   File 
> "/home/local/.local/lib/python3.8/site-packages/future/utils/__init__.py", 
> line 446, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1239, in 
> process
>     return self.do_fn_invoker.invoke_process(windowed_value)
>   File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 587, in 
> invoke_process
>     self.output_processor.process_outputs(
>   File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1374, in 
> process_outputs
>     for result in results:
>   File 
> "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 
> 1426, in process
>     initial_restriction = self.restriction_provider.initial_restriction(
>   File "/tmp/beam/sdks/python/apache_beam/io/iobase.py", line 1545, in 
> initial_restriction
>     range_tracker = element_source.get_range_tracker(None, None)
>   File "/tmp/beam/sdks/python/apache_beam/io/filebasedsource.py", line 210, 
> in get_range_tracker
>     return self._get_concat_source().get_range_tracker(
>   File "/tmp/beam/sdks/python/apache_beam/options/value_provider.py", line 
> 200, in _f
>     return fnc(self, *args, **kwargs)
>   File "/tmp/beam/sdks/python/apache_beam/io/filebasedsource.py", line 145, 
> in _get_concat_source
>     match_result = FileSystems.match([pattern])[0]
>   File "/tmp/beam/sdks/python/apache_beam/io/filesystems.py", line 209, in 
> match
>     return filesystem.match(patterns, limits)
>   File "/tmp/beam/sdks/python/apache_beam/io/filesystem.py", line 765, in 
> match
>     raise BeamIOError("Match operation failed", exceptions)
> apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions 
> {'s3://bucket_name/pipeline_output/pipe-name/ImportExampleGen/examples/7/train/*':
>  BeamIOError("List operation failed with exceptions 
> {'s3://bucket_name/pipeline_output/pipe-name/ImportExampleGen/examples/7/train/':
>  KeyError('endpoint_resolver')}")} [while running 
> 'TFXIOReadAndDecode[TransformIndex1]/RawRecordBeamSource/ReadRawRecords/ReadFromTFRecord[0]/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction0']
>  with exceptions None{noformat}
> {noformat}
> Traceback (most recent call last):
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 
> 289, in _execute
>     response = task()
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 
> 362, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 
> 606, in do_instruction
>     return getattr(self, request_type)(
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 
> 644, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File 
> "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 
> 1000, in process_bundle
>     input_op_by_transform_id[element.transform_id].process_encoded(
>   File 
> "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 
> 229, in process_encoded
>     self.output(decoded_value)
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 
> 359, in output
>     cython.cast(Receiver, 
> self.receivers[output_index]).receive(windowed_value)
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 
> 221, in receive
>     self.consumer.process(windowed_value)
>   File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line 
> 838, in process
>     delayed_applications = self.dofn_runner.process_with_sized_restriction(
>   File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1247, in 
> process_with_sized_restriction
>     return self.do_fn_invoker.invoke_process(
>   File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 748, in 
> invoke_process
>     residual = self._invoke_process_per_window(
>   File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 886, in 
> _invoke_process_per_window
>     self.output_processor.process_outputs(
>   File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1374, in 
> process_outputs
>     for result in results:
>   File "/tmp/beam/sdks/python/apache_beam/io/tfrecordio.py", line 187, in 
> read_records
>     with self.open_file(file_name) as file_handle:
>   File "/tmp/beam/sdks/python/apache_beam/io/filebasedsource.py", line 177, 
> in open_file
>     return FileSystems.open(
>   File "/tmp/beam/sdks/python/apache_beam/io/filesystems.py", line 249, in 
> open
>     return filesystem.open(path, mime_type, compression_type)
>   File "/tmp/beam/sdks/python/apache_beam/io/aws/s3filesystem.py", line 190, 
> in open
>     return self._path_open(path, 'rb', mime_type, compression_type)
>   File "/tmp/beam/sdks/python/apache_beam/io/aws/s3filesystem.py", line 154, 
> in _path_open
>     raw_file = s3io.S3IO(options=self._options).open(
>   File "/tmp/beam/sdks/python/apache_beam/io/aws/s3io.py", line 67, in 
> __init__
>     self.client = boto3_client.Client(options=options)
>   File "/tmp/beam/sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py", 
> line 60, in __init__
>     self.client = boto3.client(
>   File "/home/local/.local/lib/python3.8/site-packages/boto3/__init__.py", 
> line 93, in client
>     return _get_default_session().client(*args, **kwargs)
>   File "/home/local/.local/lib/python3.8/site-packages/boto3/session.py", 
> line 258, in client
>     return self._session.create_client(
>   File "/home/local/.local/lib/python3.8/site-packages/botocore/session.py", 
> line 827, in create_client
>     endpoint_resolver = self._get_internal_component('endpoint_resolver')
>   File "/home/local/.local/lib/python3.8/site-packages/botocore/session.py", 
> line 700, in _get_internal_component
>     return self._internal_components.get_component(name)
>   File "/home/local/.local/lib/python3.8/site-packages/botocore/session.py", 
> line 928, in get_component
>     del self._deferred[name]
> KeyError: 'endpoint_resolver'{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to