This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 258b8d1ed50 [Prism] Surface log format option and improve logging
presentation (#36008)
258b8d1ed50 is described below
commit 258b8d1ed504ba22cb799270c969b420525dcfe3
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Aug 29 21:28:34 2025 -0400
[Prism] Surface log format option and improve logging presentation (#36008)
* Expose the Prism log format as a pipeline option and add a log filter that
reformats Prism's logs when needed.
* Address review comments and rename the option value 'natural' to 'console'.
---
.../python/apache_beam/options/pipeline_options.py | 7 +++
.../runners/portability/prism_runner.py | 53 ++++++++++++++++++++++
2 files changed, 60 insertions(+)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py
b/sdks/python/apache_beam/options/pipeline_options.py
index d05839551a2..4ece14f25d2 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -1958,6 +1958,13 @@ class PrismRunnerOptions(PipelineOptions):
help=(
'Controls the log level in Prism. Values can be "debug", "info", '
'"warn", and "error". Default log level is "info".'))
+ parser.add_argument(
+ '--prism_log_kind',
+ default="console",
+ choices=["dev", "json", "text", "console"],
+ help=(
+ 'Controls the log format in Prism. Values can be "dev", "json", '
+ '"text", and "console". Default log format is "console".'))
class TestOptions(PipelineOptions):
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py
b/sdks/python/apache_beam/runners/portability/prism_runner.py
index 4cda4f41625..50ecdb1252e 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -22,7 +22,9 @@
# sunset it
from __future__ import annotations
+import datetime
import hashlib
+import json
import logging
import os
import platform
@@ -42,6 +44,7 @@ from apache_beam.runners.portability import portable_runner
from apache_beam.transforms import environments
from apache_beam.utils import shared
from apache_beam.utils import subprocess_server
+from apache_beam.utils.subprocess_server import _LOGGER as
subprocess_server_logger
from apache_beam.version import __version__ as beam_version
# pytype: skip-file
@@ -111,11 +114,53 @@ def _rename_if_different(src, dst):
os.rename(src, dst)
class PrismRunnerLogFilter(logging.Filter):
  """Best-effort rewriter for Prism's JSON-formatted log lines.

  Only records whose ``funcName`` is ``'log_stdout'`` are considered. If such a
  record's message parses as a Prism JSON log entry (a JSON object with at
  least ``level`` and ``msg``), the record's level, source location, timestamp,
  logger name and message are replaced with the values from the JSON payload.
  Any other record -- including lines that fail to parse -- is passed through
  completely unchanged; a record is never left partially rewritten.

  ``filter`` always returns True, so no record is ever dropped.
  """
  # JSON keys consumed directly; every other key is appended to the message
  # as ``key=value`` details.
  COMMON_FIELDS = set(["level", "source", "msg", "time"])

  def filter(self, record):
    if record.funcName == 'log_stdout':
      try:
        updates = self._parse_prism_log(record.getMessage(), record)
      except (ValueError, KeyError, TypeError, AttributeError):
        # The log parsing/filtering is best-effort.
        updates = None
      # Apply only once the whole line has parsed, so a failure never leaves
      # the record half-rewritten.
      if updates:
        for attr, value in updates.items():
          setattr(record, attr, value)
    return True  # Always return True to allow the record to pass.

  def _parse_prism_log(self, message, record):
    """Computes LogRecord attribute overrides for one Prism JSON log line.

    Args:
      message: The formatted message of the original record.
      record: The original record. It is only read (for its creation time),
        never modified here.

    Returns:
      A dict mapping LogRecord attribute names to their new values.

    Raises:
      ValueError, KeyError, TypeError, AttributeError: if ``message`` is not a
        Prism JSON log entry (``json.JSONDecodeError`` is a ValueError).
    """
    # TODO: Fix this error message from prism
    message = message.replace(
        '"!ERROR:time.Time year outside of range [0,9999]"', '')
    json_record = json.loads(message)
    if not isinstance(json_record, dict):
      raise TypeError('Prism log entry must be a JSON object')

    # getLevelName maps a level *name* (e.g. "WARN") to its number and returns
    # a string for unknown names, which are rejected here.
    levelno = logging.getLevelName(json_record["level"])
    if not isinstance(levelno, int):
      raise ValueError('Unknown log level: %r' % (json_record["level"], ))
    updates = {
        "levelno": levelno,
        "levelname": logging.getLevelName(levelno),
    }

    source = json_record.get("source")
    if source is not None:
      updates["funcName"] = source["function"]
      updates["pathname"] = source["file"]
      updates["filename"] = os.path.basename(source["file"])
      updates["lineno"] = source["line"]

    timestamp = json_record.get("time")
    if timestamp is not None:
      # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11.
      if timestamp.endswith("Z"):
        timestamp = timestamp[:-1] + "+00:00"
      created_dt = datetime.datetime.fromisoformat(timestamp)
      created = created_dt.timestamp()
      updates["created"] = created
      # Keep msecs/relativeCreated in sync with created so that %(msecs)d and
      # %(relativeCreated)d agree with %(asctime)s.
      updates["msecs"] = created_dt.microsecond // 1000 + 0.0
      updates["relativeCreated"] = (
          record.relativeCreated + (created - record.created) * 1000)

    if json_record["msg"] == "log from SDK worker":
      # TODO: Use location and time inside the nested message to set record
      updates["name"] = "SdkWorker" + "@" + json_record["worker"]["ID"]
      updates["msg"] = json_record["sdk"]["msg"]
    else:
      extras = {
          k: v
          for k, v in json_record.items()
          if k not in PrismRunnerLogFilter.COMMON_FIELDS
      }
      msg = str(json_record["msg"])
      if extras:
        msg += f" ({', '.join(f'{k}={v!r}' for k, v in extras.items())})"
      updates["name"] = "PrismRunner"
      updates["msg"] = msg
    # The replacement message is final text; never %-format it with old args.
    updates["args"] = ()
    return updates
+
+
class PrismJobServer(job_server.SubprocessJobServer):
BIN_CACHE = os.path.expanduser("~/.apache_beam/cache/prism/bin")
def __init__(self, options):
super().__init__()
+
prism_options = options.view_as(pipeline_options.PrismRunnerOptions)
# Options flow:
# If the path is set, always download and unzip the provided path,
@@ -131,6 +176,12 @@ class PrismJobServer(job_server.SubprocessJobServer):
self._job_port = job_options.job_port
self._log_level = prism_options.prism_log_level
+ self._log_kind = prism_options.prism_log_kind
+
+ # override console to json with log filter enabled
+ if self._log_kind == "console":
+ self._log_kind = "json"
+ subprocess_server_logger.addFilter(PrismRunnerLogFilter())
# the method is only kept for testing and backward compatibility
@classmethod
@@ -429,6 +480,8 @@ class PrismJobServer(job_server.SubprocessJobServer):
job_port,
'--log_level',
self._log_level,
+ '--log_kind',
+ self._log_kind,
'--serve_http',
False,
]