Copilot commented on code in PR #64372:
URL: https://github.com/apache/airflow/pull/64372#discussion_r3025325912


##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -739,25 +739,22 @@ def _es_read(self, log_id: str, offset: int | str, ti: 
RuntimeTI) -> ElasticSear
 
         index_patterns = self._get_index_patterns(ti)
         try:
-            max_log_line = self.client.count(index=index_patterns, 
query=query)["count"]
+            res = self.client.search(
+                index=index_patterns,
+                query=query,
+                sort=[self.offset_field],
+                size=self.MAX_LINE_PER_PAGE,
+                from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+            )
         except NotFoundError as e:
             self.log.exception("The target index pattern %s does not exist", 
index_patterns)
             raise e
+        except Exception as err:
+            self.log.exception("Could not read log with log_id: %s. Exception: 
%s", log_id, err)
+            return None
 

Review Comment:
   The code always constructs `ElasticSearchResponse(self, res)` even when the 
search result is empty. If `ElasticSearchResponse` expects a fully-shaped ES 
response (e.g., `_shards`, `took`, etc.) or non-empty hit structures, this can 
raise before you get to `response.hits`. Consider short-circuiting on the raw 
response first (e.g., `res.get('hits', {}).get('hits')`) and only constructing 
`ElasticSearchResponse` when there are hits, or make `ElasticSearchResponse` 
robust to minimal/empty responses.
   ```suggestion
   
           # Short-circuit on raw response hits to avoid constructing ElasticSearchResponse
           # for empty or minimally-shaped responses.
           hits = res.get("hits", {}).get("hits") or []
           if not hits:
               return None
   ```



##########
providers/elasticsearch/tests/unit/elasticsearch/log/test_es_task_handler.py:
##########
@@ -666,19 +665,18 @@ def test_es_read_builds_expected_query(self, ti):
         assert response is not None
         assert response.hits[0].event == "hello"
 
-    def test_es_read_returns_none_when_count_is_zero(self, ti):
+    def test_es_read_returns_none_when_search_returns_empty(self, ti):
         self.elasticsearch_io.client = Mock()
-        self.elasticsearch_io.client.count.return_value = {"count": 0}
+        self.elasticsearch_io.client.search.return_value = {"hits": {"total": 
{"value": 0}, "hits": []}}

Review Comment:
   This test uses a minimal hand-rolled ES response that may drift from what 
`ElasticSearchResponse` expects (and from what other tests use). To keep the 
test resilient and aligned with the expected schema, consider using the 
existing `_build_es_search_response()` helper with no sources to produce an 
empty-but-fully-shaped response.
   ```suggestion
           self.elasticsearch_io.client.search.return_value = _build_es_search_response()
   ```



##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -739,25 +739,22 @@ def _es_read(self, log_id: str, offset: int | str, ti: 
RuntimeTI) -> ElasticSear
 
         index_patterns = self._get_index_patterns(ti)
         try:
-            max_log_line = self.client.count(index=index_patterns, 
query=query)["count"]
+            res = self.client.search(
+                index=index_patterns,
+                query=query,
+                sort=[self.offset_field],
+                size=self.MAX_LINE_PER_PAGE,
+                from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+            )
         except NotFoundError as e:
             self.log.exception("The target index pattern %s does not exist", 
index_patterns)
             raise e
+        except Exception as err:
+            self.log.exception("Could not read log with log_id: %s. Exception: 
%s", log_id, err)

Review Comment:
   `self.log.exception(...)` already includes the exception details/traceback; 
interpolating `err` into the message is redundant and can produce noisy logs. 
Consider removing the explicit `Exception: %s` portion and keeping only `log_id` 
as context in the message.
   ```suggestion
           except Exception:
               self.log.exception("Could not read log with log_id: %s", log_id)
   ```



##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -739,25 +739,22 @@ def _es_read(self, log_id: str, offset: int | str, ti: 
RuntimeTI) -> ElasticSear
 
         index_patterns = self._get_index_patterns(ti)
         try:
-            max_log_line = self.client.count(index=index_patterns, 
query=query)["count"]
+            res = self.client.search(
+                index=index_patterns,
+                query=query,
+                sort=[self.offset_field],
+                size=self.MAX_LINE_PER_PAGE,
+                from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+            )

Review Comment:
   The code always constructs `ElasticSearchResponse(self, res)` even when the 
search result is empty. If `ElasticSearchResponse` expects a fully-shaped ES 
response (e.g., `_shards`, `took`, etc.) or non-empty hit structures, this can 
raise before you get to `response.hits`. Consider short-circuiting on the raw 
response first (e.g., `res.get('hits', {}).get('hits')`) and only constructing 
`ElasticSearchResponse` when there are hits, or make `ElasticSearchResponse` 
robust to minimal/empty responses.



##########
providers/elasticsearch/src/airflow/providers/elasticsearch/log/es_task_handler.py:
##########
@@ -739,25 +739,22 @@ def _es_read(self, log_id: str, offset: int | str, ti: 
RuntimeTI) -> ElasticSear
 
         index_patterns = self._get_index_patterns(ti)
         try:
-            max_log_line = self.client.count(index=index_patterns, 
query=query)["count"]
+            res = self.client.search(
+                index=index_patterns,
+                query=query,
+                sort=[self.offset_field],
+                size=self.MAX_LINE_PER_PAGE,
+                from_=self.MAX_LINE_PER_PAGE * self.PAGE,
+            )
         except NotFoundError as e:
             self.log.exception("The target index pattern %s does not exist", 
index_patterns)
             raise e

Review Comment:
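   Use a bare `raise` instead of `raise e`; a bare `raise` is the idiomatic way to 
re-raise the active exception. In Python 3, `raise e` still keeps the original 
traceback, but it appends an extra entry for the re-raise line, which adds noise 
when debugging.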
   ```suggestion
               raise
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to