Hi Greg,
I managed to get this to work. There were a couple of issues I had to fix:
The Helm chart for Airflow on Kubernetes uses the container image
puckel/docker-airflow, and that image does not install the elasticsearch
Python modules.
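I worked around that by extending the image, roughly along these lines (the base
tag and unpinned packages are just an example; pick versions that match your ES
cluster):
FROM puckel/docker-airflow:1.10.6
# add the client libraries that airflow's ElasticsearchTaskHandler imports
RUN pip install --user elasticsearch elasticsearch-dsl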
The Airflow documentation uses double curly braces in LOG_ID_TEMPLATE, and that
does not work: the dag_id and task_id are not picked up by the following code,
and the execution_date is rendered in a different format than the one written to
the logs, so airflow-web cannot find the logs. I changed to the single-curly-brace
(str.format) form of the log_id template and it works.
if self.log_id_jinja_template:
    jinja_context = ti.get_template_context()
    jinja_context['try_number'] = try_number
    return self.log_id_jinja_template.render(**jinja_context)
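For reference, the single-brace form (matching Airflow's default log_id_template)
looks like this in the chart config:
AIRFLOW__ELASTICSEARCH__LOG_ID_TEMPLATE: "{dag_id}-{task_id}-{execution_date}-{try_number}"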
The “end of log” marker does not include the aforementioned log_id. The issue is
that airflow-web then does not know when to stop tailing the logs. I made the fix
shown below and validated that it works for me.
diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py
index 2dacfb49b..10fd7e240 100644
--- a/airflow/utils/log/es_task_handler.py
+++ b/airflow/utils/log/es_task_handler.py
@@ -70,7 +70,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
         self.log_id_template, self.log_id_jinja_template = \
             parse_template_string(log_id_template)
-
+        self.index = conf.get('elasticsearch', 'INDEX')
         self.client = elasticsearch.Elasticsearch([host], **es_kwargs)
         self.mark_end_on_close = True
@@ -171,9 +171,8 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
         :param metadata: log metadata, used for steaming log download.
         :type metadata: dict
         """
-
         # Offset is the unique key for sorting logs given log_id.
-        search = Search(using=self.client) \
+        search = Search(using=self.client, index=self.index) \
             .query('match_phrase', log_id=log_id) \
             .sort('offset')
@@ -254,8 +253,10 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin):
             self.handler.stream = self.handler._open()  # pylint: disable=protected-access
         # Mark the end of file using end of log mark,
-        # so we know where to stop while auto-tailing.
-        self.handler.stream.write(self.end_of_log_mark)
+        # so we know where to stop while auto-tailing.
+        if self.write_stdout:
+            print()
+        self.handler.emit(logging.LogRecord(None, logging.INFO, None, 0,
+                                            self.end_of_log_mark, None, None))
         if self.write_stdout:
             self.handler.close()
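Note that the self.index change reads a new elasticsearch option, so the chart
config also needs something along these lines (the value here is just an example;
use whatever index pattern your filebeat writes to):
AIRFLOW__ELASTICSEARCH__INDEX: "filebeat-*"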
I could not push these fixes to GitHub to submit a PR; I got the following error:
remote: Permission to apache/airflow.git denied to larryzhu2018.
fatal: unable to access 'https://github.com/apache/airflow.git/': The requested URL returned error: 403
Can you please point me to how to get permission so that I can submit PRs to fix
these issues in Airflow?
Thanks,
--Larry
From: Greg Neiheisel <[email protected]>
Reply-To: <[email protected]>
Date: Friday, January 10, 2020 at 3:30 PM
To: <[email protected]>
Subject: Re: Help to write airflow task instance logs to elastic search
Hey Larry, are you using the `KubernetesExecutor`? We support ES logging for
our clients and work with Local, Celery and Kubernetes executors. I took a look
through our helm charts to see if anything jumped out. Wondering if you may
need to pass this extra configuration to the executor pods
https://github.com/astronomer/helm.astronomer.io/blob/master/charts/airflow/templates/configmap.yaml#L78
Possible that without that configuration set, it may skip logger configuration
here:
https://github.com/apache/airflow/blob/d5fa17f7b969eab6fd2af731bc63e5e6e90d56cb/airflow/config_templates/airflow_local_settings.py#L200
On Thu, Jan 9, 2020 at 2:25 AM Larry Zhu <[email protected]> wrote:
I am using 1.10.6 and here are my log configurations for running Airflow on
Kubernetes. I set up the Kubernetes cluster to ship all console output to
Elasticsearch, and I am trying to configure the Airflow workers to write their
logs to the console, but it does not seem to work. I can see the local logs in the
pod, but the task instance logs are not written to the console, so my filebeat
DaemonSet cannot pick them up. Can you please help shed light on this?
airflow:
  config:
    AIRFLOW__CORE__REMOTE_LOGGING: "True"
    # HTTP_PROXY: "http://proxy.mycompany.com:123"
    AIRFLOW__ELASTICSEARCH__LOG_ID_TEMPLATE: "{{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}"
    AIRFLOW__ELASTICSEARCH__END_OF_LOG_MARK: "end_of_log"
    AIRFLOW__ELASTICSEARCH__WRITE_STDOUT: "True"
    AIRFLOW__ELASTICSEARCH__JSON_FORMAT: "True"
    AIRFLOW__ELASTICSEARCH__JSON_FIELDS: "asctime, filename, lineno, levelname, message"
--
Greg Neiheisel / CTO Astronomer.io