So, to get around running Airflow inside the KPO, we do the following:
We know that the metadata database stores the XCom data, or in this
case, because the custom XCom backend is enabled, the S3 filename.
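For context, the custom backend we wrote follows roughly this pattern
(this is only a sketch, not our exact code; the class name, bucket and
prefix are made up here, and it gets wired in via the core.xcom_backend
setting):

import json
import uuid

import boto3
from airflow.models.xcom import BaseXCom


class S3XComBackend(BaseXCom):
    # Placeholders for illustration: use your own bucket/prefix.
    BUCKET = "my-xcom-bucket"
    PREFIX = "xcom"

    @staticmethod
    def serialize_value(value, **kwargs):
        # Upload the real payload to S3; only the S3 key lands in the DB.
        key = f"{S3XComBackend.PREFIX}/{uuid.uuid4()}.json"
        boto3.client("s3").put_object(
            Bucket=S3XComBackend.BUCKET,
            Key=key,
            Body=json.dumps(value).encode("utf-8"),
        )
        return BaseXCom.serialize_value(key)

    @staticmethod
    def deserialize_value(result):
        # The DB row only holds the S3 key; fetch the actual payload.
        key = BaseXCom.deserialize_value(result)
        obj = boto3.client("s3").get_object(
            Bucket=S3XComBackend.BUCKET, Key=key
        )
        return json.loads(obj["Body"].read())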
We also know that we can get to that S3 filename by querying the xcom
table on its primary key columns (dag_id, task_id and execution_date).
So inside our KPO we query the xcom table for the S3 filename and then
retrieve the S3 data with boto3 (rough sketch below).
I am not sure this is the most efficient, or even the correct, way to
do it, but it seems to work.
We do realize that if the xcom table schema or its primary key columns
ever change, this approach would stop working.
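For the curious, the code running inside the KPO container does
something roughly like this (the connection string is a placeholder,
we assume the pre-2.3 xcom schema keyed on execution_date, and that
the value column holds a JSON-serialized S3 key):

import json

import boto3
import psycopg2

# Placeholder DSN for the Airflow metadata database (ours is Postgres).
METADATA_DSN = "postgresql://airflow:***@airflow-db:5432/airflow"


def get_upstream_s3_key(dag_id, task_id, execution_date):
    # Look up the XCom row by (dag_id, task_id, execution_date); the
    # value column holds the JSON-serialized S3 key written by the
    # custom XCom backend.
    with psycopg2.connect(METADATA_DSN) as conn, conn.cursor() as cur:
        cur.execute(
            """
            SELECT value FROM xcom
            WHERE dag_id = %s AND task_id = %s AND execution_date = %s
              AND key = 'return_value'
            """,
            (dag_id, task_id, execution_date),
        )
        row = cur.fetchone()
    if row is None:
        raise RuntimeError("no XCom row found for upstream task")
    return json.loads(bytes(row[0]))


def fetch_payload(bucket, key):
    # Pull the actual data from S3 once we know where it lives.
    obj = boto3.client("s3").get_object(Bucket=bucket, Key=key)
    return obj["Body"].read()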

On Fri, Mar 4, 2022 at 12:08 PM Daniel Standish <dpstand...@gmail.com>
wrote:

> > Absolutely. We wrote a custom AWS S3 XCom backend to do exactly that.
>
> Well if you have it all working then what are we yabbering about :)
>
> I think custom XCom requires that your container is running airflow -- but
> you are using KPO so I don't know if that's true?  Either that or it forces
> you to pull the data in via templating but if it's large data that's back
> to your orig problem.  Did your custom backend work for you?  If so, what's
> the problem you're facing?  If it works, then great and no reason to second
> guess it I'd think.
>
> You certainly don't _need_ to use custom xcom.  And I am not sure you
> really need KPO?  Maybe you need an ancient client to work with your
> ancient ES cluster?  And modern client for most of your ES jobs?
>
> But anyway if I were doing this, my first thought would be not to bother
> with xcom (though I "grew up" in a pre-taskflow API world).
>
> my_data_path = "s3://bucket/key.txt"
> op1 = GetDataToS3(task_id='to_s3', path=my_data_path)
> op2 = LoadToES(task_id='to_es', path=my_data_path)
> op1 >> op2
