Re: How to determine if custom XCom backend is being loaded

2022-01-06 Thread Lewis John McGibbney
Hi Folks,
To close this one off, I want to share some more information we were able to
gather. It may help people running Airflow on K8s in particular.
If you define a custom XCom backend in your values.yaml configuration and
Airflow fails to load the class, the entire Chart deployment fails, with
each pod's containers restarting over and over.
The problem is that it is very difficult to get logs from the container,
because there is only a very small window in which the trace can be
obtained. If you are fortunate enough to query the container logs at the right
time, you will see something similar to the following:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/__main__.py", line 48, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 47, in command
    func = import_string(import_path)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/module_loading.py", line 32, in import_string
    module = import_module(module_path)
  File "/usr/local/lib/python3.9/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "", line 1030, in _gcd_import
  File "", line 1007, in _find_and_load
  File "", line 986, in _find_and_load_unlocked
  File "", line 680, in _load_unlocked
  File "", line 850, in exec_module
  File "", line 228, in _call_with_frames_removed
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/cli/commands/db_command.py", line 24, in
    from airflow.utils import cli as cli_utils, db
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/utils/db.py", line 27, in
    from airflow.jobs.base_job import BaseJob  # noqa: F401
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/__init__.py", line 19, in
    import airflow.jobs.backfill_job
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/backfill_job.py", line 28, in
    from airflow import models
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/__init__.py", line 20, in
    from airflow.models.baseoperator import BaseOperator, BaseOperatorLink
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 61, in
    from airflow.models.taskinstance import Context, TaskInstance, clear_task_instances
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 82, in
    from airflow.models.xcom import XCOM_RETURN_KEY, XCom
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom.py", line 379, in
    XCom = resolve_xcom_backend()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom.py", line 369, in resolve_xcom_backend
    clazz = conf.getimport("core", "xcom_backend", fallback=f"airflow.models.xcom.{BaseXCom.__name__}")
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/configuration.py", line 485, in getimport
    raise AirflowConfigException(
airflow.exceptions.AirflowConfigException: The object could not be loaded. Please check "xcom_backend" key in "core" section. Current value: "xcom_custom_backend.S3XComBackend".
[2022-01-06 00:02:16,880] {settings.py:331} DEBUG - Disposing DB connection pool (PID 214)

As you can see, in this example the path to the custom XCom backend class is
incorrect.

I am going to propose an improvement to xcom.resolve_xcom_backend() that
validates the custom XCom backend before returning it. If the configured
value is invalid, it would simply fall back to airflow.models.xcom.BaseXCom.
This way we can catch things like incorrect paths before they lead to
cryptic, hard-to-diagnose deployment failures.
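A minimal sketch of the proposed validate-then-fall-back behaviour, written without Airflow so it runs anywhere. The helper names `import_by_path` and `resolve_backend_with_fallback` are hypothetical and are not Airflow's API; the real resolve_xcom_backend() uses conf.getimport() and raises instead.

```python
import importlib
import logging

log = logging.getLogger(__name__)


def import_by_path(dotted_path):
    """Import 'package.module.ClassName' and return the named attribute."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a dotted 'module.ClassName' path")
    module = importlib.import_module(module_path)  # ImportError if the module is missing
    return getattr(module, attr_name)  # AttributeError if the class is missing


def resolve_backend_with_fallback(configured_path, fallback_cls, required_base=None):
    """Hypothetical variant of resolve_xcom_backend(): return the configured
    class if it imports (and, when required_base is given, subclasses it);
    otherwise log a warning and return fallback_cls instead of raising."""
    if not configured_path:
        return fallback_cls
    try:
        clazz = import_by_path(configured_path)
    except (ImportError, AttributeError) as err:
        log.warning("Could not load XCom backend %r (%s); falling back to %s",
                    configured_path, err, fallback_cls.__name__)
        return fallback_cls
    if required_base is not None and not (
        isinstance(clazz, type) and issubclass(clazz, required_base)
    ):
        log.warning("XCom backend %r is not a subclass of %s; falling back to %s",
                    configured_path, required_base.__name__, fallback_cls.__name__)
        return fallback_cls
    return clazz
```

Inside Airflow this would presumably be called as `resolve_backend_with_fallback(conf.get("core", "xcom_backend"), BaseXCom, BaseXCom)`. One trade-off worth weighing: a silent fallback means XComs quietly go to the metadata database instead of the custom backend, so the warning (or a fail-fast error with a clearer message) matters.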

I'll start on the documentation patch, then the changes to
xcom.resolve_xcom_backend() and the accompanying unit tests.



Re: How to determine if custom XCom backend is being loaded

2022-01-05 Thread Daniel Standish
Looks like you replied just before me.

You should not need to do anything beyond confirming that the Airflow config
resolves the right XCom class. Its usage stays the same, e.g. ti.xcom_push
etc.

Your DAGs and tasks should remain unchanged.


Re: How to determine if custom XCom backend is being loaded

2022-01-05 Thread Daniel Standish
If you can get a terminal in the container, you should be able to check
with something like

from airflow.models.xcom import XCom
print(XCom.__module__, XCom.__name__)

I think that will print the actual class that is being used.

You can see how the import works in that module; see resolve_xcom_backend.

Depending on how you've configured it, you can also examine the Airflow
configuration:

from airflow.configuration import conf
print(conf.get("core", "xcom_backend"))

If using env vars, check with `env | grep AIRFLOW__CORE__XCOM`
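As a rough sketch under stated assumptions: a standalone check that reads the configured path from `AIRFLOW__CORE__XCOM_BACKEND` (Airflow's `AIRFLOW__{SECTION}__{KEY}` env var convention) and tries to import it, without importing Airflow itself. It only looks at the environment variable, not airflow.cfg, and it must run with the same Python environment as the Airflow image. The helper `check_xcom_backend_path` is hypothetical.

```python
import importlib
import os
import sys

# [core] xcom_backend expressed as an environment variable. This sketch
# ignores airflow.cfg and any other configuration source.
ENV_VAR = "AIRFLOW__CORE__XCOM_BACKEND"


def check_xcom_backend_path(dotted_path):
    """Try to import 'module.ClassName'; return (ok, human-readable message)."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        return False, f"{dotted_path!r} is not of the form 'module.ClassName'"
    try:
        module = importlib.import_module(module_path)
    except Exception as err:  # ImportError, or any error raised while executing the module
        return False, f"cannot import module {module_path!r}: {type(err).__name__}: {err}"
    if not hasattr(module, class_name):
        return False, f"module {module_path!r} has no attribute {class_name!r}"
    location = getattr(module, "__file__", "?")
    return True, f"{dotted_path} imports OK (module file: {location})"


if __name__ == "__main__":
    path = os.environ.get(ENV_VAR)
    if path is None:
        print(f"{ENV_VAR} is not set; check [core] xcom_backend in airflow.cfg instead")
    else:
        ok, message = check_xcom_backend_path(path)
        print(message)
        if not ok:
            # A module missing from sys.path is a common cause of this failure.
            print("sys.path:", *sys.path, sep="\n  ")
```

Run against the value from the traceback, this would report that module `xcom_custom_backend` cannot be imported, which is the same problem as the crash loop but in a form that is easy to read.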





Re: How to determine if custom XCom backend is being loaded

2022-01-05 Thread Lewis John McGibbney
I just discovered 'resolve_xcom_backend'; looking at the source code, it
appears to explicitly load the XCom class named in the configuration.
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/xcom/index.html#airflow.models.xcom.resolve_xcom_backend
Should this be called in the DAG?
Is it necessary to call this function at all?
If someone can give me some pointers, I will open a pull request to improve
the documentation, as there doesn't appear to be any on this topic.
Thanks
lewismc

On 2022/01/05 22:41:22 lewis john mcgibbney wrote:
> Hi users@,
> 
> We wish to use a custom XCom backend, similar to the one presented in the
> following Astronomer guide on the topic.
> https://www.astronomer.io/guides/custom-xcom-backends
> 
> My question is, when running Airflow on K8s, what is the best mechanism to
> verify that a custom XCom implementation, defined in values.yaml, is
> actually being loaded?
> Is there some DEBUG/TRACE logging statement I can look out for or something
> similar?
> 
> Thank you
> lewismc
> 
> -- 
> http://home.apache.org/~lewismc/
> http://people.apache.org/keys/committer/lewismc
>