larryzhu2018 commented on a change in pull request #7141: [AIRFLOW-6544] add log_id to end-of-file mark and also add an index config for logs
URL: https://github.com/apache/airflow/pull/7141#discussion_r367788223
 
 

 ##########
 File path: airflow/utils/log/es_task_handler.py
 ##########
 @@ -255,7 +256,9 @@ def close(self):
 
         # Mark the end of file using end of log mark,
         # so we know where to stop while auto-tailing.
-        self.handler.stream.write(self.end_of_log_mark)
+        if self.write_stdout:
+            print()
+        self.handler.emit(logging.makeLogRecord({'msg': self.end_of_log_mark}))
 
 
 Review comment:
   Kevin, the removal of the end-of-log mark is already handled by Andrii's initial changes for the stdout support, so it won't be displayed to users. Please see https://github.com/apache/airflow/commit/0da976a0e1e28e2c0cd274d7384cf2976db6deec#diff-485751b55125e8a90050d22f69e8467c
   
   
           # end_of_log_mark may contain characters like '\n' which is needed to
           # have the log uploaded but will not be stored in elasticsearch.
           metadata['end_of_log'] = False if not logs \
               else logs[-1].message == self.end_of_log_mark.strip()
   
   then
   
           # If we hit the end of the log, remove the actual end_of_log message
           # to prevent it from showing in the UI.
           i = len(logs) if not metadata['end_of_log'] else len(logs) - 1
           message = '\n'.join([log.message for log in logs[0:i]])
   
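   To make the flow concrete, here is a minimal, self-contained sketch of how those two snippets combine; Hit, END_OF_LOG_MARK and read_logs are hypothetical names for illustration, not the handler's API:
   
        from collections import namedtuple

        Hit = namedtuple('Hit', ['message'])
        END_OF_LOG_MARK = 'end_of_log\n'

        def read_logs(logs):
            # The mark is compared with trailing whitespace stripped, because
            # the '\n' in end_of_log_mark is never stored in Elasticsearch.
            end_of_log = bool(logs) and logs[-1].message == END_OF_LOG_MARK.strip()
            # Drop the marker document so it never shows up in the UI.
            i = len(logs) if not end_of_log else len(logs) - 1
            return '\n'.join(log.message for log in logs[:i]), end_of_log

        message, reached_end = read_logs([Hit('step 1'), Hit('end_of_log')])
        assert reached_end and message == 'step 1'
   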
   Please see my test case test_close_with_log_id, which now exercises this logic in the tests.
   Can you please check whether this is clear now?
   
   The log_id is constructed on the Elasticsearch side, but computing it needs the dag_id, task_id, execution_date and try_number, which is why you need to use emit() to include that information with the record. In my test case, here is how I simulate the logic of the Elasticsearch processors:
   
               msg['log_id'] = self.log_id_template.format(
                   dag_id=msg['dag_id'],
                   task_id=msg['task_id'],
                   execution_date=msg['execution_date'],
                   try_number=msg['try_number'])
               msg['message'] = msg['message'].strip()
               msg['offset'] = 100
   
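   In other words, going through handler.emit() lets the handler's formatter attach that context to the line carrying the end-of-log mark, while a bare stream.write() would not. A minimal, self-contained sketch of that idea; TaskContextJsonFormatter and the field values are made up for illustration and are not Airflow's actual classes:
   
        import json
        import logging
        import sys

        class TaskContextJsonFormatter(logging.Formatter):
            """Hypothetical formatter that adds task context to every record."""
            def __init__(self, **task_context):
                super().__init__()
                self.task_context = task_context

            def format(self, record):
                doc = {'message': record.getMessage()}
                doc.update(self.task_context)
                return json.dumps(doc)

        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(TaskContextJsonFormatter(
            dag_id='example_dag',
            task_id='example_task',
            execution_date='2020-01-01T00:00:00+00:00',
            try_number=1))

        # Because emit() runs the formatter, even the end-of-log mark carries
        # the fields the ingest pipeline needs to compute log_id.
        handler.emit(logging.makeLogRecord({'msg': 'end_of_log'}))
   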
   To do the same on the cluster, the Elasticsearch ingest processor pipeline I use looks like the following:
   
   
   
       "description" : "cluster json log Pipeline",
       "processors" : [
         {
           "rename" : {
             "field" : "message",
             "target_field" : "raw_message"
           }
         },
         {
           "json" : {
             "field" : "raw_message",
             "add_to_root" : false,
             "target_field" : "json_target"
           }
         },
         {
           "grok" : {
             "field" : "json_target.message",
             "patterns" : [
               "Job %{DATA:job_id}: Subtask %{DATA} %{GREEDYDATA:json_msg}",
               "%{GREEDYDATA}"
             ]
           }
         },
         {
           "json" : {
             "field" : "json_msg",
             "add_to_root" : true,
             "if" : "ctx.job_id != null"
           }
         },
         {
           "json" : {
             "field" : "raw_message",
             "add_to_root" : true,
             "if" : "ctx.job_id == null"
           }
         },
         {
           "remove" : {
             "field" : "json_msg",
             "ignore_missing" : true
           }
         },
         {
           "remove" : {
             "field" : "json_target"
           }
         },
         {
           "set" : {
             "field" : "event.kind",
             "value" : "tasks",
             "if" : "ctx.message != null"
           }
         },
         {
           "set" : {
             "field" : "event.dataset",
             "value" : "airflow",
             "if" : "ctx.dag_id != null && ctx.task_id != null"
           }
         },
         {
           "set" : {
             "field" : "log_id",
             "value" : 
"{{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}",
             "if" : "ctx.event?.dataset == 'airflow'"
           }
         },
         {
           "set" : {
             "field" : "offset",
             "value" : "{{log.offset}}",
             "if" : "ctx.event?.dataset == 'airflow'"
           }
         }
       ],
       "on_failure" : [
         {
           "set" : {
             "field" : "error.message",
             "value" : "{{ _ingest.on_failure_message }}"
           }
         }
       ]
     
   
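   If it helps, this is roughly how I register that pipeline body with the elasticsearch-py 7.x client; the pipeline id, the client address, and the truncation of the processor list are my assumptions, not part of this PR:
   
        from elasticsearch import Elasticsearch

        es = Elasticsearch(['http://localhost:9200'])
        es.ingest.put_pipeline(
            id='airflow-task-logs',
            body={
                "description": "cluster json log Pipeline",
                "processors": [
                    {"rename": {"field": "message", "target_field": "raw_message"}},
                    # ... remaining processors exactly as quoted above ...
                ],
                "on_failure": [
                    {"set": {"field": "error.message",
                             "value": "{{ _ingest.on_failure_message }}"}},
                ],
            },
        )
   
   The log shipper then has to send task logs through that pipeline, for example by setting pipeline: airflow-task-logs on Filebeat's Elasticsearch output.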
