dstandish commented on a change in pull request #21261:
URL: https://github.com/apache/airflow/pull/21261#discussion_r798214056



##########
File path: airflow/providers/elasticsearch/log/es_task_handler.py
##########
@@ -187,15 +187,29 @@ def _read(
         metadata['end_of_log'] = False if not logs else len(loading_hosts) == 0
 
         cur_ts = pendulum.now()
-        # Assume end of log after not receiving new log for 5 min,
-        # as executor heartbeat is 1 min and there might be some
-        # delay before Elasticsearch makes the log available.
         if 'last_log_timestamp' in metadata:
             last_log_ts = timezone.parse(metadata['last_log_timestamp'])
+
+            # if we are not getting any logs at all after more than N seconds of trying,
+            # assume logs do not exist
+            if int(next_offset) == 0 and cur_ts.diff(last_log_ts).in_seconds() > 5:
+                metadata['end_of_log'] = True
+                return [
+                    (
+                        '',
+                        (
+                            f"*** Log {log_id} not found in elasticsearch. "

Review comment:
       I just looked at elastic.co, and they spell it `Elasticsearch`. But our 
wordlist has `elasticsearch` and not `Elasticsearch`. Do you reckon we should 
add `Elasticsearch` to the wordlist and change this message to match?
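
For context, the check this message sits behind can be sketched on its own. This is only a rough illustration of the condition in the hunk above, not the handler's actual code: it uses the standard library's `datetime` instead of `pendulum`, and the function name `logs_presumed_missing` and the constant `NO_LOGS_TIMEOUT_SECONDS` are hypothetical names (the hunk hard-codes `5`).

```python
from datetime import datetime, timedelta, timezone

# Hypothetical name; the hunk above hard-codes the value 5.
NO_LOGS_TIMEOUT_SECONDS = 5


def logs_presumed_missing(next_offset, last_log_ts, now=None,
                          timeout=NO_LOGS_TIMEOUT_SECONDS):
    """Return True when nothing has been read yet (offset 0) and more
    than `timeout` whole seconds have passed since `last_log_ts`."""
    if now is None:
        now = datetime.now(timezone.utc)
    # Whole seconds between the two timestamps, ignoring sign.
    elapsed = int(abs((now - last_log_ts).total_seconds()))
    return int(next_offset) == 0 and elapsed > timeout


# Usage: offset is still 0 and 6 seconds have passed, so give up.
start = datetime(2022, 2, 1, tzinfo=timezone.utc)
print(logs_presumed_missing("0", start, now=start + timedelta(seconds=6)))
```

Keeping the timeout as a named value rather than a bare `5` would also make the `N seconds` in the code comment easier to match up with the condition.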





