daniel-dylag-openx commented on code in PR #34945:
URL: https://github.com/apache/airflow/pull/34945#discussion_r1369137222


##########
airflow/cli/commands/celery_command.py:
##########
@@ -214,33 +187,31 @@ def worker(args):
         # executed.
         maybe_patch_concurrency(["-P", pool])
 
-    if args.daemon:
-        # Run Celery worker as daemon
-        handle = setup_logging(log_file)
-
-        with open(stdout, "a") as stdout_handle, open(stderr, "a") as 
stderr_handle:
-            if args.umask:
-                umask = args.umask
-            else:
-                umask = conf.get("celery", "worker_umask", 
fallback=settings.DAEMON_UMASK)
-
-            stdout_handle.truncate(0)
-            stderr_handle.truncate(0)
-
-            daemon_context = daemon.DaemonContext(
-                files_preserve=[handle],
-                umask=int(umask, 8),
-                stdout=stdout_handle,
-                stderr=stderr_handle,
-            )
-            with daemon_context, _serve_logs(skip_serve_logs):
-                celery_app.worker_main(options)
+    _, stdout, stderr, log_file = setup_locations(
+        process=WORKER_PROCESS_NAME,
+        stdout=args.stdout,
+        stderr=args.stderr,
+        log=args.log_file,
+    )
 
-    else:
-        # Run Celery worker in the same process
+    def run_celery_worker():
         with _serve_logs(skip_serve_logs):
             celery_app.worker_main(options)
 
+    if args.umask:
+        umask = args.umask
+    else:
+        umask = conf.get("celery", "worker_umask", 
fallback=settings.DAEMON_UMASK)
+
+    run_command_with_daemon_mode(
+        args,

Review Comment:
   Changed all parametry to keyword-only



##########
airflow/cli/commands/celery_command.py:
##########
@@ -214,33 +187,31 @@ def worker(args):
         # executed.
         maybe_patch_concurrency(["-P", pool])
 
-    if args.daemon:
-        # Run Celery worker as daemon
-        handle = setup_logging(log_file)
-
-        with open(stdout, "a") as stdout_handle, open(stderr, "a") as 
stderr_handle:
-            if args.umask:
-                umask = args.umask
-            else:
-                umask = conf.get("celery", "worker_umask", 
fallback=settings.DAEMON_UMASK)
-
-            stdout_handle.truncate(0)
-            stderr_handle.truncate(0)
-
-            daemon_context = daemon.DaemonContext(
-                files_preserve=[handle],
-                umask=int(umask, 8),
-                stdout=stdout_handle,
-                stderr=stderr_handle,
-            )
-            with daemon_context, _serve_logs(skip_serve_logs):
-                celery_app.worker_main(options)
+    _, stdout, stderr, log_file = setup_locations(
+        process=WORKER_PROCESS_NAME,
+        stdout=args.stdout,
+        stderr=args.stderr,
+        log=args.log_file,
+    )
 
-    else:
-        # Run Celery worker in the same process
+    def run_celery_worker():
         with _serve_logs(skip_serve_logs):
             celery_app.worker_main(options)
 
+    if args.umask:
+        umask = args.umask
+    else:
+        umask = conf.get("celery", "worker_umask", 
fallback=settings.DAEMON_UMASK)
+
+    run_command_with_daemon_mode(
+        args,

Review Comment:
   Changed all parametry to keyword-only



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to