Jeoffreybauvin commented on issue #64212:
URL: https://github.com/apache/airflow/issues/64212#issuecomment-4132623449

   ```
   from datetime import datetime, timedelta
   from airflow import DAG
   from docker.types import Mount
   from airflow.providers.standard.operators.python import PythonOperator
   from airflow.models import Variable
   from airflow.providers.amazon.aws.operators.s3 import S3Hook
   from lib.testDockerOperator import testDockerOperator
   import os
   import ast
   
   # DAG "vmware_inventory": cron schedule "0 * * * *" (minute 0 of every hour),
   # start date 2024-06-10; this module's docstring is used as the DAG's doc_md.
   dag = DAG(dag_id="vmware_inventory", start_date=datetime(2024, 6, 10), 
schedule="0 * * * *", doc_md=__doc__, tags=["vmware_inventory"])
   
   # Owner passed to every operator created in this file.
   default_owner = 'test'
   
   # Module-level statements: the VMWARE_INVENTORY Variable is read each time
   # this file is parsed. Its text is parsed as a Python literal; the loop
   # below iterates it and treats each element as a cluster-name string.
   clusters_txt = Variable.get('VMWARE_INVENTORY')
   clusters = ast.literal_eval(clusters_txt)
   
   # One inventory task is created per cluster name.
   for cluster in clusters:
   
     # Clusters whose name starts with 'fo_' share the VCENTER_BOFO_* Variables
     # and the BOFO worker queue; every other cluster uses its own
     # VCENTER_<cluster>_USER / _PASSWORD Variables and the MIXBLUE queue.
     if(cluster.startswith('fo_')):
         conn_user = Variable.get('VCENTER_BOFO_USER')
         conn_password = Variable.get('VCENTER_BOFO_PASSWORD')
         queue = Variable.get('WORKER_DEFAULT_QUEUE_BOFO')
     else:
         conn_user = Variable.get('VCENTER_' + cluster + '_USER')
         conn_password = Variable.get('VCENTER_' + cluster + '_PASSWORD')
         queue = Variable.get('WORKER_DEFAULT_QUEUE_MIXBLUE')
         
     # testDockerOperator is project-local (lib.testDockerOperator); its
     # behaviour is not visible here.
     # NOTE(review): presumably a DockerOperator subclass -- confirm.
     docker_inventory = testDockerOperator(
         owner=default_owner,      
         dag=dag,
         queue=queue,
         task_id='make_vmware_inventory_' + cluster,
         image='test-f-docker/test_vmware_tooling:0.5.1',
         container_name='docker_vmware_inventory_' + cluster,
         mount_tmp_dir=False,
         # The script is given '-o /<cluster>.json' ...
         command='inventory_vmware_threaded_v2.py -D ' + cluster + ' -o /' + 
cluster + '.json',
         execution_timeout=timedelta(minutes=30),
         retrieve_output=True,
         # ... while the retrieved output path is '/<cluster>.pickle'.
         # NOTE(review): the two paths differ -- confirm this is intended.
         retrieve_output_path='/' + cluster + '.pickle',
         environment={
           'DC_USER': conn_user,
           'DC_PASSWORD': conn_password,
           # os.environ[...] raises KeyError at parse time if the proxy
           # variables are not set in the parsing process's environment.
           'HTTP_PROXY': os.environ['HTTP_PROXY'],
           'HTTPS_PROXY': os.environ['HTTPS_PROXY'],
         },
         mounts=[
           # Read-only bind mount of the inventory config at /apps/config.yml;
           # DAG_EXTERNAL_PATH must also be set or this raises KeyError.
           Mount(
               source=os.environ['DAG_EXTERNAL_PATH'] + 
'/live/conf/vmware_inventory.yaml',
               target='/apps/config.yml',
               read_only=True,
               type='bind',
           )
         ]
     )
   
     def push_xcom_to_s3(**context):
         """
         Pull the inventory task's XCom and upload it to S3.

         The pulled value is converted with ``str()``, printed to the task
         log, then written UTF-8 encoded to the bucket named by the
         ``S3_INFRA_BUCKET`` Variable under the key
         ``vmware_inventory_<cluster>.json``, replacing any existing object.

         Args:
             **context: Airflow context including `ti` and `cluster`.
         """
         cluster = context['cluster']

         # Fetch the XCom of this cluster's inventory task. The right-hand side
         # is wrapped in parentheses so the assignment stays one statement even
         # when split across lines. Whatever xcom_pull returns (including None)
         # is stringified as-is.
         xcom_value = str(
             context['ti'].xcom_pull(task_ids='make_vmware_inventory_' + cluster)
         )

         print('This is xcom for ' + cluster + ' :')
         print(xcom_value)

         # Connect to S3 through the 's3_infra' connection; verify=False is
         # passed to the hook (certificate verification disabled).
         s3_hook = S3Hook(aws_conn_id='s3_infra', verify=False)

         # Upload the XCom text to S3, overwriting any previous object.
         s3_hook.load_bytes(
             bytes_data=xcom_value.encode('utf-8'),
             bucket_name=Variable.get('S3_INFRA_BUCKET'),
             key='vmware_inventory_' + cluster + '.json',
             replace=True,
         )
   
     # Upload task for the current cluster: runs push_xcom_to_s3 with the
     # cluster name supplied through op_kwargs.
     push_xcom_to_s3_task = PythonOperator(
         owner=default_owner,      
         task_id='push_xcom_to_s3_' + cluster,
         # Always reads the MIXBLUE queue Variable here, regardless of the
         # cluster name. NOTE(review): "default" is presumably the fallback
         # used when the Variable is missing -- confirm.
         queue=Variable.get("WORKER_DEFAULT_QUEUE_MIXBLUE", "default"),
         python_callable=push_xcom_to_s3,
         op_kwargs={'cluster': cluster},
         dag=dag,
     )      
   
     # The upload task runs downstream of the inventory task.
     docker_inventory >> push_xcom_to_s3_task
   ```
   
   And vmware_inventory.yaml is not being modified at all; the DAG and its 
conf have been the same for a few months...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to