potiuk commented on code in PR #27912:
URL: https://github.com/apache/airflow/pull/27912#discussion_r1032471088
##########
airflow/providers/common/sql/operators/sql.py:
##########
@@ -225,54 +230,37 @@ def __init__(
self.split_statements = split_statements
self.return_last = return_last
- @overload
- def _process_output(
- self, results: Any, description: Sequence[Sequence] | None,
scalar_results: Literal[True]
- ) -> Any:
- pass
-
- @overload
- def _process_output(
- self, results: list[Any], description: Sequence[Sequence] | None,
scalar_results: Literal[False]
- ) -> Any:
- pass
-
- def _process_output(
- self, results: Any | list[Any], description: Sequence[Sequence] |
None, scalar_results: bool
- ) -> Any:
+ def _process_output(self, results: list[Any], descriptions:
list[Sequence[Sequence] | None]) -> list[Any]:
"""
- Can be overridden by the subclass in case some extra processing is
needed.
+ Processes output before it is returned by the operator.
+
+ It can be overridden by the subclass in case some extra processing is
needed.
The "process_output" method can override the returned output -
augmenting or processing the
output as needed - the output returned will be returned as execute
return value and if
- do_xcom_push is set to True, it will be set as XCom returned
+ do_xcom_push is set to True, it will be set as XCom returned.
:param results: results in the form of list of rows.
- :param description: as returned by ``cur.description`` in the Python
DBAPI
- :param scalar_results: True if result is single scalar value rather
than list of rows
+ :param descriptions: list of descriptions returned by
``cur.description`` in the Python DBAPI
"""
return results
def execute(self, context):
self.log.info("Executing: %s", self.sql)
hook = self.get_db_hook()
- if self.do_xcom_push:
- output = hook.run(
- sql=self.sql,
- autocommit=self.autocommit,
- parameters=self.parameters,
- handler=self.handler,
- split_statements=self.split_statements,
- return_last=self.return_last,
- )
- else:
- output = hook.run(
- sql=self.sql,
- autocommit=self.autocommit,
- parameters=self.parameters,
- split_statements=self.split_statements,
- )
-
- return self._process_output(output, hook.last_description,
hook.scalar_return_last)
+ output = hook.run(
+ sql=self.sql,
+ autocommit=self.autocommit,
+ parameters=self.parameters,
+ handler=self.handler if self.do_xcom_push else None,
+ split_statements=self.split_statements,
+ return_last=self.return_last,
+ )
+ if has_scalar_return_value(self.sql, self.return_last,
self.split_statements):
+ # For simplicity, we pass always list as input to _process_output,
regardless if
+ # scalar is going to be returned, and we return the first element
of the list in this case
+ # from the list returned by _process_output
+ return self._process_output([output], hook.descriptions)[0]
Review Comment:
Because in this case it will be always one-element array. But yeah. I can
change it back to -1. Both are a little confusing though.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]