jedcunningham commented on a change in pull request #21261:
URL: https://github.com/apache/airflow/pull/21261#discussion_r797133065



##########
File path: airflow/providers/elasticsearch/log/es_task_handler.py
##########
@@ -187,15 +187,24 @@ def _read(
         metadata['end_of_log'] = False if not logs else len(loading_hosts) == 0
 
         cur_ts = pendulum.now()
-        # Assume end of log after not receiving new log for 5 min,
-        # as executor heartbeat is 1 min and there might be some
-        # delay before Elasticsearch makes the log available.
         if 'last_log_timestamp' in metadata:
             last_log_ts = timezone.parse(metadata['last_log_timestamp'])
-            if (
+
+            # if we are not getting any logs at all after more than N seconds of trying,
+            # assume logs do not exist
+            if int(next_offset) == 0 and cur_ts.diff(last_log_ts).in_seconds() > 5:
+                metadata['end_of_log'] = True
+                message = (
+                    f"*** Log {log_id} not found in elasticsearch. "
+                    f"If your task started recently, please wait a moment and reload this page. "
+                    f"Otherwise, the logs for this task instance may have been removed."

Review comment:
       ```suggestion
                       "If your task started recently, please wait a moment and 
reload this page. "
                       "Otherwise, the logs for this task instance may have 
been removed."
   ```
   
   nit

##########
File path: tests/providers/elasticsearch/log/test_es_task_handler.py
##########
@@ -131,6 +141,28 @@ def test_read(self, ti):
         assert '1' == metadatas[0]['offset']
         assert timezone.parse(metadatas[0]['last_log_timestamp']) > ts
 
+    def test_read_missing_logs(self, create_task_instance):
+        """
+        When the log actually isn't there to be found, we only want to wait for 5 seconds.
+        In this case we expect to receive a message of the form 'Log {log_id} not found in elasticsearch ...'
+        """
+        ti = get_ti(
+            self.DAG_ID,
+            self.TASK_ID,
+            pendulum.instance(self.EXECUTION_DATE).add(days=1),  # so logs are not found
+            create_task_instance=create_task_instance,
+        )
+        ts = pendulum.now().add(seconds=-6)
+        logs, metadatas = self.es_task_handler.read(ti, 1, {'offset': 0, 'last_log_timestamp': str(ts)})
+
+        assert 1 == len(logs)
+        assert re.match(r'.*Log .* not found in elasticsearch.*', logs[0][0][1]) is not None

Review comment:
       ```suggestion
           assert re.match(r'^\*\*\* Log .* not found in elasticsearch.*', logs[0][0][1]) is not None
   ```
   
   A little better?

##########
File path: airflow/providers/elasticsearch/log/es_task_handler.py
##########
@@ -187,15 +187,24 @@ def _read(
         metadata['end_of_log'] = False if not logs else len(loading_hosts) == 0
 
         cur_ts = pendulum.now()
-        # Assume end of log after not receiving new log for 5 min,
-        # as executor heartbeat is 1 min and there might be some
-        # delay before Elasticsearch makes the log available.
         if 'last_log_timestamp' in metadata:
             last_log_ts = timezone.parse(metadata['last_log_timestamp'])
-            if (
+
+            # if we are not getting any logs at all after more than N seconds of trying,
+            # assume logs do not exist
+            if int(next_offset) == 0 and cur_ts.diff(last_log_ts).in_seconds() > 5:
+                metadata['end_of_log'] = True
+                message = (
+                    f"*** Log {log_id} not found in elasticsearch. "
+                    f"If your task started recently, please wait a moment and reload this page. "
+                    f"Otherwise, the logs for this task instance may have been removed."
+                )
+                return [('', message)], metadata
+            elif (

Review comment:
       ```suggestion
   
               if (
   ```
   
   nit




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to