Hi all,
I’ve been working lately on improving the state stream and message stream
on the job service (links to issues and PRs below), and I’m somewhat
confused by the inclusion of states in the message stream, since there’s a
separate dedicated state stream for that. Here’s the proto for the message
response:

message GetJobStateResponse {  JobState.Enum state = 1; // (required)
}
message JobMessage {
  string message_id = 1;
  string time = 2;  MessageImportance importance = 3;
  string message_text = 4;

  enum MessageImportance {    MESSAGE_IMPORTANCE_UNSPECIFIED = 0;
JOB_MESSAGE_DEBUG = 1;    JOB_MESSAGE_DETAILED = 2;
JOB_MESSAGE_BASIC = 3;    JOB_MESSAGE_WARNING = 4;
JOB_MESSAGE_ERROR = 5;
  }
}
message JobMessagesResponse {
  oneof response {    JobMessage message_response = 1;
GetJobStateResponse state_response = 2;
  }
}

You can see that each JobMessagesResponse may contain a message *or* a
GetJobStateResponse.

What’s the intention behind this design?

The main benefit I see is that it’s easier to ensure that the state and
message logs are properly ordered. For example, in the code below it’s
unclear at a glance (at least to me) whether we’d need to use locking
between the main thread and read_messages thread if the main thread were
made solely responsible for logging state messages:

  def wait_until_finish(self):

    def read_messages():
      for message in self._message_stream:
        if message.HasField('message_response'):
          logging.log(
              MESSAGE_LOG_LEVELS[message.message_response.importance],
              "%s",
              message.message_response.message_text)
        else:
          logging.info(
              "Job state changed to %s",
              self._runner_api_state_to_pipeline_state(
                  message.state_response.state))
        self._messages.append(message)

    t = threading.Thread(target=read_messages, name='wait_until_finish_read')
    t.daemon = True
    t.start()

    try:
      for state_response in self._state_stream:
        self._state = self._runner_api_state_to_pipeline_state(
            state_response.state)
        if state_response.state in TERMINAL_STATES:
          # Wait for any last messages.
          t.join(10)
          break
      if self._state != runner.PipelineState.DONE:
        raise RuntimeError(
            'Pipeline %s failed in state %s: %s' % (
                self._job_id, self._state, self._last_error_message()))
      return self._state
    finally:
      self._cleanup()

The reason this is important to me is I’d like to make a handful of changes
to GetMessageStream to make it more powerful:

   - propagate messages from user code (if they opt in to setting up their
   logger appropriately). currently, AFAICT, the only message the message
   stream delivers is a final error, if the job fails (other than state
   changes). It was clearly the original intent of this endpoint to carry
   other types of messages, and I'd like to bring that to fruition.
   - make it possible to backfill log messages when a client connection is
   made (limited by a min timestamp and/or max number of messages).  so if a
   client connects late it can still easily catch up with a limited amount of
   recent activity.
   - make it possible to back GetMessageStream with logging services like
   StackDriver, CloudWatch, or Elasticsearch

Mixing state changes and log messages in the same stream adds some wrinkles
to this plan, especially for the last one.  The reason is that log messages
will come primarily from user code, whereas state changes come from the
runner, and it might require some unwanted abstractions throughout the
various runners to enable them to deliver state changes to this external
service, whereas delivering user logs is very straightforward - just setup
your logging handler.

I’d love to know others' thoughts on what they’d like to see out of the
future of logging in Beam.

*Current Progress:*

Sorting out proper state transitions:

   - https://issues.apache.org/jira/browse/BEAM-8552
   - https://issues.apache.org/jira/browse/BEAM-8539
   - https://github.com/apache/beam/pull/9965
   <https://github.com/apache/beam/pull/9965>

Adding timestamps and history to the state stream:

   - https://issues.apache.org/jira/browse/BEAM-8523
   - https://github.com/apache/beam/pull/9959

-chad

Reply via email to