dstandish commented on code in PR #27514:
URL: https://github.com/apache/airflow/pull/27514#discussion_r1015222828
##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -264,6 +266,7 @@ def run(
results = []
for sql_statement in sql:
self._run_command(cur, sql_statement, parameters)
+ self._update_query_ids(cur)
Review Comment:
So, there's a weakness to this approach: the query_id is only added after
the query runs, so on_kill will never actually kill a query that is still
running (or at least that's how it appears).
Unless the query is submitted asynchronously, all on_kill can do is "kill"
queries that have already completed.
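For contrast, here is a minimal sketch of the ordering that would let `on_kill` reach a query that is still running: submit without blocking, record the id, and only then wait. It assumes a driver that can submit asynchronously. `FakeAsyncCursor`, `execute_async`, `wait`, `AsyncSketchHook`, and `kill_query` are hypothetical names for illustration; they are not part of the DB-API or Airflow.

```python
import threading


class FakeAsyncCursor:
    """Hypothetical cursor that can submit a query without blocking on it."""

    def __init__(self) -> None:
        self._count = 0
        self._finished: dict[str, threading.Event] = {}
        self.cancelled: set[str] = set()

    def execute_async(self, sql: str) -> str:
        # Submit the statement and hand back its id before it finishes.
        self._count += 1
        query_id = f"q{self._count}"
        self._finished[query_id] = threading.Event()
        return query_id

    def wait(self, query_id: str) -> None:
        # Block until the query finishes; in this fake only cancel() finishes it.
        self._finished[query_id].wait()

    def cancel(self, query_id: str) -> None:
        self.cancelled.add(query_id)
        self._finished[query_id].set()


class AsyncSketchHook:
    def __init__(self, cursor: FakeAsyncCursor) -> None:
        self.cursor = cursor
        self.running_query_ids: list[str] = []
        self.submitted = threading.Event()  # signals that an id has been recorded

    def run(self, sql: list[str]) -> None:
        for statement in sql:
            query_id = self.cursor.execute_async(statement)
            # Record the id *before* blocking, so on_kill can see it in flight.
            self.running_query_ids.append(query_id)
            self.submitted.set()
            self.cursor.wait(query_id)
            self.running_query_ids.remove(query_id)

    def kill_query(self, query_id: str) -> None:
        self.cursor.cancel(query_id)


if __name__ == "__main__":
    cursor = FakeAsyncCursor()
    hook = AsyncSketchHook(cursor)
    worker = threading.Thread(target=hook.run, args=(["SELECT 1"],))
    worker.start()
    hook.submitted.wait()
    # What on_kill would do from another thread while run() is still blocked.
    for query_id in list(hook.running_query_ids):
        hook.kill_query(query_id)
    worker.join()
    print(cursor.cancelled)
```

With the PR's ordering (run, then record the id), the kill loop above would find an empty list while the statement is still executing.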
##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -221,6 +220,16 @@ def execute(self, context):
return output
+ def on_kill(self) -> None:
+ for query_id in self._hook.query_ids.copy():
Review Comment:
Do we actually need to copy?
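The copy only matters if something mutates `query_ids` while the loop is running. For instance, if a `kill` implementation removed the id it had just cancelled (a hypothetical, not something the quoted diff shows), iterating the live list would silently skip entries. `kill_all` below is an illustrative function, not part of the PR:

```python
def kill_all(query_ids: list[str], copy: bool) -> list[str]:
    """Visit every id; the hypothetical kill step also unregisters it."""
    killed = []
    source = query_ids.copy() if copy else query_ids
    for query_id in source:
        killed.append(query_id)
        query_ids.remove(query_id)  # hypothetical: kill() removes the id it cancelled
    return killed


print(kill_all(["a", "b", "c", "d"], copy=False))  # ['a', 'c']
print(kill_all(["a", "b", "c", "d"], copy=True))   # ['a', 'b', 'c', 'd']
```

If nothing mutates the list during `on_kill`, iterating it directly behaves the same as iterating a copy.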
##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -221,6 +220,16 @@ def execute(self, context):
return output
+ def on_kill(self) -> None:
+ for query_id in self._hook.query_ids.copy():
+ self.log.info("Stopping query: %s", self._hook.query_ids)
+ try:
+ self._hook.kill(query_id)
+ except NotImplementedError:
+ self.log.info("Method '.kill()' is not implemented for ", self._hook.__class__.__name__)
+ except Exception as e:
+ self.log.info("The query '%s' can not be killed due to %s", self._hook.query_ids, str(e))
Review Comment:
This should be `query_id`:
```suggestion
self.log.info("The query '%s' can not be killed due to %s", query_id, str(e))
```
But also, I don't think you need to wrap it in `str`:
```suggestion
self.log.info("The query '%s' can not be killed due to %s", query_id, e)
```
But also, why not just log the exception?
```suggestion
self.log.exception("The query '%s' could not be killed.", query_id)
```
That would include the error and the traceback. That seems reasonable, no?
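Putting the suggestions above together, `on_kill` might look roughly like the sketch below. It is written as a standalone function against a hypothetical `StubHook`, with a module-level `logging.Logger` standing in for the operator's `self.log`. It also logs `query_id` (not the whole list) in the "Stopping query" line and adds the `%s` placeholder that the `NotImplementedError` message is missing. Returning early once `kill()` turns out to be unimplemented is a choice made in this sketch, not something the PR does.

```python
import logging

log = logging.getLogger("sketch.on_kill")


class StubHook:
    """Hypothetical hook: kill() fails for the id "bad" to exercise the error path."""

    def __init__(self, query_ids: list[str]) -> None:
        self.query_ids = query_ids
        self.killed: list[str] = []

    def kill(self, query_id: str) -> None:
        if query_id == "bad":
            raise RuntimeError("connection lost")
        self.killed.append(query_id)


class NoKillHook(StubHook):
    """Hypothetical hook whose driver cannot cancel queries."""

    def kill(self, query_id: str) -> None:
        raise NotImplementedError


def on_kill(hook: StubHook) -> None:
    for query_id in hook.query_ids.copy():
        # Log the id being stopped, not the whole list.
        log.info("Stopping query: %s", query_id)
        try:
            hook.kill(query_id)
        except NotImplementedError:
            # %s placeholder added so the class name is actually rendered.
            log.info("Method '.kill()' is not implemented for %s", hook.__class__.__name__)
            return  # sketch's choice: no point trying the remaining ids
        except Exception:
            # log.exception logs at ERROR and attaches the current traceback.
            log.exception("The query '%s' could not be killed.", query_id)
```

Called as `on_kill(StubHook(["a", "bad", "c"]))`, it kills `a` and `c` and logs one ERROR record with the traceback for `bad`.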
##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -294,6 +297,22 @@ def _run_command(self, cur, sql_statement, parameters):
if cur.rowcount >= 0:
self.log.info("Rows affected: %s", cur.rowcount)
+ def _update_query_ids(self, cursor) -> None:
Review Comment:
Is this meant to be part of the "public" interface or not? If so, why is it
"protected" (prefixed with an underscore)?
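If each database-specific hook is expected to override this method, it is effectively a subclass-facing extension point: the base `run` calls it and subclasses fill it in. Below is a minimal sketch of that pattern. The class names and the `last_query_id` cursor attribute are hypothetical, and the method is spelled without the underscore only to show what a public version might look like.

```python
from typing import Any


class SketchDbApiHook:
    """Hypothetical stand-in for DbApiHook, reduced to the query-id plumbing."""

    def __init__(self) -> None:
        self.query_ids: list[str] = []

    def run(self, cursor: Any, statements: list[str]) -> None:
        for statement in statements:
            cursor.execute(statement)
            # Extension point: the base class calls it, subclasses implement it.
            self.update_query_ids(cursor)

    def update_query_ids(self, cursor: Any) -> None:
        """Default: drivers that expose no query id record nothing."""


class SketchVendorHook(SketchDbApiHook):
    def update_query_ids(self, cursor: Any) -> None:
        # `last_query_id` is a hypothetical cursor attribute.
        self.query_ids.append(cursor.last_query_id)


class FakeCursor:
    """Hypothetical cursor that assigns an id to each executed statement."""

    def __init__(self) -> None:
        self.last_query_id = ""
        self._n = 0

    def execute(self, statement: str) -> None:
        self._n += 1
        self.last_query_id = f"id-{self._n}"
```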
##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -294,6 +297,22 @@ def _run_command(self, cur, sql_statement, parameters):
if cur.rowcount >= 0:
self.log.info("Rows affected: %s", cur.rowcount)
+ def _update_query_ids(self, cursor) -> None:
+ """
+ Adds query ids to list
+ :param cur: current cursor after run
+ :return:
+ """
+ return None
+
+ def kill(self, query_id) -> Any:
Review Comment:
Maybe `cancel_query`, or at least `kill_query`, would be a better name.
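Here is a sketch of how a `cancel_query` name could read on the base hook. It assumes, as the operator's `except NotImplementedError` branch suggests, that the base implementation raises `NotImplementedError` and that drivers able to cancel override it. All class and function names here are hypothetical.

```python
class SketchBaseHook:
    def cancel_query(self, query_id: str) -> None:
        """Cancel a running query; hooks that cannot do this say so explicitly."""
        raise NotImplementedError(f"{type(self).__name__} cannot cancel queries")


class SketchCancellableHook(SketchBaseHook):
    def __init__(self) -> None:
        self.cancelled: list[str] = []

    def cancel_query(self, query_id: str) -> None:
        # A real hook would issue the driver's cancel call here.
        self.cancelled.append(query_id)


def try_cancel(hook: SketchBaseHook, query_id: str) -> bool:
    """Return True if the hook accepted the cancel request."""
    try:
        hook.cancel_query(query_id)
    except NotImplementedError:
        return False
    return True
```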
##########
airflow/providers/common/sql/hooks/sql.py:
##########
@@ -114,6 +114,7 @@ def __init__(self, *args, schema: str | None = None, log_sql: bool = True, **kwa
# Hook deriving from the DBApiHook to still have access to the field in it's constructor
self.__schema = schema
self.log_sql = log_sql
+ self.query_ids: list[str] = []
Review Comment:
Should this be `running_query_ids`?
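If the attribute is renamed to `running_query_ids`, the name implies that each id is removed once its query finishes, including when the query fails. Below is a minimal sketch of that bookkeeping using a hypothetical `track_running_query` context manager (not something in the PR):

```python
from contextlib import contextmanager
from typing import Iterator


class RunningIdsHook:
    def __init__(self) -> None:
        self.running_query_ids: list[str] = []

    @contextmanager
    def track_running_query(self, query_id: str) -> Iterator[None]:
        # Hypothetical helper: the id counts as "running" only inside this block.
        self.running_query_ids.append(query_id)
        try:
            yield
        finally:
            # Removed on success and on failure, so the name stays accurate.
            self.running_query_ids.remove(query_id)


hook = RunningIdsHook()
with hook.track_running_query("q1"):
    print(hook.running_query_ids)  # ['q1']
print(hook.running_query_ids)  # []
```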
--
This is an automated message from the Apache Git Service.