amoghrajesh commented on code in PR #46265:
URL: https://github.com/apache/airflow/pull/46265#discussion_r1935053679
##########
airflow/executors/base_executor.py:
##########
@@ -332,12 +351,12 @@ def trigger_tasks(self, open_slots: int) -> None:
:param open_slots: Number of open slots
"""
- span = Trace.get_current_span()
Review Comment:
Do we not want to have the tracer anymore?
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -849,6 +848,9 @@ def process_executor_events(
continue
ti.task = task
if task.on_retry_callback or task.on_failure_callback:
+ # Only log the error/extra info here, since the
`ti.handle_failure()` path will log it
+ # too, which would lead to double logging
+ cls.logger().error(msg)
Review Comment:
Nice!
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -125,21 +130,54 @@ def on_celery_import_modules(*args, **kwargs):
import kubernetes.client # noqa: F401
[email protected]
-def execute_command(command_to_exec: CommandType) -> None:
- """Execute command."""
- dag_id, task_id =
BaseExecutor.validate_airflow_tasks_run_command(command_to_exec)
+# Once Celery5 is out of beta, we can pass `pydantic=True` to the decorator
and it will handle the validation
+# and deserialization for us
[email protected](name="execute_workload")
+def execute_workload(input: str) -> None:
+ from pydantic import TypeAdapter
+
+ from airflow.configuration import conf
+ from airflow.executors import workloads
+ from airflow.sdk.execution_time.supervisor import supervise
+
+ decoder = TypeAdapter(workloads.All)
+ workload = decoder.validate_json(input)
+
celery_task_id = app.current_task.request.id
- log.info("[%s] Executing command in Celery: %s", celery_task_id,
command_to_exec)
- with _airflow_parsing_context_manager(dag_id=dag_id, task_id=task_id):
- try:
- if settings.EXECUTE_TASKS_NEW_PYTHON_INTERPRETER:
- _execute_in_subprocess(command_to_exec, celery_task_id)
- else:
- _execute_in_fork(command_to_exec, celery_task_id)
- except Exception:
- Stats.incr("celery.execute_command.failure")
- raise
+
+ if not isinstance(workload, workloads.ExecuteTask):
+ raise ValueError(f"CeleryExecutor does not now how to handle
{type(workload)}")
Review Comment:
We can do this before trying to decode right?
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -228,6 +231,11 @@ class CeleryExecutor(BaseExecutor):
supports_ad_hoc_ti_run: bool = True
supports_sentry: bool = True
+ if TYPE_CHECKING and AIRFLOW_V_3_0_PLUS:
+ # In the v3 path, we store workloads, not commands as strings.
+ # TODO: TaskSDK: move this type change into BaseExecutor
Review Comment:
```suggestion
# TODO: TaskSDK: move this type change into BaseExecutor once all
executors are ported
```
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -256,10 +264,25 @@ def _num_tasks_per_send_process(self, to_send_count: int)
-> int:
return max(1, math.ceil(to_send_count / self._sync_parallelism))
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
+ # Airflow V2 version
from airflow.providers.celery.executors.celery_executor_utils import
execute_command
task_tuples_to_send = [task_tuple[:3] + (execute_command,) for
task_tuple in task_tuples]
- first_task = next(t[3] for t in task_tuples_to_send)
+
+ self._send_tasks(task_tuples_to_send)
+
+ def _process_workloads(self, workloads: list[workloads.All]) -> None:
+ # Airflow V3 version
+ from airflow.providers.celery.executors.celery_executor_utils import
execute_workload
+
+ tasks = [
+ (workload.ti.key, (workload.model_dump_json(),),
workload.ti.queue, execute_workload)
+ for workload in workloads
+ ]
+ self._send_tasks(tasks)
+
+ def _send_tasks(self, task_tuples_to_send):
Review Comment:
Can we add typing?
Its probably: `list[TaskTuple])`
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -256,10 +264,25 @@ def _num_tasks_per_send_process(self, to_send_count: int)
-> int:
return max(1, math.ceil(to_send_count / self._sync_parallelism))
def _process_tasks(self, task_tuples: list[TaskTuple]) -> None:
+ # Airflow V2 version
from airflow.providers.celery.executors.celery_executor_utils import
execute_command
task_tuples_to_send = [task_tuple[:3] + (execute_command,) for
task_tuple in task_tuples]
- first_task = next(t[3] for t in task_tuples_to_send)
+
+ self._send_tasks(task_tuples_to_send)
+
+ def _process_workloads(self, workloads: list[workloads.All]) -> None:
+ # Airflow V3 version
+ from airflow.providers.celery.executors.celery_executor_utils import
execute_workload
+
+ tasks = [
+ (workload.ti.key, (workload.model_dump_json(),),
workload.ti.queue, execute_workload)
+ for workload in workloads
+ ]
+ self._send_tasks(tasks)
+
+ def _send_tasks(self, task_tuples_to_send):
+ first_task = next(t[-1] for t in task_tuples_to_send)
Review Comment:
Simple change but much cleaner!
##########
task_sdk/src/airflow/sdk/log.py:
##########
@@ -197,9 +193,19 @@ def logging_processors(
exc_group_processor = None
def json_dumps(msg, default):
+ # Note: this is likely an "expensive" step, but lets massage the
dict order for nice
+ # viewing of the raw JSON logs.
+ # Maybe we don't need this once the UI renders the JSON instead of
displaying the raw text
+ msg = {
+ "timestamp": msg.pop("timestamp"),
+ "level": msg.pop("level"),
+ "event": msg.pop("event"),
+ **msg,
+ }
Review Comment:
This is super useful, I have been meaning to get to it, as it was just
harder to see the "event" normally. +1 for this change
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor.py:
##########
@@ -416,6 +439,10 @@ def try_adopt_task_instances(self, tis:
Sequence[TaskInstance]) -> Sequence[Task
for celery_task_id, (state, info) in states_by_celery_task_id.items():
result, ti = celery_tasks[celery_task_id]
result.backend = cached_celery_backend
+ if isinstance(result.result, BaseException):
+ e = result.result
+ # Log the exception we got from the remote end
+ self.log.warning("Task %s tailed with error", ti.key,
exc_info=e)
Review Comment:
```suggestion
self.log.warning("Task %s failed with error", ti.key,
exc_info=e)
```
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -125,21 +130,54 @@ def on_celery_import_modules(*args, **kwargs):
import kubernetes.client # noqa: F401
[email protected]
-def execute_command(command_to_exec: CommandType) -> None:
- """Execute command."""
- dag_id, task_id =
BaseExecutor.validate_airflow_tasks_run_command(command_to_exec)
+# Once Celery5 is out of beta, we can pass `pydantic=True` to the decorator
and it will handle the validation
+# and deserialization for us
[email protected](name="execute_workload")
+def execute_workload(input: str) -> None:
+ from pydantic import TypeAdapter
+
+ from airflow.configuration import conf
+ from airflow.executors import workloads
+ from airflow.sdk.execution_time.supervisor import supervise
+
+ decoder = TypeAdapter(workloads.All)
+ workload = decoder.validate_json(input)
Review Comment:
Should this rather be in a try block?
```
try:
workload = decoder.validate_json(input)
```
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -125,21 +130,54 @@ def on_celery_import_modules(*args, **kwargs):
import kubernetes.client # noqa: F401
[email protected]
-def execute_command(command_to_exec: CommandType) -> None:
- """Execute command."""
- dag_id, task_id =
BaseExecutor.validate_airflow_tasks_run_command(command_to_exec)
+# Once Celery5 is out of beta, we can pass `pydantic=True` to the decorator
and it will handle the validation
Review Comment:
Lets add a TODO?
##########
providers/celery/src/airflow/providers/celery/executors/celery_executor_utils.py:
##########
@@ -125,21 +126,54 @@ def on_celery_import_modules(*args, **kwargs):
import kubernetes.client # noqa: F401
[email protected]
-def execute_command(command_to_exec: CommandType) -> None:
- """Execute command."""
- dag_id, task_id =
BaseExecutor.validate_airflow_tasks_run_command(command_to_exec)
+# Once Celery5 is out of beta, we can pass `pydantic=True` to the decorator
and it will handle the validation
+# and deserialization for us
[email protected](name="execute_workload")
Review Comment:
Cool, damn nice!
##########
providers/celery/src/airflow/providers/celery/executors/default_celery.py:
##########
@@ -126,9 +135,7 @@ def _get_celery_ssl_active() -> bool:
DEFAULT_CELERY_CONFIG["broker_use_ssl"] = broker_use_ssl
except AirflowConfigException:
raise AirflowException(
- "AirflowConfigException: SSL_ACTIVE is True, "
- "please ensure SSL_KEY, "
- "SSL_CERT and SSL_CACERT are set"
+ "AirflowConfigException: SSL_ACTIVE is True, please ensure SSL_KEY,
SSL_CERT and SSL_CACERT are set"
Review Comment:
Haha!
##########
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -352,7 +341,17 @@ def start(
del logger
# Run the child entrypoint
- _fork_main(child_stdin, child_stdout, child_stderr,
child_logs.fileno(), target)
+ try:
+ _fork_main(child_stdin, child_stdout, child_stderr,
child_logs.fileno(), target)
+ except BaseException as e:
+ try:
+ print("Exception in _fork_main, exiting with code 124", e,
file=sys.stderr)
Review Comment:
Lets log instead?
##########
providers/celery/tests/provider_tests/celery/executors/test_celery_executor.py:
##########
@@ -147,43 +147,6 @@ def test_gauge_executor_metrics(self, mock_stats_gauge,
mock_trigger_tasks, mock
]
mock_stats_gauge.assert_has_calls(calls)
- @pytest.mark.parametrize(
- "command, raise_exception",
- [
- pytest.param(["true"], True, id="wrong-command"),
- pytest.param(["airflow", "tasks"], True, id="incomplete-command"),
- pytest.param(["airflow", "tasks", "run"], False,
id="complete-command"),
- ],
- )
- def test_command_validation(self, command, raise_exception):
Review Comment:
+1 on what Jed has to say, we will still continue to run the compat tests
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]