[
https://issues.apache.org/jira/browse/BEAM-11807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Anonymous updated BEAM-11807:
-----------------------------
Status: Triage Needed (was: Resolved)
> SDK Worker with multithreading causes boto3 to raise KeyError('endpoint_resolver')
> ----------------------------------------------------------------------------------
>
> Key: BEAM-11807
> URL: https://issues.apache.org/jira/browse/BEAM-11807
> Project: Beam
> Issue Type: Bug
> Components: io-py-aws, sdk-py-harness
> Affects Versions: 2.27.0, 2.28.0
> Reporter: ferryvg
> Priority: P2
> Labels: AWS, S3, multi-threading
> Fix For: 2.29.0
>
> Time Spent: 1h 50m
> Remaining Estimate: 0h
>
> Related upstream botocore issue: [https://github.com/boto/botocore/issues/1776]
> {noformat}
> Traceback (most recent call last):
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line
> 289, in _execute
> response = task()
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line
> 362, in <lambda>
> lambda: self.create_worker().do_instruction(request), request)
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line
> 606, in do_instruction
> return getattr(self, request_type)(
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line
> 644, in process_bundle
> bundle_processor.process_bundle(instruction_id))
> File
> "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line
> 1000, in process_bundle
> input_op_by_transform_id[element.transform_id].process_encoded(
> File
> "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line
> 229, in process_encoded
> self.output(decoded_value)
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line
> 359, in output
> cython.cast(Receiver,
> self.receivers[output_index]).receive(windowed_value)
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line
> 221, in receive
> self.consumer.process(windowed_value)
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line
> 719, in process
> delayed_application = self.dofn_runner.process(o)
> File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1241, in
> process
> self._reraise_augmented(exn)
> File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1239, in
> process
> return self.do_fn_invoker.invoke_process(windowed_value)
> File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 587, in
> invoke_process
> self.output_processor.process_outputs(
> File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1401, in
> process_outputs
> self.main_receivers.receive(windowed_value)
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line
> 221, in receive
> self.consumer.process(windowed_value)
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line
> 719, in process
> delayed_application = self.dofn_runner.process(o)
> File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1241, in
> process
> self._reraise_augmented(exn)
> File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1321, in
> _reraise_augmented
> raise_with_traceback(new_exn)
> File
> "/home/local/.local/lib/python3.8/site-packages/future/utils/__init__.py",
> line 446, in raise_with_traceback
> raise exc.with_traceback(traceback)
> File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1239, in
> process
> return self.do_fn_invoker.invoke_process(windowed_value)
> File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 587, in
> invoke_process
> self.output_processor.process_outputs(
> File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1374, in
> process_outputs
> for result in results:
> File
> "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line
> 1426, in process
> initial_restriction = self.restriction_provider.initial_restriction(
> File "/tmp/beam/sdks/python/apache_beam/io/iobase.py", line 1545, in
> initial_restriction
> range_tracker = element_source.get_range_tracker(None, None)
> File "/tmp/beam/sdks/python/apache_beam/io/filebasedsource.py", line 210,
> in get_range_tracker
> return self._get_concat_source().get_range_tracker(
> File "/tmp/beam/sdks/python/apache_beam/options/value_provider.py", line
> 200, in _f
> return fnc(self, *args, **kwargs)
> File "/tmp/beam/sdks/python/apache_beam/io/filebasedsource.py", line 145,
> in _get_concat_source
> match_result = FileSystems.match([pattern])[0]
> File "/tmp/beam/sdks/python/apache_beam/io/filesystems.py", line 209, in
> match
> return filesystem.match(patterns, limits)
> File "/tmp/beam/sdks/python/apache_beam/io/filesystem.py", line 765, in
> match
> raise BeamIOError("Match operation failed", exceptions)
> apache_beam.io.filesystem.BeamIOError: Match operation failed with exceptions
> {'s3://bucket_name/pipeline_output/pipe-name/ImportExampleGen/examples/7/train/*':
> BeamIOError("List operation failed with exceptions
> {'s3://bucket_name/pipeline_output/pipe-name/ImportExampleGen/examples/7/train/':
> KeyError('endpoint_resolver')}")} [while running
> 'TFXIOReadAndDecode[TransformIndex1]/RawRecordBeamSource/ReadRawRecords/ReadFromTFRecord[0]/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction0']
> with exceptions None{noformat}
> {noformat}
> Traceback (most recent call last):
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line
> 289, in _execute
> response = task()
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line
> 362, in <lambda>
> lambda: self.create_worker().do_instruction(request), request)
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line
> 606, in do_instruction
> return getattr(self, request_type)(
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line
> 644, in process_bundle
> bundle_processor.process_bundle(instruction_id))
> File
> "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line
> 1000, in process_bundle
> input_op_by_transform_id[element.transform_id].process_encoded(
> File
> "/tmp/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line
> 229, in process_encoded
> self.output(decoded_value)
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line
> 359, in output
> cython.cast(Receiver,
> self.receivers[output_index]).receive(windowed_value)
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line
> 221, in receive
> self.consumer.process(windowed_value)
> File "/tmp/beam/sdks/python/apache_beam/runners/worker/operations.py", line
> 838, in process
> delayed_applications = self.dofn_runner.process_with_sized_restriction(
> File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1247, in
> process_with_sized_restriction
> return self.do_fn_invoker.invoke_process(
> File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 748, in
> invoke_process
> residual = self._invoke_process_per_window(
> File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 886, in
> _invoke_process_per_window
> self.output_processor.process_outputs(
> File "/tmp/beam/sdks/python/apache_beam/runners/common.py", line 1374, in
> process_outputs
> for result in results:
> File "/tmp/beam/sdks/python/apache_beam/io/tfrecordio.py", line 187, in
> read_records
> with self.open_file(file_name) as file_handle:
> File "/tmp/beam/sdks/python/apache_beam/io/filebasedsource.py", line 177,
> in open_file
> return FileSystems.open(
> File "/tmp/beam/sdks/python/apache_beam/io/filesystems.py", line 249, in
> open
> return filesystem.open(path, mime_type, compression_type)
> File "/tmp/beam/sdks/python/apache_beam/io/aws/s3filesystem.py", line 190,
> in open
> return self._path_open(path, 'rb', mime_type, compression_type)
> File "/tmp/beam/sdks/python/apache_beam/io/aws/s3filesystem.py", line 154,
> in _path_open
> raw_file = s3io.S3IO(options=self._options).open(
> File "/tmp/beam/sdks/python/apache_beam/io/aws/s3io.py", line 67, in
> __init__
> self.client = boto3_client.Client(options=options)
> File "/tmp/beam/sdks/python/apache_beam/io/aws/clients/s3/boto3_client.py",
> line 60, in __init__
> self.client = boto3.client(
> File "/home/local/.local/lib/python3.8/site-packages/boto3/__init__.py",
> line 93, in client
> return _get_default_session().client(*args, **kwargs)
> File "/home/local/.local/lib/python3.8/site-packages/boto3/session.py",
> line 258, in client
> return self._session.create_client(
> File "/home/local/.local/lib/python3.8/site-packages/botocore/session.py",
> line 827, in create_client
> endpoint_resolver = self._get_internal_component('endpoint_resolver')
> File "/home/local/.local/lib/python3.8/site-packages/botocore/session.py",
> line 700, in _get_internal_component
> return self._internal_components.get_component(name)
> File "/home/local/.local/lib/python3.8/site-packages/botocore/session.py",
> line 928, in get_component
> del self._deferred[name]
> KeyError: 'endpoint_resolver'{noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)