This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d8381ed250 Update SqlToSlackApiFileOperator with new param to check
empty output (#38079)
d8381ed250 is described below
commit d8381ed2508a6129142e5717b800bb4bad7a6a30
Author: Tianyou Gu <[email protected]>
AuthorDate: Mon Mar 18 10:49:54 2024 -0700
Update SqlToSlackApiFileOperator with new param to check empty output
(#38079)
* fix: Update SqlToSlackApiFileOperator with new param to check empty output
* fix: skip sending the Slack message instead of raising an exception
* fix: update the parameter to allow different ways of handling an empty DataFrame
* Apply suggestions from code review
fmt: make formatting changes
Co-authored-by: Andrey Anshin <[email protected]>
---------
Co-authored-by: Andrey Anshin <[email protected]>
---
airflow/providers/slack/transfers/sql_to_slack.py | 18 +++++-
.../providers/slack/transfers/test_sql_to_slack.py | 69 +++++++++++++++++++++-
2 files changed, 85 insertions(+), 2 deletions(-)
diff --git a/airflow/providers/slack/transfers/sql_to_slack.py
b/airflow/providers/slack/transfers/sql_to_slack.py
index 0ecfc4e8ca..cd19fa9a74 100644
--- a/airflow/providers/slack/transfers/sql_to_slack.py
+++ b/airflow/providers/slack/transfers/sql_to_slack.py
@@ -23,7 +23,7 @@ from typing import TYPE_CHECKING, Any, Mapping, Sequence
from deprecated import deprecated
from typing_extensions import Literal
-from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.providers.slack.hooks.slack import SlackHook
from airflow.providers.slack.transfers.base_sql_to_slack import
BaseSqlToSlackOperator
from airflow.providers.slack.transfers.sql_to_slack_webhook import
SqlToSlackWebhookOperator
@@ -58,6 +58,11 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
:param slack_base_url: A string representing the Slack API base URL.
Optional
:param slack_method_version: The version of the Slack SDK Client method to
be used, either "v1" or "v2".
:param df_kwargs: Keyword arguments forwarded to
``pandas.DataFrame.to_{format}()`` method.
+ :param action_on_empty_df: Specifying how to handle an empty sql output
df. Possible values:
+
+ - ``send``: (default) send the slack with an empty file.
+ - ``skip``: skip sending the slack message. Task state set to
"skipped".
+ - ``error``: raise an error to fail the task. Task state set to
"failed".
"""
template_fields: Sequence[str] = (
@@ -87,6 +92,7 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
slack_base_url: str | None = None,
slack_method_version: Literal["v1", "v2"] = "v1",
df_kwargs: dict | None = None,
+ action_on_empty_df: Literal["send", "skip", "error"] = "send",
**kwargs,
):
super().__init__(
@@ -100,6 +106,9 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
self.slack_base_url = slack_base_url
self.slack_method_version = slack_method_version
self.df_kwargs = df_kwargs or {}
+ if not action_on_empty_df or action_on_empty_df not in ("send",
"skip", "error"):
+ raise ValueError(f"Invalid `action_on_empty_df` value
{action_on_empty_df!r}")
+ self.action_on_empty_df = action_on_empty_df
@cached_property
def slack_hook(self):
@@ -134,6 +143,13 @@ class SqlToSlackApiFileOperator(BaseSqlToSlackOperator):
output_file_name = fp.name
output_file_format = output_file_format.upper()
df_result = self._get_query_results()
+ if df_result.empty:
+ if self.action_on_empty_df == "skip":
+ raise AirflowSkipException("SQL output df is empty.
Skipping.")
+ elif self.action_on_empty_df == "error":
+ raise ValueError("SQL output df must be non-empty.
Failing.")
+ elif self.action_on_empty_df != "send":
+ raise ValueError(f"Invalid `action_on_empty_df` value
{self.action_on_empty_df!r}")
if output_file_format == "CSV":
df_result.to_csv(output_file_name, **self.df_kwargs)
elif output_file_format == "JSON":
diff --git a/tests/providers/slack/transfers/test_sql_to_slack.py
b/tests/providers/slack/transfers/test_sql_to_slack.py
index fd3c06189d..8db15cae66 100644
--- a/tests/providers/slack/transfers/test_sql_to_slack.py
+++ b/tests/providers/slack/transfers/test_sql_to_slack.py
@@ -20,7 +20,7 @@ from unittest import mock
import pytest
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowProviderDeprecationWarning,
AirflowSkipException
from airflow.providers.slack.transfers.sql_to_slack import
SqlToSlackApiFileOperator, SqlToSlackOperator
from airflow.utils import timezone
@@ -157,6 +157,73 @@ class TestSqlToSlackApiFileOperator:
with pytest.raises(ValueError):
op.execute(mock.MagicMock())
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
+    def test_null_output_sending_empty_file_by_default(self, mock_get_query_results, mock_slack_hook_cls):
+        """Default ``action_on_empty_df="send"``: an empty result still goes through the Slack hook."""
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "slack_conn_id": "expected-test-slack-conn-id",
+            "slack_filename": "test_filename.csv",
+            "slack_channels": ["#random"],
+            "slack_initial_comment": "test_comment",
+            "slack_title": "test_title",
+        }
+        op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
+        # ``DataFrame.empty`` is a property: set the attribute itself, not ``empty.return_value``.
+        mock_df = mock.MagicMock()
+        mock_df.empty = True
+        mock_get_query_results.return_value = mock_df
+
+        op.execute(mock.MagicMock())
+        mock_slack_hook_cls.assert_called_once()
+
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
+    def test_null_output_skip_sending_file(self, mock_get_query_results, mock_slack_hook_cls):
+        """``action_on_empty_df="skip"``: raise AirflowSkipException and never create the Slack hook."""
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "slack_conn_id": "expected-test-slack-conn-id",
+            "slack_filename": "test_filename.csv",
+            "slack_channels": ["#random"],
+            "slack_initial_comment": "test_comment",
+            "slack_title": "test_title",
+            "action_on_empty_df": "skip",
+        }
+        op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
+        # ``DataFrame.empty`` is a property: set the attribute itself, not ``empty.return_value``.
+        mock_df = mock.MagicMock()
+        mock_df.empty = True
+        mock_get_query_results.return_value = mock_df
+
+        with pytest.raises(AirflowSkipException):
+            op.execute(mock.MagicMock())
+        mock_slack_hook_cls.assert_not_called()
+
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.SlackHook")
+    @mock.patch("airflow.providers.slack.transfers.sql_to_slack.BaseSqlToSlackOperator._get_query_results")
+    def test_null_output_raise_error(self, mock_get_query_results, mock_slack_hook_cls):
+        """``action_on_empty_df="error"``: raise ValueError and never create the Slack hook."""
+        op_kwargs = {
+            **self.default_op_kwargs,
+            "slack_conn_id": "expected-test-slack-conn-id",
+            "slack_filename": "test_filename.csv",
+            "slack_channels": ["#random"],
+            "slack_initial_comment": "test_comment",
+            "slack_title": "test_title",
+            "action_on_empty_df": "error",
+        }
+        op = SqlToSlackApiFileOperator(task_id="test_send_file", **op_kwargs)
+        # ``DataFrame.empty`` is a property: set the attribute itself, not ``empty.return_value``.
+        mock_df = mock.MagicMock()
+        mock_df.empty = True
+        mock_get_query_results.return_value = mock_df
+
+        with pytest.raises(ValueError, match=r"output df must be non-empty\. Failing"):
+            op.execute(mock.MagicMock())
+        mock_slack_hook_cls.assert_not_called()
+
def test_deprecated_sql_to_slack_operator():
warning_pattern = "SqlToSlackOperator` has been renamed and moved"