amoghrajesh commented on code in PR #47644:
URL: https://github.com/apache/airflow/pull/47644#discussion_r2004820685
##########
airflow/version_compat.py:
##########
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
Review Comment:
No need to have this file
##########
providers/apache/hive/src/airflow/providers/apache/hive/transfers/hive_to_mysql.py:
##########
@@ -26,7 +26,7 @@
from airflow.models import BaseOperator
from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook
from airflow.providers.mysql.hooks.mysql import MySqlHook
-from airflow.utils.operator_helpers import context_to_airflow_vars
+from airflow.sdk.execution_time.context import context_to_airflow_vars
Review Comment:
Need version check here
##########
providers/apache/hive/tests/unit/apache/hive/transfers/test_hive_to_mysql.py:
##########
@@ -25,9 +25,15 @@
from airflow.providers.apache.hive.transfers.hive_to_mysql import
HiveToMySqlOperator
from airflow.utils import timezone
-from airflow.utils.operator_helpers import context_to_airflow_vars
from unit.apache.hive import MockHiveServer2Hook, MockMySqlHook,
TestHiveEnvironment
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+ from airflow.sdk.execution_time.context import context_to_airflow_vars
+else:
+ from airflow.utils.operator_helpers import context_to_airflow_vars #
type: ignore[no-redef, attr-defined]
Review Comment:
Yeah this is good
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -411,3 +448,70 @@ def context_update_for_unmapped(context: Context, task:
BaseOperator) -> None:
context["params"] = process_params(
context["dag"], task, context["dag_run"].conf, suppress_exception=False
)
+
+
+def context_to_airflow_vars(context: Mapping[str, Any], in_env_var_format:
bool = False) -> dict[str, str]:
+ """
+ Return values used to externally reconstruct relations between dags,
dag_runs, tasks and task_instances.
+
+ Given a context, this function provides a dictionary of values that can be
used to
+ externally reconstruct relations between dags, dag_runs, tasks and
task_instances.
+ Default to abc.def.ghi format and can be made to ABC_DEF_GHI format if
+ in_env_var_format is set to True.
+
+ :param context: The context for the task_instance of interest.
+ :param in_env_var_format: If returned vars should be in ABC_DEF_GHI format.
+ :return: task_instance context as dict.
+ """
+ from airflow import settings
Review Comment:
```suggestion
from airflow import settings
from datetime import datetime
```
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -18,6 +18,7 @@
import contextlib
from collections.abc import Generator, Iterator, Mapping
+from datetime import datetime
Review Comment:
```suggestion
```
##########
providers/apache/hive/src/airflow/providers/apache/hive/version_compat.py:
##########
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
Review Comment:
No need to introduce a new file since we now have the compat provider to help us.
File:
https://github.com/apache/airflow/blob/main/providers/common/compat/src/airflow/providers/common/compat/version_compat.py
And example usage:
https://github.com/apache/airflow/blob/857d14b34d393000d5d23f4acc9264298688c6dd/providers/yandex/src/airflow/providers/yandex/links/yq.py#L31-L38
##########
providers/apache/hive/src/airflow/providers/apache/hive/transfers/hive_to_samba.py:
##########
@@ -26,7 +26,7 @@
from airflow.models import BaseOperator
from airflow.providers.apache.hive.hooks.hive import HiveServer2Hook
from airflow.providers.samba.hooks.samba import SambaHook
-from airflow.utils.operator_helpers import context_to_airflow_vars
+from airflow.sdk.execution_time.context import context_to_airflow_vars
Review Comment:
Need version check here
##########
task-sdk/tests/task_sdk/execution_time/test_task_runner.py:
##########
@@ -916,6 +916,34 @@ def test_run_with_inlets_and_outlets(
mock_supervisor_comms.send_request.assert_any_call(msg=last_expected_msg,
log=mock.ANY)
[email protected]("airflow.sdk.execution_time.task_runner.context_to_airflow_vars")
[email protected](os.environ, {}, clear=True)
+def test_execute_task_exports_env_vars(
+ mock_context_to_airflow_vars, create_runtime_ti, mock_supervisor_comms
+):
+ """Test that _execute_task exports airflow context to environment
variables."""
+
+ def test_function():
+ return "test function"
+
+ task = PythonOperator(
+ task_id="test_task",
+ python_callable=test_function,
+ )
+
+ ti = create_runtime_ti(task=task, dag_id="dag_with_env_vars")
+ mock_supervisor_comms.get_message.return_value = OKResponse(
+ ok=True,
+ )
Review Comment:
You shouldn't need this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]