liferoad opened a new pull request, #36388:
URL: https://github.com/apache/beam/pull/36388
Prevent unnecessary connection attempts by returning early when an empty
request is received. Failure observed in the integration test:
```
apache_beam/ml/rag/enrichment/milvus_search_it_test.py:577:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/pipeline.py:670:
in __exit__
self.result = self.run()
^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/testing/test_pipeline.py:118:
in run
result = super().run(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/pipeline.py:620:
in run
self._options).run(False)
^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/pipeline.py:644:
in run
return self.runner.run_pipeline(self, self._options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/runners/direct/direct_runner.py:245:
in run_pipeline
return runner.run_pipeline(pipeline, options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:196:
in run_pipeline
self._latest_run_result = self.run_via_runner_api(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:223:
in run_via_runner_api
return self.run_stages(stage_context, stages)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:470:
in run_stages
bundle_results = self._execute_bundle(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:795:
in _execute_bundle
self._run_bundle(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1034:
in _run_bundle
result, splits = bundle_manager.process_bundle(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1360:
in process_bundle
result_future =
self._worker_handler.control_conn.push(process_bundle_req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:389:
in push
response = self.worker.do_instruction(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/runners/worker/sdk_worker.py:662:
in do_instruction
return getattr(self, request_type)(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/runners/worker/sdk_worker.py:693:
in process_bundle
bundle_processor = self.bundle_processor_cache.get(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/runners/worker/sdk_worker.py:514:
in get
processor = bundle_processor.BundleProcessor(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/runners/worker/bundle_processor.py:1135:
in __init__
op.setup(self.data_sampler)
apache_beam/runners/worker/operations.py:875: in
apache_beam.runners.worker.operations.DoOperation.setup
with self.scoped_start_state:
apache_beam/runners/worker/operations.py:925: in
apache_beam.runners.worker.operations.DoOperation.setup
self.dofn_runner.setup()
apache_beam/runners/common.py:1568: in
apache_beam.runners.common.DoFnRunner.setup
self._invoke_lifecycle_method(self.do_fn_invoker.invoke_setup)
apache_beam/runners/common.py:1564: in
apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
self._reraise_augmented(exn)
apache_beam/runners/common.py:1609: in
apache_beam.runners.common.DoFnRunner._reraise_augmented
raise new_exn
apache_beam/runners/common.py:1562: in
apache_beam.runners.common.DoFnRunner._invoke_lifecycle_method
lifecycle_method()
apache_beam/runners/common.py:602: in
apache_beam.runners.common.DoFnInvoker.invoke_setup
self.signature.setup_lifecycle_method.method_value()
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/apache_beam/io/requestresponse.py:340:
in setup
self._caller.__enter__()
apache_beam/ml/rag/enrichment/milvus_search.py:411: in __enter__
self._client = MilvusClient(**connection_params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/milvus_client.py:65:
in __init__
self._using = create_connection(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/_utils.py:49:
in create_connection
raise ex from ex
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/milvus_client/_utils.py:44:
in create_connection
connections.connect(
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/orm/connections.py:472:
in connect
connect_milvus(**kwargs, user=user, password=password, token=token,
db_name=db_name)
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/orm/connections.py:426:
in connect_milvus
raise e from e
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/orm/connections.py:418:
in connect_milvus
gh._wait_for_channel_ready(timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _
self = <pymilvus.client.grpc_handler.GrpcHandler object at 0x7f48ab4447d0>
timeout = 10.0
def _wait_for_channel_ready(self, timeout: Union[float] = 10):
if self._channel is None:
raise MilvusException(
code=Status.CONNECT_FAILED,
message="No channel in handler, please setup grpc channel
first",
)
try:
grpc.channel_ready_future(self._channel).result(timeout=timeout)
self._setup_identifier_interceptor(self._user, timeout=timeout)
except grpc.FutureTimeoutError as e:
self.close()
> raise MilvusException(
code=Status.CONNECT_FAILED,
message=f"Fail connecting to server on {self._address},
illegal connection params or server unavailable",
) from e
E RuntimeError: pymilvus.exceptions.MilvusException:
<MilvusException: (code=2, message=Fail connecting to server on
localhost:46805, illegal connection params or server unavailable)> [while
running 'Enrichment/Enrichment-RRIO/_Call/ParDo(_CallDoFn)']
target/.tox-py313-ml/py313-ml/lib/python3.13/site-packages/pymilvus/client/grpc_handler.py:162:
RuntimeError
```
------------------------
Thank you for your contribution! Follow this checklist to help us
incorporate your contribution quickly and easily:
- [ ] Mention the appropriate issue in your description (for example:
`addresses #123`), if applicable. This will automatically add a link to the
pull request in the issue. If you would like the issue to automatically close
on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more
tips on [how to make the review process
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
To check the build health, please visit
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
GitHub Actions Tests Status (on master branch)
------------------------------------------------------------------------------------------------
[Build python source distribution and wheels](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
[Python Tests](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
[Java Tests](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
[Go tests](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more
information about GitHub Actions CI or the [workflows
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md)
to see a list of phrases to trigger workflows.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org