+Daniel Mills <mil...@google.com> for usability in job messages / logging
integration across Beam runners.

On Wed, Nov 6, 2019 at 10:30 AM Chad Dombrova <chad...@gmail.com> wrote:

> Hi all,
> I’ve been working lately on improving the state stream and message stream
> on the job service (links to issues and PRs below), and I’m somewhat
> confused by the inclusion of states in the message stream, since there’s a
> separate dedicated state stream for that. Here’s the proto for the message
> response:
>
> message GetJobStateResponse {
>   JobState.Enum state = 1; // (required)
> }
> message JobMessage {
>   string message_id = 1;
>   string time = 2;
>   MessageImportance importance = 3;
>   string message_text = 4;
>
>   enum MessageImportance {
>     MESSAGE_IMPORTANCE_UNSPECIFIED = 0;
>     JOB_MESSAGE_DEBUG = 1;
>     JOB_MESSAGE_DETAILED = 2;
>     JOB_MESSAGE_BASIC = 3;
>     JOB_MESSAGE_WARNING = 4;
>     JOB_MESSAGE_ERROR = 5;
>   }
> }
> message JobMessagesResponse {
>   oneof response {
>     JobMessage message_response = 1;
>     GetJobStateResponse state_response = 2;
>   }
> }
>
> You can see that each JobMessagesResponse may contain a message *or* a
> GetJobStateResponse.
>
> What’s the intention behind this design?
>
I believe this was because a user may want to listen to both job state and
messages all in one stream.

> The main benefit I see is that it’s easier to ensure that the state and
> message logs are properly ordered. For example, in the code below it’s
> unclear at a glance (at least to me) whether we’d need to use locking
> between the main thread and read_messages thread if the main thread were
> made solely responsible for logging state messages:
>
>   def wait_until_finish(self):
>
>     def read_messages():
>       for message in self._message_stream:
>         if message.HasField('message_response'):
>           logging.log(
>               MESSAGE_LOG_LEVELS[message.message_response.importance],
>               "%s",
>               message.message_response.message_text)
>         else:
>           logging.info(
>               "Job state changed to %s",
>               self._runner_api_state_to_pipeline_state(
>                   message.state_response.state))
>         self._messages.append(message)
>
>     t = threading.Thread(target=read_messages, name='wait_until_finish_read')
>     t.daemon = True
>     t.start()
>
>     try:
>       for state_response in self._state_stream:
>         self._state = self._runner_api_state_to_pipeline_state(
>             state_response.state)
>         if state_response.state in TERMINAL_STATES:
>           # Wait for any last messages.
>           t.join(10)
>           break
>       if self._state != runner.PipelineState.DONE:
>         raise RuntimeError(
>             'Pipeline %s failed in state %s: %s' % (
>                 self._job_id, self._state, self._last_error_message()))
>       return self._state
>     finally:
>       self._cleanup()
>
> The reason this is important to me is I’d like to make a handful of
> changes to GetMessageStream to make it more powerful:
>
>    - propagate messages from user code (if they opt in to setting up
>    their logger appropriately). Currently, AFAICT, the only message the
>    message stream delivers (other than state changes) is a final error,
>    if the job fails. It was clearly the original intent of this endpoint
>    to carry other types of messages, and I'd like to bring that to fruition.
>
Log messages are a lot of data. We have users writing GBs/s when
aggregated across all their machines in Google Cloud, so I'm not sure this
will scale without a lot of control over filtering. Users sometimes don't
recognize how much they are logging, and if you have 1000 VMs each writing
only a few lines at a time, you can easily saturate this stream.
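
As a rough illustration of the kind of filtering control this would need, here is a minimal sketch of a standard-library logging filter that drops records below a minimum level and caps how many records pass per second. The class name ThrottlingFilter and its parameters are hypothetical and are not part of Beam or the job service API.

```python
import logging
import threading
import time


class ThrottlingFilter(logging.Filter):
  """Hypothetical filter: drops records below min_level or over a per-second cap."""

  def __init__(self, min_level=logging.WARNING, max_per_second=100,
               clock=time.monotonic):
    super().__init__()
    self._min_level = min_level
    self._max_per_second = max_per_second
    self._clock = clock  # injectable so the window logic can be tested
    self._lock = threading.Lock()
    self._window_start = clock()
    self._count = 0
    self.dropped = 0  # number of records rejected so far

  def filter(self, record):
    with self._lock:
      if record.levelno < self._min_level:
        self.dropped += 1
        return False
      now = self._clock()
      # Start a new one-second window once the current one has elapsed.
      if now - self._window_start >= 1.0:
        self._window_start = now
        self._count = 0
      if self._count >= self._max_per_second:
        self.dropped += 1
        return False
      self._count += 1
      return True
```

A filter like this would be attached with handler.addFilter(ThrottlingFilter(...)) on whichever handler forwards records to the stream. It only bounds what a single process sends, so the aggregate across many workers would still need a limit on the receiving side.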

>
>    - make it possible to backfill log messages when a client connection
>    is made (limited by a min timestamp and/or max number of messages), so
>    that a client that connects late can still easily catch up with a
>    limited amount of recent activity.
>
+1
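
For what it's worth, the backfill idea could be sketched as a bounded in-memory history that a late client reads before switching to live messages. This is only an illustration: MessageHistory, its methods, and the (timestamp, text) tuple shape are hypothetical and do not reflect how the job service stores messages.

```python
import collections
import threading


class MessageHistory:
  """Hypothetical bounded history of (timestamp, text) messages."""

  def __init__(self, max_messages=1000):
    self._lock = threading.Lock()
    # A deque with maxlen silently discards the oldest entry when full.
    self._messages = collections.deque(maxlen=max_messages)

  def append(self, timestamp, text):
    with self._lock:
      self._messages.append((timestamp, text))

  def backfill(self, min_timestamp=None, max_messages=None):
    """Returns retained messages, oldest first, limited by both bounds."""
    with self._lock:
      snapshot = list(self._messages)
    if min_timestamp is not None:
      snapshot = [m for m in snapshot if m[0] >= min_timestamp]
    if max_messages is not None:
      # Keep the most recent entries; slicing with -0 would return everything.
      snapshot = snapshot[-max_messages:] if max_messages > 0 else []
    return snapshot
```

The retention limit bounds server memory, while the two backfill arguments let each client choose how much recent activity it wants to replay.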

>
>    - make it possible to back GetMessageStream with logging services like
>    StackDriver, CloudWatch, or Elasticsearch
>
That is interesting. Originally the message stream was designed around
system messages from the runner and not specifically around users' log
messages, due to volume concerns. To my knowledge, all logging integration
has been deferred to the client libraries for those specific services.
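
If user logs were opted in to the message stream as proposed in the quoted text, the client-side piece might look roughly like the sketch below: a standard logging.Handler that converts records into message-like dicts and pushes them onto a bounded queue. Everything here is an assumption for illustration: the handler name, the dict shape, and the mapping from Python log levels to the MessageImportance names in the quoted proto (JOB_MESSAGE_DETAILED is left unmapped).

```python
import logging
import queue

# Assumed mapping from Python levels to the quoted MessageImportance names,
# checked from the highest threshold down.
LEVEL_TO_IMPORTANCE = [
    (logging.ERROR, 'JOB_MESSAGE_ERROR'),
    (logging.WARNING, 'JOB_MESSAGE_WARNING'),
    (logging.INFO, 'JOB_MESSAGE_BASIC'),
    (logging.DEBUG, 'JOB_MESSAGE_DEBUG'),
]


def importance_for(levelno):
  """Returns the importance name for a numeric Python log level."""
  for threshold, name in LEVEL_TO_IMPORTANCE:
    if levelno >= threshold:
      return name
  return 'MESSAGE_IMPORTANCE_UNSPECIFIED'


class JobMessageHandler(logging.Handler):
  """Hypothetical handler that forwards log records to a bounded queue."""

  def __init__(self, sink, level=logging.INFO):
    super().__init__(level=level)
    self._sink = sink  # a queue.Queue drained by whatever sends the messages

  def emit(self, record):
    try:
      self._sink.put_nowait({
          'importance': importance_for(record.levelno),
          'message_text': record.getMessage(),
      })
    except queue.Full:
      # Drop rather than block user code when the consumer falls behind.
      pass
    except Exception:
      self.handleError(record)
```

Dropping on a full queue is one possible answer to the volume concern: user code never blocks on logging, at the cost of losing messages under load.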

>
>
> Mixing state changes and log messages in the same stream adds some
> wrinkles to this plan, especially for the last one. The reason is that log
> messages will come primarily from user code, whereas state changes come
> from the runner. Enabling the various runners to deliver state changes to
> this external service might require some unwanted abstractions, whereas
> delivering user logs is very straightforward: just set up your logging
> handler.
>
> I’d love to know others' thoughts on what they’d like to see out of the
> future of logging in Beam.
>
> *Current Progress:*
>
> Sorting out proper state transitions:
>
>    - https://issues.apache.org/jira/browse/BEAM-8552
>    - https://issues.apache.org/jira/browse/BEAM-8539
>    - https://github.com/apache/beam/pull/9965
>
> Adding timestamps and history to the state stream:
>
>    - https://issues.apache.org/jira/browse/BEAM-8523
>    - https://github.com/apache/beam/pull/9959
>
> -chad
>
>
