SameerMesiah97 commented on code in PR #60778:
URL: https://github.com/apache/airflow/pull/60778#discussion_r2709838466


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/kubernetes_helper_functions.py:
##########
@@ -210,3 +212,15 @@ def annotations_for_logging_task_metadata(annotation_set):
     else:
         annotations_for_logging = "<omitted>"
     return annotations_for_logging
+
+
+def serializable_callback(f):
+    """Convert async callback so it can run in sync or async mode."""
+
+    @wraps(f)
+    def wrapper(*args, mode: str, **kwargs):
+        if mode == ExecutionMode.ASYNC:
+            return f(*args, mode=mode, **kwargs)
+        return asyncio.run(f(*args, mode=mode, **kwargs))
+

Review Comment:
   Are you confident that `def serialiable_callback` will not be called from 
within a running event loop? Since this is a helper we cannot guarantee that it 
will not be invoked in a context without a pre-existing event loop and this 
could result ina `RuntimeError`. I believe it would be better to not 
special-case ASYNC mode and let the wrapper return the function as is 
regardless of `ExecutionMode` without invoking `asyncio.run()`. Unless you have 
a strong reason to do it this way. 



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -1080,31 +1085,57 @@ async def fetch_container_logs_before_current_sec(
             since_seconds=(math.ceil((now - since_time).total_seconds()) if 
since_time else None),
         )
         message_to_log = None
-        try:
-            now_seconds = now.replace(microsecond=0)
-            for line in logs:
-                line_timestamp, message = parse_log_line(line)
-                # Skip log lines from the current second to prevent duplicate 
entries on the next read.
-                # The API only allows specifying 'since_seconds', not an exact 
timestamp.
-                if line_timestamp and line_timestamp.replace(microsecond=0) == 
now_seconds:
-                    break
-                if line_timestamp:  # detect new log line
-                    if message_to_log is None:  # first line in the log
-                        message_to_log = message
-                    else:  # previous log line is complete
-                        if message_to_log is not None:
-                            if is_log_group_marker(message_to_log):
-                                print(message_to_log)
-                            else:
-                                self.log.info("[%s] %s", container_name, 
message_to_log)
-                        message_to_log = message
-                elif message_to_log:  # continuation of the previous log line
-                    message_to_log = f"{message_to_log}\n{message}"
-        finally:
-            # log the last line and update the last_captured_timestamp
-            if message_to_log is not None:
-                if is_log_group_marker(message_to_log):
-                    print(message_to_log)
-                else:
-                    self.log.info("[%s] %s", container_name, message_to_log)
+        async with self._hook.get_conn() as connection:
+            v1_api = async_k8s.CoreV1Api(connection)
+            try:
+                now_seconds = now.replace(microsecond=0)
+                for line in logs:
+                    line_timestamp, message = parse_log_line(line)
+                    # Skip log lines from the current second to prevent 
duplicate entries on the next read.
+                    # The API only allows specifying 'since_seconds', not an 
exact timestamp.
+                    if line_timestamp and 
line_timestamp.replace(microsecond=0) == now_seconds:
+                        break
+                    if line_timestamp:  # detect new log line
+                        if message_to_log is None:  # first line in the log
+                            message_to_log = message
+                        else:  # previous log line is complete
+                            if message_to_log is not None:
+                                if is_log_group_marker(message_to_log):
+                                    print(message_to_log)
+                                else:
+                                    for callback in self._callbacks:
+                                        cb = callback.progress_callback(
+                                            line=message_to_log,
+                                            client=v1_api,
+                                            mode=ExecutionMode.ASYNC,
+                                            container_name=container_name,
+                                            timestamp=line_timestamp,
+                                            pod=pod,
+                                        )
+                                        if asyncio.iscoroutine(cb):
+                                            await cb

Review Comment:
   Should we really be executing arbitrary user code inside the triggerer? Even 
though this is async, a long-running or blocking progress callback (for example 
calling an external API without a timeout) can still starve the triggerer’s 
event loop. That at least blocks the trigger executing it, and potentially 
other triggers handled by the same triggerer process. This feels like a fairly 
big design trade-off just to support progress logging, and I’m not sure it’s 
worth it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to