larryzhu2018 commented on a change in pull request #7141: [AIRFLOW-6544] add log_id to end-of-file mark and also add an index config for logs
URL: https://github.com/apache/airflow/pull/7141#discussion_r367788223
##########
File path: airflow/utils/log/es_task_handler.py
##########
@@ -255,7 +256,9 @@ def close(self):
         # Mark the end of file using end of log mark,
         # so we know where to stop while auto-tailing.
-        self.handler.stream.write(self.end_of_log_mark)
+        if self.write_stdout:
+            print()
+        self.handler.emit(logging.makeLogRecord({'msg': self.end_of_log_mark}))

Review comment:
Kevin, removal of the end-of-log mark is already handled by Andrii's initial changes for stdout support, so it won't be displayed to users. Please see https://github.com/apache/airflow/commit/0da976a0e1e28e2c0cd274d7384cf2976db6deec#diff-485751b55125e8a90050d22f69e8467c

    # end_of_log_mark may contain characters like '\n' which is needed to
    # have the log uploaded but will not be stored in elasticsearch.
    metadata['end_of_log'] = False if not logs \
        else logs[-1].message == self.end_of_log_mark.strip()

and then

    # If we hit the end of the log, remove the actual end_of_log message
    # to prevent it from showing in the UI.
    i = len(logs) if not metadata['end_of_log'] else len(logs) - 1
    message = '\n'.join([log.message for log in logs[0:i]])

Please see my test case test_close_with_log_id, which now exercises this logic in the tests. Can you please check whether this is clear to you now?

The log_id is constructed on the Elasticsearch side, but computing it requires the dag_id, task_id, execution_date and try_number, which is why emit() has to be used so that this information is included in the record. In my test case, here is how I simulate the logic of the Elasticsearch processors:

    msg['log_id'] = self.log_id_template.format(
        dag_id=msg['dag_id'],
        task_id=msg['task_id'],
        execution_date=msg['execution_date'],
        try_number=msg['try_number'])
    msg['message'] = msg['message'].strip()
    msg['offset'] = 100

To do the same on the Elasticsearch side, my ingest processor pipeline looks like the following:

    {
      "description" : "cluster json log Pipeline",
      "processors" : [
        { "rename" : { "field" : "message", "target_field" : "raw_message" } },
        { "json" : { "field" : "raw_message", "add_to_root" : false, "target_field" : "json_target" } },
        { "grok" : { "field" : "json_target.message",
                     "patterns" : [ "Job %{DATA:job_id}: Subtask %{DATA} %{GREEDYDATA:json_msg}", "%{GREEDYDATA}" ] } },
        { "json" : { "field" : "json_msg", "add_to_root" : true, "if" : "ctx.job_id != null" } },
        { "json" : { "field" : "raw_message", "add_to_root" : true, "if" : "ctx.job_id == null" } },
        { "remove" : { "field" : "json_msg", "ignore_missing" : true } },
        { "remove" : { "field" : "json_target" } },
        { "set" : { "field" : "event.kind", "value" : "tasks", "if" : "ctx.message != null" } },
        { "set" : { "field" : "event.dataset", "value" : "airflow", "if" : "ctx.dag_id != null && ctx.task_id != null" } },
        { "set" : { "field" : "log_id", "value" : "{{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}", "if" : "ctx.event?.dataset == 'airflow'" } },
        { "set" : { "field" : "offset", "value" : "{{log.offset}}", "if" : "ctx.event?.dataset == 'airflow'" } }
      ],
      "on_failure" : [
        { "set" : { "field" : "error.message", "value" : "{{ _ingest.on_failure_message }}" } }
      ]
    }
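For illustration only, here is a minimal sketch (not part of this PR) of how the log_id assembled by the `set` processor above lines up with the task-context fields that the emitted JSON record has to carry. The template string and the sample field values are assumptions made up for the example:

    # Minimal sketch: composing log_id the same way the ingest pipeline's
    # "set" processor does. The template and sample values are illustrative
    # only; they are not taken from this PR.
    LOG_ID_TEMPLATE = "{dag_id}-{task_id}-{execution_date}-{try_number}"

    def make_log_id(doc):
        # doc is a parsed JSON log line that already carries the task-context
        # fields included when the record is emitted through the JSON handler.
        return LOG_ID_TEMPLATE.format(
            dag_id=doc["dag_id"],
            task_id=doc["task_id"],
            execution_date=doc["execution_date"],
            try_number=doc["try_number"],
        )

    sample = {
        "message": "end_of_log",
        "dag_id": "example_dag",
        "task_id": "example_task",
        "execution_date": "2020_01_01T00_00_00_000000",
        "try_number": "1",
    }
    print(make_log_id(sample))
    # example_dag-example_task-2020_01_01T00_00_00_000000-1

The point is simply that every field the pipeline interpolates into log_id must already be present in the emitted record, including for the end-of-log mark, which is why close() now goes through emit() instead of writing to the stream directly.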