Hi all,

I would like to know whether it is possible to use structured logging in
Dataflow. The attached file (pasted below) contains the code I am trying to
run.

The logs do appear in the GCP Logs Explorer, but the extra fields are not
included in the log entries.
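
For context, here is a minimal, non-Beam sketch of how I understand structured
fields are meant to be attached when using the google-cloud-logging handler.
The json_fields key comes from my reading of the library documentation, so it
is an assumption on my part and not something I have verified on Dataflow:

import logging
import google.cloud.logging

# Assumption: setup_logging() attaches a Cloud Logging handler to the root
# logger, and the handler merges the "json_fields" dict from `extra` into
# the log entry's jsonPayload.
client = google.cloud.logging.Client()
client.setup_logging()

logging.warning(
    "check Logging",
    extra={"json_fields": {"custom_field1": "value1", "custom_field2": 1}},
)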

Best regards,

--- attached file ---

from __future__ import annotations
import argparse
import logging
import google.cloud.logging
import apache_beam as beam

from apache_beam.options.pipeline_options import PipelineOptions
from dotenv import load_dotenv
from google.cloud.logging_v2.handlers import CloudLoggingHandler
from pythonjsonlogger import jsonlogger


def func(element):
    # Beam's Map passes each element to this function, so it must accept one
    # argument. Log through the root logger with extra structured fields.
    logger = logging.getLogger()
    logger.warning("check Logging", extra={"custom_field1": "value1", "custom_field2": 1, "A": {"B": 2}})
    logger.warning("check Logging", extra={"custom_field1": "value1", "custom_field2": 1})
    return element


def run(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
                pipeline
                | "debug create data" >> beam.Create(["dummy_data"])
                | "dummy prints" >> beam.Map(func)
        )


if __name__ == "__main__":
    # Load environment variables (e.g. credentials) before creating the
    # Cloud Logging client so that values from .env are picked up.
    load_dotenv()

    formatter = jsonlogger.JsonFormatter()

    # setup GCP handler
    logging_client = google.cloud.logging.Client()
    handler = CloudLoggingHandler(logging_client)
    handler.setFormatter(formatter)

    # setup root logger
    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    run()
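
For completeness, I launch the pipeline with the usual Dataflow options,
roughly along these lines (the script name, project, region and bucket below
are placeholders):

    python structured_logging_pipeline.py \
        --runner DataflowRunner \
        --project <PROJECT_ID> \
        --region <REGION> \
        --temp_location gs://<BUCKET>/tmp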
