shunping commented on code in PR #36008:
URL: https://github.com/apache/beam/pull/36008#discussion_r2310948883


##########
sdks/python/apache_beam/runners/portability/prism_runner.py:
##########
@@ -111,11 +114,55 @@
     os.rename(src, dst)
 
 
+class PrismRunnerLogFilter(logging.Filter):
+  COMMON_FIELDS = set(["level", "source", "msg", "time"])
+
+  def filter(self, record):
+    if record.funcName == 'log_stdout':
+      try:
+        # TODO: Fix this error message from prism
+        message = record.getMessage().replace(
+            '"!ERROR:time.Time year outside of range [0,9999]"', '')
+        json_record = json.loads(message)
+        record.levelno = getattr(logging, json_record["level"])
+        record.levelname = logging.getLevelName(record.levelno)
+        if "source" in json_record:
+          record.funcName = json_record["source"]["function"]
+          record.pathname = json_record["source"]["file"]
+          record.filename = os.path.basename(record.pathname)
+          record.lineno = json_record["source"]["line"]
+        record.created = datetime.datetime.fromisoformat(
+            json_record["time"]).timestamp()
+        extras = {
+            k: v
+            for k, v in json_record.items()
+            if k not in PrismRunnerLogFilter.COMMON_FIELDS
+        }
+
+        if json_record["msg"] == "log from SDK worker":
+          # TODO: Use location and time inside the nested message to set record
+          record.name = "SdkWorker" + "@" + json_record["worker"]["ID"]
+          record.msg = json_record["sdk"]["msg"]
+        else:
+          record.name = "PrismRunner"
+          record.msg = json_record["msg"] + " (" + ", ".join([
+              str(k) + "=" +
+              (("'" + str(v) + "'") if isinstance(v, str) else str(v))
+              for k, v in extras.items()
+          ]) + ")"

Review Comment:
   Done.



##########
sdks/python/apache_beam/options/pipeline_options.py:
##########
@@ -1958,6 +1958,13 @@ def _add_argparse_args(cls, parser):
         help=(
             'Controls the log level in Prism. Values can be "debug", "info", '
             '"warn", and "error". Default log level is "info".'))
+    parser.add_argument(
+        '--prism_log_kind',
+        default="json",
+        choices=["dev", "json", "text", "natural"],
+        help=(
+            'Controls the log format in Prism. Values can be "dev", "json", '
+            '"text", and "natural". Default log format is "json".'))

Review Comment:
   Done.



##########
sdks/python/apache_beam/runners/portability/prism_runner.py:
##########
@@ -111,11 +114,55 @@ def _rename_if_different(src, dst):
     os.rename(src, dst)
 
 
+class PrismRunnerLogFilter(logging.Filter):
+  COMMON_FIELDS = set(["level", "source", "msg", "time"])
+
+  def filter(self, record):
+    if record.funcName == 'log_stdout':
+      try:
+        # TODO: Fix this error message from prism
+        message = record.getMessage().replace(
+            '"!ERROR:time.Time year outside of range [0,9999]"', '')
+        json_record = json.loads(message)
+        record.levelno = getattr(logging, json_record["level"])
+        record.levelname = logging.getLevelName(record.levelno)
+        if "source" in json_record:
+          record.funcName = json_record["source"]["function"]
+          record.pathname = json_record["source"]["file"]
+          record.filename = os.path.basename(record.pathname)
+          record.lineno = json_record["source"]["line"]
+        record.created = datetime.datetime.fromisoformat(
+            json_record["time"]).timestamp()
+        extras = {
+            k: v
+            for k, v in json_record.items()
+            if k not in PrismRunnerLogFilter.COMMON_FIELDS
+        }
+
+        if json_record["msg"] == "log from SDK worker":
+          # TODO: Use location and time inside the nested message to set record
+          record.name = "SdkWorker" + "@" + json_record["worker"]["ID"]
+          record.msg = json_record["sdk"]["msg"]
+        else:
+          record.name = "PrismRunner"
+          record.msg = json_record["msg"] + " (" + ", ".join([
+              str(k) + "=" +
+              (("'" + str(v) + "'") if isinstance(v, str) else str(v))
+              for k, v in extras.items()
+          ]) + ")"
+      except:  # pylint: disable=bare-except
+        # Return the parsed log as much as we can
+        pass

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to