This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0dd171f09aa Implement fetchmany support for ElasticsearchSQLCursor using an internal row buffer. (#66658)
0dd171f09aa is described below

commit 0dd171f09aac43e6c5b3ffd0f8487551c9e1048c
Author: SameerMesiah97 <[email protected]>
AuthorDate: Tue May 12 23:01:05 2026 +0100

    Implement fetchmany support for ElasticsearchSQLCursor using an internal row buffer. (#66658)
    
    Refactor cursor pagination semantics to progressively consume rows across SQL cursor pages.
    
    Add unit tests covering fetchmany batching, row exhaustion, default fetch size, and paginated fetchall behavior.
    
    Co-authored-by: Sameer Mesiah <[email protected]>
---
 .../providers/elasticsearch/hooks/elasticsearch.py | 58 +++++++++++---
 .../unit/elasticsearch/hooks/test_elasticsearch.py | 92 ++++++++++++++++++++--
 2 files changed, 131 insertions(+), 19 deletions(-)

diff --git a/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py b/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py
index e8f0cfad00c..4330740fd96 100644
--- a/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py
+++ b/providers/elasticsearch/src/airflow/providers/elasticsearch/hooks/elasticsearch.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+from collections import deque
 from collections.abc import Iterable, Mapping
 from copy import deepcopy
 from functools import cached_property
@@ -56,6 +57,10 @@ class ElasticsearchSQLCursor:
         }
         self._response: ObjectApiResponse | None = None
 
+        # Internal mutable row buffer used to progressively consume
+        # paginated Elasticsearch SQL cursor results.
+        self._rows: deque[list[Any]] = deque()
+
     @property
     def response(self) -> ObjectApiResponse:
         return self._response or {}  # type: ignore
@@ -70,11 +75,11 @@ class ElasticsearchSQLCursor:
 
     @property
     def rows(self):
-        return self.response.get("rows", [])
+        return self._rows
 
     @property
     def rowcount(self) -> int:
-        return len(self.rows)
+        return len(self.response.get("rows", []))
 
     @property
     def description(self) -> list[tuple]:
@@ -83,26 +88,57 @@ class ElasticsearchSQLCursor:
     def execute(
         self, statement: str, params: Iterable | Mapping[str, Any] | None = None
     ) -> ObjectApiResponse:
-        self.body["query"] = statement
-        if params:
-            self.body["params"] = params
-        self.response = self.es.sql.query(**self.body)
+
+        if self.body.get("cursor"):
+            self.response = self.es.sql.query(cursor=self.body["cursor"])
+        else:
+            self.body["query"] = statement
+
+            if params:
+                self.body["params"] = params
+
+            self.response = self.es.sql.query(**self.body)
+
+        self._rows = deque(self.response.get("rows", []))
+
         if self.cursor:
             self.body["cursor"] = self.cursor
         else:
             self.body.pop("cursor", None)
+
         return self.response
 
     def fetchone(self):
-        if self.rows:
-            return self.rows[0]
-        return None
+        while True:
+            if self._rows:
+                return self._rows.popleft()
+
+            if not self.cursor:
+                return None
+
+            self.execute(statement=self.body["query"])
 
     def fetchmany(self, size: int | None = None):
-        raise NotImplementedError()
+        size = size or self.body["fetch_size"]
+
+        results: list[list[Any]] = []
+
+        while len(results) < size:
+            while self._rows and len(results) < size:
+                results.append(self._rows.popleft())
+
+            if len(results) >= size:
+                break
+
+            if not self.cursor:
+                break
+
+            self.execute(statement=self.body["query"])
+
+        return results
 
     def fetchall(self):
-        results = self.rows
+        results = list(self.rows)
         while self.cursor:
             self.execute(statement=self.body["query"])
             results.extend(self.rows)
diff --git a/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py b/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py
index ed9056ae350..c770cda5c96 100644
--- a/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py
+++ b/providers/elasticsearch/tests/unit/elasticsearch/hooks/test_elasticsearch.py
@@ -34,13 +34,30 @@ from airflow.providers.elasticsearch.hooks.elasticsearch import (
     ESConnection,
 )
 
-ROWS = [
+ROWS_PAGE_1 = [
     [1, "Stallone", "Sylvester", "78"],
     [2, "Statham", "Jason", "57"],
+]
+
+ROWS_PAGE_2 = [
     [3, "Li", "Jet", "61"],
     [4, "Lundgren", "Dolph", "66"],
     [5, "Norris", "Chuck", "84"],
 ]
+
+ROWS = ROWS_PAGE_1 + ROWS_PAGE_2
+
+RESPONSE = {
+    "columns": [
+        {"name": "index", "type": "long"},
+        {"name": "name", "type": "text"},
+        {"name": "firstname", "type": "text"},
+        {"name": "age", "type": "long"},
+    ],
+    "rows": ROWS_PAGE_1,
+    "cursor": "e7f8QwXUruW2mIebzudH4BwAA//8DAA==",
+}
+
 RESPONSE_WITHOUT_CURSOR = {
     "columns": [
         {"name": "index", "type": "long"},
@@ -48,9 +65,9 @@ RESPONSE_WITHOUT_CURSOR = {
         {"name": "firstname", "type": "text"},
         {"name": "age", "type": "long"},
     ],
-    "rows": ROWS,
+    "rows": ROWS_PAGE_2,
 }
-RESPONSE = {**RESPONSE_WITHOUT_CURSOR, **{"cursor": "e7f8QwXUruW2mIebzudH4BwAA//8DAA=="}}
+
 RESPONSES = [
     RESPONSE,
     RESPONSE_WITHOUT_CURSOR,
@@ -90,7 +107,7 @@ class TestElasticsearchSQLCursor:
         cursor = ElasticsearchSQLCursor(es=self.es, options={})
         cursor.execute("SELECT * FROM hollywood.actors")
 
-        assert cursor.rowcount == len(ROWS)
+        assert cursor.rowcount == len(ROWS_PAGE_1)
 
     def test_description(self):
         cursor = ElasticsearchSQLCursor(es=self.es, options={})
@@ -109,12 +126,59 @@ class TestElasticsearchSQLCursor:
 
         assert cursor.fetchone() == ROWS[0]
 
-    def test_fetchmany(self):
+    @pytest.mark.parametrize(
+        ("size", "expected"),
+        [
+            (1, ROWS[:1]),
+            (2, ROWS[:2]),
+            (5, ROWS[:5]),
+        ],
+    )
+    def test_fetchmany(self, size, expected):
         cursor = ElasticsearchSQLCursor(es=self.es, options={})
         cursor.execute("SELECT * FROM hollywood.actors")
 
-        with pytest.raises(NotImplementedError):
-            cursor.fetchmany()
+        records = cursor.fetchmany(size)
+
+        assert records == expected
+
+    def test_fetchmany_consumes_rows(self):
+        cursor = ElasticsearchSQLCursor(es=self.es, options={})
+        cursor.execute("SELECT * FROM hollywood.actors")
+
+        first_batch = cursor.fetchmany(2)
+        second_batch = cursor.fetchmany(2)
+
+        assert first_batch == ROWS[:2]
+        assert second_batch == ROWS[2:4]
+
+    def test_fetchmany_exhausts_rows(self):
+        cursor = ElasticsearchSQLCursor(es=self.es, options={})
+        cursor.execute("SELECT * FROM hollywood.actors")
+
+        records = cursor.fetchmany(100)
+
+        assert records == ROWS
+
+        # Further calls should return empty list.
+        assert cursor.fetchmany(10) == []
+
+    def test_fetchmany_uses_default_fetch_size(self):
+        cursor = ElasticsearchSQLCursor(es=self.es, fetch_size=2)
+        cursor.execute("SELECT * FROM hollywood.actors")
+
+        records = cursor.fetchmany()
+
+        assert records == ROWS[:2]
+
+    def test_fetchmany_single_page(self):
+        self.es.sql.query.side_effect = None
+        self.es.sql.query.return_value = RESPONSE_WITHOUT_CURSOR
+        cursor = ElasticsearchSQLCursor(es=self.es, options={})
+        cursor.execute("SELECT * FROM hollywood.actors")
+
+        assert cursor.fetchmany(100) == ROWS_PAGE_2
+        assert cursor.fetchmany(10) == []
 
     def test_fetchall(self):
         cursor = ElasticsearchSQLCursor(es=self.es, options={})
@@ -122,9 +186,21 @@ class TestElasticsearchSQLCursor:
 
         records = cursor.fetchall()
 
-        assert len(records) == 10
+        assert len(records) == 5
         assert records == ROWS
 
+    def test_fetchall_after_partial_fetchmany(self):
+        cursor = ElasticsearchSQLCursor(es=self.es, options={})
+        cursor.execute("SELECT * FROM hollywood.actors")
+
+        first_batch = cursor.fetchmany(2)
+
+        assert first_batch == ROWS[:2]
+
+        remaining_records = cursor.fetchall()
+
+        assert remaining_records == ROWS[2:]
+
 
 class TestElasticsearchSQLHook:
     def setup_method(self):

Reply via email to