Hi all,
I would like to know whether it is possible to get structured logging in Dataflow.
The attached file contains the code I am running.
The logs show up in the GCP Log Explorer, but the extra fields do not appear.
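To show what I mean by extra fields, here is a minimal, Dataflow-free sketch of the output I am hoping for (a plain StreamHandler instead of the Cloud Logging handler; the logger name and field names are only examples):

import logging
import sys

from pythonjsonlogger import jsonlogger

# Local-only sketch: JsonFormatter on a StreamHandler, so the extra dict
# should show up as top-level keys in the JSON line written to stdout.
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(jsonlogger.JsonFormatter())

logger = logging.getLogger("local_demo")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Expected output, roughly:
# {"message": "check Logging", "custom_field1": "value1", "custom_field2": 1}
logger.warning("check Logging", extra={"custom_field1": "value1", "custom_field2": 1})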
Best regards,
from __future__ import annotations
import argparse
import logging
import google.cloud.logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from dotenv import load_dotenv
from google.cloud.logging_v2.handlers import CloudLoggingHandler
from pythonjsonlogger import jsonlogger


def func(element):
    # beam.Map passes each element to this function, so it must accept one argument.
    logger = logging.getLogger()
    logger.warning("check Logging", extra={"custom_field1": "value1", "custom_field2": 1, "A": {"B": 2}})
    logger.warning("check Logging", extra={"custom_field1": "value1", "custom_field2": 1})
    return element


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "debug create data" >> beam.Create(["dummy_data"])
            | "dummy prints" >> beam.Map(func)
        )


if __name__ == "__main__":
    # load environment variables before the Cloud Logging client is created
    load_dotenv()

    # setup GCP handler with a JSON formatter
    formatter = jsonlogger.JsonFormatter()
    logging_client = google.cloud.logging.Client()
    handler = CloudLoggingHandler(logging_client)
    handler.setFormatter(formatter)

    # setup root logger
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    run()