uranusjr commented on a change in pull request #21261:
URL: https://github.com/apache/airflow/pull/21261#discussion_r798216442
##########
File path: airflow/providers/elasticsearch/log/es_task_handler.py
##########
@@ -187,15 +187,29 @@ def _read(
metadata['end_of_log'] = False if not logs else len(loading_hosts) == 0
cur_ts = pendulum.now()
- # Assume end of log after not receiving new log for 5 min,
- # as executor heartbeat is 1 min and there might be some
- # delay before Elasticsearch makes the log available.
if 'last_log_timestamp' in metadata:
last_log_ts = timezone.parse(metadata['last_log_timestamp'])
+
+ # if we are not getting any logs at all after more than N seconds of trying,
+ # assume logs do not exist
+ if int(next_offset) == 0 and cur_ts.diff(last_log_ts).in_seconds() > 5:
+ metadata['end_of_log'] = True
+ return [
+ (
+ '',
+ (
+ f"*** Log {log_id} not found in elasticsearch. "
Review comment:
I think the wordlist is case insensitive so _Elasticsearch_ should just
work?
##########
File path: airflow/providers/elasticsearch/log/es_task_handler.py
##########
@@ -187,15 +187,29 @@ def _read(
metadata['end_of_log'] = False if not logs else len(loading_hosts) == 0
cur_ts = pendulum.now()
- # Assume end of log after not receiving new log for 5 min,
- # as executor heartbeat is 1 min and there might be some
- # delay before Elasticsearch makes the log available.
if 'last_log_timestamp' in metadata:
last_log_ts = timezone.parse(metadata['last_log_timestamp'])
+
+ # if we are not getting any logs at all after more than N seconds of trying,
+ # assume logs do not exist
+ if int(next_offset) == 0 and cur_ts.diff(last_log_ts).in_seconds() > 5:
+ metadata['end_of_log'] = True
+ return [
+ (
+ '',
+ (
+ f"*** Log {log_id} not found in elasticsearch. "
Review comment:
I think the wordlist is case insensitive so _Elasticsearch_ should just
work?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]