This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 258b8d1ed50 [Prism] Surface log format option and improve logging presentation (#36008)
258b8d1ed50 is described below

commit 258b8d1ed504ba22cb799270c969b420525dcfe3
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Aug 29 21:28:34 2025 -0400

    [Prism] Surface log format option and improve logging presentation (#36008)
    
    * Surface log format to pipeline option and add log filter to process logs if needed.
    
    * Address reviews and change 'natural' to 'console' in option value
---
 .../python/apache_beam/options/pipeline_options.py |  7 +++
 .../runners/portability/prism_runner.py            | 53 ++++++++++++++++++++++
 2 files changed, 60 insertions(+)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index d05839551a2..4ece14f25d2 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1958,6 +1958,13 @@ class PrismRunnerOptions(PipelineOptions):
         help=(
             'Controls the log level in Prism. Values can be "debug", "info", '
             '"warn", and "error". Default log level is "info".'))
+    parser.add_argument(
+        '--prism_log_kind',
+        default="console",
+        choices=["dev", "json", "text", "console"],
+        help=(
+            'Controls the log format in Prism. Values can be "dev", "json", '
+            '"text", and "console". Default log format is "console".'))
 
 
 class TestOptions(PipelineOptions):
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py
index 4cda4f41625..50ecdb1252e 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -22,7 +22,9 @@
 # sunset it
 from __future__ import annotations
 
+import datetime
 import hashlib
+import json
 import logging
 import os
 import platform
@@ -42,6 +44,7 @@ from apache_beam.runners.portability import portable_runner
 from apache_beam.transforms import environments
 from apache_beam.utils import shared
 from apache_beam.utils import subprocess_server
+from apache_beam.utils.subprocess_server import _LOGGER as subprocess_server_logger
 from apache_beam.version import __version__ as beam_version
 
 # pytype: skip-file
@@ -111,11 +114,53 @@ def _rename_if_different(src, dst):
     os.rename(src, dst)
 
 
+class PrismRunnerLogFilter(logging.Filter):
+  COMMON_FIELDS = set(["level", "source", "msg", "time"])
+
+  def filter(self, record):
+    if record.funcName == 'log_stdout':
+      try:
+        # TODO: Fix this error message from prism
+        message = record.getMessage().replace(
+            '"!ERROR:time.Time year outside of range [0,9999]"', '')
+        json_record = json.loads(message)
+        record.levelno = getattr(logging, json_record["level"])
+        record.levelname = logging.getLevelName(record.levelno)
+        if "source" in json_record:
+          record.funcName = json_record["source"]["function"]
+          record.pathname = json_record["source"]["file"]
+          record.filename = os.path.basename(record.pathname)
+          record.lineno = json_record["source"]["line"]
+        record.created = datetime.datetime.fromisoformat(
+            json_record["time"]).timestamp()
+        extras = {
+            k: v
+            for k, v in json_record.items()
+            if k not in PrismRunnerLogFilter.COMMON_FIELDS
+        }
+
+        if json_record["msg"] == "log from SDK worker":
+          # TODO: Use location and time inside the nested message to set record
+          record.name = "SdkWorker" + "@" + json_record["worker"]["ID"]
+          record.msg = json_record["sdk"]["msg"]
+        else:
+          record.name = "PrismRunner"
+          record.msg = (
+              f"{json_record['msg']} "
+              f"({', '.join(f'{k}={v!r}' for k, v in extras.items())})")
+      except (json.JSONDecodeError, KeyError, ValueError):
+        # The log parsing/filtering is best-effort.
+        pass
+
+    return True  # Always return True to allow the record to pass.
+
+
 class PrismJobServer(job_server.SubprocessJobServer):
   BIN_CACHE = os.path.expanduser("~/.apache_beam/cache/prism/bin")
 
   def __init__(self, options):
     super().__init__()
+
     prism_options = options.view_as(pipeline_options.PrismRunnerOptions)
     # Options flow:
     # If the path is set, always download and unzip the provided path,
@@ -131,6 +176,12 @@ class PrismJobServer(job_server.SubprocessJobServer):
     self._job_port = job_options.job_port
 
     self._log_level = prism_options.prism_log_level
+    self._log_kind = prism_options.prism_log_kind
+
+    # override console to json with log filter enabled
+    if self._log_kind == "console":
+      self._log_kind = "json"
+      subprocess_server_logger.addFilter(PrismRunnerLogFilter())
 
   # the method is only kept for testing and backward compatibility
   @classmethod
@@ -429,6 +480,8 @@ class PrismJobServer(job_server.SubprocessJobServer):
         job_port,
         '--log_level',
         self._log_level,
+        '--log_kind',
+        self._log_kind,
         '--serve_http',
         False,
     ]

Reply via email to