Hi all, I’ve been working lately on improving the state stream and message stream in the job service (links to issues and PRs below), and I’m somewhat confused by the inclusion of states in the message stream, since there’s a separate, dedicated state stream. Here are the relevant protos for the message stream:
message GetJobStateResponse {
  JobState.Enum state = 1; // (required)
}

message JobMessage {
  string message_id = 1;
  string time = 2;
  MessageImportance importance = 3;
  string message_text = 4;

  enum MessageImportance {
    MESSAGE_IMPORTANCE_UNSPECIFIED = 0;
    JOB_MESSAGE_DEBUG = 1;
    JOB_MESSAGE_DETAILED = 2;
    JOB_MESSAGE_BASIC = 3;
    JOB_MESSAGE_WARNING = 4;
    JOB_MESSAGE_ERROR = 5;
  }
}

message JobMessagesResponse {
  oneof response {
    JobMessage message_response = 1;
    GetJobStateResponse state_response = 2;
  }
}

You can see that each JobMessagesResponse may contain a message *or* a GetJobStateResponse. What’s the intention behind this design?

The main benefit I see is that it makes it easier to ensure that the state and message logs are properly ordered. For example, in the code below it’s unclear at a glance (at least to me) whether we’d need locking between the main thread and the read_messages thread if the main thread were made solely responsible for logging state changes:

def wait_until_finish(self):

  def read_messages():
    for message in self._message_stream:
      if message.HasField('message_response'):
        logging.log(
            MESSAGE_LOG_LEVELS[message.message_response.importance],
            "%s", message.message_response.message_text)
      else:
        logging.info(
            "Job state changed to %s",
            self._runner_api_state_to_pipeline_state(
                message.state_response.state))
      self._messages.append(message)

  t = threading.Thread(
      target=read_messages, name='wait_until_finish_read')
  t.daemon = True
  t.start()

  try:
    for state_response in self._state_stream:
      self._state = self._runner_api_state_to_pipeline_state(
          state_response.state)
      if state_response.state in TERMINAL_STATES:
        # Wait for any last messages.
        t.join(10)
        break
    if self._state != runner.PipelineState.DONE:
      raise RuntimeError(
          'Pipeline %s failed in state %s: %s' % (
              self._job_id, self._state, self._last_error_message()))
    return self._state
  finally:
    self._cleanup()

The reason this is important to me is that I’d like to make a handful of changes to GetMessageStream to make it more powerful:

- Propagate messages from user code (if users opt in by setting up their logger appropriately). Currently, AFAICT, the only message the message stream delivers (other than state changes) is a final error, if the job fails. It was clearly the original intent of this endpoint to carry other types of messages, and I’d like to bring that to fruition.
- Make it possible to backfill log messages when a client connection is made (limited by a min timestamp and/or a max number of messages), so that a client that connects late can still easily catch up on a limited amount of recent activity.
- Make it possible to back GetMessageStream with logging services like Stackdriver, CloudWatch, or Elasticsearch.

Mixing state changes and log messages in the same stream adds some wrinkles to this plan, especially for the last item. Log messages will come primarily from user code, whereas state changes come from the runner, so it might require some unwanted abstractions throughout the various runners to enable them to deliver state changes to an external logging service. Delivering user logs, by contrast, is very straightforward: just set up your logging handler.

I’d love to know others' thoughts on what they’d like to see out of the future of logging in Beam.

*Current Progress:*

Sorting out proper state transitions:
- https://issues.apache.org/jira/browse/BEAM-8552
- https://issues.apache.org/jira/browse/BEAM-8539
- https://github.com/apache/beam/pull/9965

Adding timestamps and history to the state stream:
- https://issues.apache.org/jira/browse/BEAM-8523
- https://github.com/apache/beam/pull/9959

-chad
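To make the "just set up your logging handler" idea concrete, here is a rough sketch of what an opt-in user-logging handler could look like. Everything here is hypothetical (JobMessageHandler, the sink, and the level mapping are illustrative names, not Beam's API); the real handler would build JobMessage protos and hand them to the job service rather than appending tuples to a list.

```python
import logging

# Hypothetical mapping from Python log levels to the MessageImportance
# values in the proto above.
LOG_LEVEL_TO_IMPORTANCE = {
    logging.DEBUG: 'JOB_MESSAGE_DEBUG',
    logging.INFO: 'JOB_MESSAGE_BASIC',
    logging.WARNING: 'JOB_MESSAGE_WARNING',
    logging.ERROR: 'JOB_MESSAGE_ERROR',
}


class JobMessageHandler(logging.Handler):
    """Forwards log records to a job-service message sink (sketch only)."""

    def __init__(self, sink):
        super().__init__()
        self.sink = sink  # stand-in for the job service's message stream

    def emit(self, record):
        importance = LOG_LEVEL_TO_IMPORTANCE.get(
            record.levelno, 'JOB_MESSAGE_BASIC')
        # A real implementation would construct a JobMessage here.
        self.sink.append((importance, self.format(record)))


# Opting in is just attaching the handler to your logger.
sink = []
logger = logging.getLogger('user_pipeline')
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep the sketch's output out of the root logger
logger.addHandler(JobMessageHandler(sink))
logger.warning('shard %d is slow', 3)
```

The point of the sketch is that user code never needs to know about runners or state transitions; it only touches the standard logging machinery.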
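For the backfill idea, here is a rough sketch of a bounded buffer that a late-connecting client could be replayed from. Again, all names are hypothetical; a real implementation would sit behind GetMessageStream, key on JobMessage.time, and could equally be backed by an external log service instead of in-process memory.

```python
import collections


class MessageBuffer:
    """Bounded message history for backfilling late clients (sketch only)."""

    def __init__(self, max_messages=1000):
        # deque with maxlen silently drops the oldest entries.
        self._messages = collections.deque(maxlen=max_messages)

    def append(self, timestamp, message):
        self._messages.append((timestamp, message))

    def backfill(self, min_timestamp=0.0, max_count=None):
        """Return buffered messages at or after min_timestamp.

        If max_count is given, only the newest max_count survivors
        are returned, so a late client catches up on a limited
        amount of recent activity.
        """
        recent = [m for m in self._messages if m[0] >= min_timestamp]
        if max_count is not None:
            recent = recent[-max_count:]
        return recent


buf = MessageBuffer(max_messages=3)
for ts, text in [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]:
    buf.append(ts, text)
# The buffer now holds only the three newest entries: b, c, d.
```

Bounding both by timestamp and by count keeps the server's obligation small regardless of how long a job has been running.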