uranusjr commented on a change in pull request #16110:
URL: https://github.com/apache/airflow/pull/16110#discussion_r649381846



##########
File path: airflow/utils/process_utils.py
##########
@@ -79,7 +86,7 @@ def signal_procs(sig):
             else:
                 raise
 
-    if pgid == os.getpgid(0):
+    if IS_WINDOWS and pgid == os.getpid() or not IS_WINDOWS and pgid == os.getpgid(0):

Review comment:
       I think this needs some parentheses? At least for readability.

##########
File path: setup.cfg
##########
@@ -156,6 +156,9 @@ install_requires =
     typing-extensions>=3.7.4;python_version<"3.8"
     unicodecsv>=0.14.1
     werkzeug~=1.0, >=1.0.1
+    # needed for generating virtual python environments when running tasks
+    virtualenv>=20.4.3
+    psycopg2>=2.8.6

Review comment:
       Why is psycopg needed?

##########
File path: airflow/utils/process_utils.py
##########
@@ -155,11 +162,13 @@ def execute_interactive(cmd: List[str], **kwargs):
     """
     log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
 
-    old_tty = termios.tcgetattr(sys.stdin)
-    tty.setraw(sys.stdin.fileno())
+    if not IS_WINDOWS:
+        old_tty = termios.tcgetattr(sys.stdin)
+        tty.setraw(sys.stdin.fileno())
+
+        # open pseudo-terminal to interact with subprocess
+        master_fd, slave_fd = pty.openpty()

Review comment:
       I don’t think the function works after the patch (at least `slave_fd` would be undefined on Windows, since it is only assigned inside the `if not IS_WINDOWS:` block). Since this function is only used to run database commands, maybe we should mark it as POSIX-only and implement a separate function for Windows. (Would it be enough to simply bridge stdin/out/err to the subprocess?)
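One way to try the question above, sketched under the assumption that inheriting the parent's standard streams is enough for the database shells: a hypothetical `execute_interactive_windows` that skips `termios`/`tty`/`pty` entirely and lets `subprocess.run` give the child the parent's stdin, stdout, and stderr (its default when those arguments are not passed). The function name and the use of `check=True` are assumptions, not part of the PR.

```python
import logging
import shlex
import subprocess
from typing import List

log = logging.getLogger(__name__)


def execute_interactive_windows(cmd: List[str], **kwargs) -> None:
    """Hypothetical Windows counterpart to execute_interactive.

    No pseudo-terminal and no raw-mode tty: stdin, stdout and stderr are left
    unset, so the child inherits this process's handles and the user talks to
    it directly through the same console.
    """
    # Same log line as the existing POSIX implementation.
    log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
    # check=True (an assumption) turns a non-zero exit status into
    # subprocess.CalledProcessError instead of failing silently.
    subprocess.run(cmd, check=True, **kwargs)
```

The existing `execute_interactive` could then stay POSIX-only, with callers choosing between the two based on `IS_WINDOWS`.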

##########
File path: airflow/utils/timeout.py
##########
@@ -31,20 +34,34 @@ def __init__(self, seconds=1, error_message='Timeout'):
         self.seconds = seconds
         self.error_message = error_message + ', PID: ' + str(os.getpid())
 
-    def handle_timeout(self, signum, frame):  # pylint: disable=unused-argument
+    def handle_timeout(self, *args):  # pylint: disable=unused-argument
         """Logs information and raises AirflowTaskTimeout."""
         self.log.error("Process timed out, PID: %s", str(os.getpid()))
         raise AirflowTaskTimeout(self.error_message)
 
     def __enter__(self):
         try:
-            signal.signal(signal.SIGALRM, self.handle_timeout)
-            signal.setitimer(signal.ITIMER_REAL, self.seconds)
+            if IS_WINDOWS:
+                if hasattr(self, TIMER_THREAD_ATTR) and getattr(self, TIMER_THREAD_ATTR) is not None:
+                    getattr(self, TIMER_THREAD_ATTR).cancel()
+                timer = Timer(self.seconds, self.handle_timeout)
+                setattr(self, TIMER_THREAD_ATTR, timer)
+                timer.start()

Review comment:
       The exception message below (about the current context) doesn’t apply to the thread-based implementation, so we should move the `if IS_WINDOWS:` block out of `try:`.

##########
File path: airflow/utils/timeout.py
##########

Review comment:
       BTW, why do we need `hasattr`/`getattr`/`setattr` here? Why not store the timer on the instance as a regular attribute instead? It would simply go unused on POSIX. Or we could do something like:
   
   ```python
   from typing import ContextManager, Type
   
   _timeout = ContextManager[None]
   
   
   class _timeout_windows(_timeout):
       ...  # Implementation for Windows.
   
   
   class _timeout_posix(_timeout):
       ...  # Implementation for POSIX.
   
   
   # IS_WINDOWS is the platform flag this PR already uses.
   if IS_WINDOWS:
       timeout: Type[_timeout] = _timeout_windows
   else:
       timeout = _timeout_posix
   ```




-- 
This is an automated message from the Apache Git Service.