Copilot commented on code in PR #64372:
URL: https://github.com/apache/airflow/pull/64372#discussion_r3025325912
##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -739,25 +739,22 @@ def _es_read(self, log_id: str, offset: int | str, ti:
RuntimeTI) -> ElasticSear
index_patterns = self._get_index_patterns(ti)
try:
- max_log_line = self.client.count(index=index_patterns,
query=query)["count"]
+ res = self.client.search(
+ index=index_patterns,
+ query=query,
+ sort=[self.offset_field],
+ size=self.MAX_LINE_PER_PAGE,
+ from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+ )
except NotFoundError as e:
self.log.exception("The target index pattern %s does not exist",
index_patterns)
raise e
+ except Exception as err:
+ self.log.exception("Could not read log with log_id: %s. Exception: %s", log_id, err)
+ return None
Review Comment:
The code always constructs `ElasticSearchResponse(self, res)` even when the
search result is empty. If `ElasticSearchResponse` expects a fully-shaped ES
response (e.g., `_shards`, `took`, etc.) or non-empty hit structures, this can
raise before you get to `response.hits`. Consider short-circuiting on the raw
response first (e.g., `res.get('hits', {}).get('hits')`) and only constructing
`ElasticSearchResponse` when there are hits, or make `ElasticSearchResponse`
robust to minimal/empty responses.
```suggestion
# Short-circuit on raw response hits to avoid constructing ElasticSearchResponse
# for empty or minimally-shaped responses.
hits = res.get("hits", {}).get("hits") or []
if not hits:
return None
```
##########
providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py:
##########
@@ -666,19 +665,18 @@ def test_es_read_builds_expected_query(self, ti):
assert response is not None
assert response.hits[0].event == "hello"
- def test_es_read_returns_none_when_count_is_zero(self, ti):
+ def test_es_read_returns_none_when_search_returns_empty(self, ti):
self.elasticsearch_io.client = Mock()
- self.elasticsearch_io.client.count.return_value = {"count": 0}
+ self.elasticsearch_io.client.search.return_value = {"hits": {"total": {"value": 0}, "hits": []}}
Review Comment:
This test uses a minimal hand-rolled ES response that may drift from what
`ElasticSearchResponse` expects (and from what other tests use). To keep the
test resilient and aligned with the expected schema, consider using the
existing `_build_es_search_response()` helper with no sources to produce an
empty-but-fully-shaped response.
```suggestion
self.elasticsearch_io.client.search.return_value = _build_es_search_response()
```
##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -739,25 +739,22 @@ def _es_read(self, log_id: str, offset: int | str, ti:
RuntimeTI) -> ElasticSear
index_patterns = self._get_index_patterns(ti)
try:
- max_log_line = self.client.count(index=index_patterns,
query=query)["count"]
+ res = self.client.search(
+ index=index_patterns,
+ query=query,
+ sort=[self.offset_field],
+ size=self.MAX_LINE_PER_PAGE,
+ from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+ )
except NotFoundError as e:
self.log.exception("The target index pattern %s does not exist",
index_patterns)
raise e
+ except Exception as err:
+ self.log.exception("Could not read log with log_id: %s. Exception: %s", log_id, err)
Review Comment:
`self.log.exception(...)` already includes the exception details/traceback;
interpolating `err` into the message is redundant and can produce noisy logs.
Consider removing the explicit `Exception: %s` portion and keeping `log_id` as
a lazy format argument in the message.
```suggestion
except Exception:
self.log.exception("Could not read log with log_id: %s", log_id)
```
##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -739,25 +739,22 @@ def _es_read(self, log_id: str, offset: int | str, ti:
RuntimeTI) -> ElasticSear
index_patterns = self._get_index_patterns(ti)
try:
- max_log_line = self.client.count(index=index_patterns,
query=query)["count"]
+ res = self.client.search(
+ index=index_patterns,
+ query=query,
+ sort=[self.offset_field],
+ size=self.MAX_LINE_PER_PAGE,
+ from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+ )
Review Comment:
The code always constructs `ElasticSearchResponse(self, res)` even when the
search result is empty. If `ElasticSearchResponse` expects a fully-shaped ES
response (e.g., `_shards`, `took`, etc.) or non-empty hit structures, this can
raise before you get to `response.hits`. Consider short-circuiting on the raw
response first (e.g., `res.get('hits', {}).get('hits')`) and only constructing
`ElasticSearchResponse` when there are hits, or make `ElasticSearchResponse`
robust to minimal/empty responses.
##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -739,25 +739,22 @@ def _es_read(self, log_id: str, offset: int | str, ti:
RuntimeTI) -> ElasticSear
index_patterns = self._get_index_patterns(ti)
try:
- max_log_line = self.client.count(index=index_patterns,
query=query)["count"]
+ res = self.client.search(
+ index=index_patterns,
+ query=query,
+ sort=[self.offset_field],
+ size=self.MAX_LINE_PER_PAGE,
+ from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+ )
except NotFoundError as e:
self.log.exception("The target index pattern %s does not exist",
index_patterns)
raise e
Review Comment:
Use a bare `raise` instead of `raise e`. A bare `raise` re-raises the active
exception unchanged, whereas `raise e` adds the re-raise line as an extra
traceback entry, which makes the traceback noisier to read when debugging.
```suggestion
raise
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]