kaxil commented on code in PR #64183:
URL: https://github.com/apache/airflow/pull/64183#discussion_r2983831951


##########
providers/common/sql/src/airflow/providers/common/sql/datafusion/engine.py:
##########
@@ -101,11 +101,22 @@ def _register_data_source_format(self, datasource_config: DataSourceConfig):
             datasource_config.table_name,
         )
 
-    def execute_query(self, query: str) -> dict[str, list[Any]]:
+    def execute_query(self, query: str, max_rows: int | None = None) -> dict[str, list[Any]]:
         """Execute a query and return the result as a dictionary."""
         try:
             self.log.info("Executing query: %s", query)
             df = self.session_context.sql(query)
+
+            if max_rows is not None:
+                row_count = df.count()
+                if row_count > max_rows:
+                    self.log.warning(
+                        "Query returned %s rows, exceeding max_rows (%s). Returning first %s rows.",
+                        row_count,
+                        max_rows,
+                        max_rows,
+                    )
+                df = df.limit(max_rows)
             return df.to_pydict()
         except Exception as e:
             raise QueryExecutionException(f"Error while executing query: {e}")

Review Comment:
   `df.count()` forces a full query execution to count rows, then 
`df.limit(max_rows)` re-executes the query with a limit. For expensive queries 
over large object stores (the exact use case for this operator), this doubles 
execution cost.
   
   Instead, you could execute once with `limit(max_rows + 1)`, then check if 
you got more than `max_rows` rows:
   
   ```python
   if max_rows is not None:
       result = df.limit(max_rows + 1).to_pydict()
       if result:
           actual_rows = len(next(iter(result.values())))
           if actual_rows > max_rows:
               self.log.warning(
                   "Query returned more than %s rows. Returning first %s rows.",
                   max_rows,
                   max_rows,
               )
               result = {k: v[:max_rows] for k, v in result.items()}
       return result
   ```
   
   The trade-off is that you lose the exact total count in the warning message, but "more than N" is sufficient for a truncation warning.
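   The truncation step itself is plain Python over the columnar dict, so it can be sanity-checked without DataFusion at all. A minimal sketch (`truncate_columnar` is a hypothetical helper; the dict shape mimics what `to_pydict()` returns, with `max_rows + 1` rows already fetched):

```python
# Sketch only: exercises the overflow check and per-column slicing
# on a columnar dict, independent of any DataFusion session.

def truncate_columnar(result: dict[str, list], max_rows: int) -> tuple[dict[str, list], bool]:
    """Trim a columnar result to max_rows; report whether rows were dropped."""
    if not result:
        return result, False
    actual_rows = len(next(iter(result.values())))
    if actual_rows <= max_rows:
        return result, False
    return {k: v[:max_rows] for k, v in result.items()}, True


# 4 rows came back for max_rows=3, so the result is trimmed and flagged.
fetched = {"id": [1, 2, 3, 4], "name": ["a", "b", "c", "d"]}
trimmed, truncated = truncate_columnar(fetched, max_rows=3)
assert trimmed == {"id": [1, 2, 3], "name": ["a", "b", "c"]}
assert truncated
```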
   
   Also, `df.limit(max_rows)` on line 119 runs unconditionally when `max_rows` 
is set, even when the count is already within the limit. It should be inside 
the `if row_count > max_rows:` block if you keep the current approach.



##########
providers/common/sql/src/airflow/providers/common/sql/operators/analytics.py:
##########
@@ -38,7 +38,7 @@ class AnalyticsOperator(BaseOperator):
 
     :param datasource_configs: List of datasource configurations to register.
     :param queries: List of SQL queries to execute.
-    :param max_rows_check: Maximum number of rows allowed in query results. Queries exceeding this will be skipped.
+    :param max_rows_check: Maximum number of rows returned in query results.

Review Comment:
   Nit: this docstring says "Maximum number of rows returned in query results" 
but doesn't mention the truncation/warning behavior that the RST docs describe. 
Would be nice to keep them consistent, e.g.: "Maximum number of rows returned 
per query. Queries exceeding this return only the first N rows with a warning."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
