This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 13cc4881684219f8fb36cbc0106bea28a6d398aa Author: KevinYanesG <75472729+kevinyan...@users.noreply.github.com> AuthorDate: Tue Feb 15 19:26:30 2022 +0100 Fix postgres part of pipeline example of tutorial (#21586) (cherry picked from commit 40028f3ea3e78a9cf0db9de6b16fa67fa730dd7a) --- docs/apache-airflow/tutorial.rst | 67 ++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 26 deletions(-) diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst index 085be42..7a2245f 100644 --- a/docs/apache-airflow/tutorial.rst +++ b/docs/apache-airflow/tutorial.rst @@ -381,11 +381,30 @@ We need to have docker and postgres installed. We will be using this `docker file <https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#docker-compose-yaml>`_ Follow the instructions properly to set up Airflow. -Create a Employee table in postgres using this: +You can use the postgres_default connection: + +- Conn id: postgres_default +- Conn Type: postgres +- Host: postgres +- Schema: airflow +- Login: airflow +- Password: airflow + + +After that, you can test your connection and if you followed all the steps correctly, it should show a success notification. Proceed with saving the connection. For + + +Open up a postgres shell: + +.. code-block:: bash + + ./airflow.sh airflow db shell + +Create the Employees table with: .. code-block:: sql - CREATE TABLE "Employees" + CREATE TABLE EMPLOYEES ( "Serial Number" NUMERIC PRIMARY KEY, "Company Name" TEXT, @@ -394,7 +413,11 @@ Create a Employee table in postgres using this: "Leave" INTEGER ); - CREATE TABLE "Employees_temp" +Afterwards, create the Employees_temp table: + +.. code-block:: sql + + CREATE TABLE EMPLOYEES_TEMP ( "Serial Number" NUMERIC PRIMARY KEY, "Company Name" TEXT, @@ -403,17 +426,9 @@ Create a Employee table in postgres using this: "Leave" INTEGER ); -We also need to add a connection to postgres. Go to the UI and click "Admin" >> "Connections". 
Specify the following for each field: +We are now ready to write the DAG. -- Conn id: LOCAL -- Conn Type: postgres -- Host: postgres -- Schema: <DATABASE_NAME> -- Login: airflow -- Password: airflow -- Port: 5432 -After that, you can test your connection and if you followed all the steps correctly, it should show a success notification. Proceed with saving the connection and we are now ready write the DAG. Let's break this down into 2 steps: get data & merge data: @@ -436,12 +451,12 @@ Let's break this down into 2 steps: get data & merge data: with open(data_path, "w") as file: file.write(response.text) - postgres_hook = PostgresHook(postgres_conn_id="LOCAL") + postgres_hook = PostgresHook(postgres_conn_id="postgres_default") conn = postgres_hook.get_conn() cur = conn.cursor() with open(data_path, "r") as file: cur.copy_expert( - "COPY \"Employees_temp\" FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'", + "COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'", file, ) conn.commit() @@ -457,16 +472,16 @@ Here we are passing a ``GET`` request to get the data from the URL and save it i @task def merge_data(): query = """ - DELETE FROM "Employees" e - USING "Employees_temp" et + DELETE FROM EMPLOYEES e + USING EMPLOYEES_TEMP et WHERE e."Serial Number" = et."Serial Number"; - INSERT INTO "Employees" + INSERT INTO EMPLOYEES SELECT * - FROM "Employees_temp"; + FROM EMPLOYEES_TEMP; """ try: - postgres_hook = PostgresHook(postgres_conn_id="LOCAL") + postgres_hook = PostgresHook(postgres_conn_id="postgres_default") conn = postgres_hook.get_conn() cur = conn.cursor() cur.execute(query) @@ -509,12 +524,12 @@ Lets look at our DAG: with open(data_path, "w") as file: file.write(response.text) - postgres_hook = PostgresHook(postgres_conn_id="LOCAL") + postgres_hook = PostgresHook(postgres_conn_id="postgres_default") conn = postgres_hook.get_conn() cur = conn.cursor() with open(data_path, "r") as file: cur.copy_expert( - "COPY \"Employees_temp\" FROM STDIN 
WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'", + "COPY EMPLOYEES_TEMP FROM STDIN WITH CSV HEADER DELIMITER AS ',' QUOTE '\"'", file, ) conn.commit() @@ -522,16 +537,16 @@ Lets look at our DAG: @task def merge_data(): query = """ - DELETE FROM "Employees" e - USING "Employees_temp" et + DELETE FROM EMPLOYEES e + USING EMPLOYEES_TEMP et WHERE e."Serial Number" = et."Serial Number"; - INSERT INTO "Employees" + INSERT INTO EMPLOYEES SELECT * - FROM "Employees_temp"; + FROM EMPLOYEES_TEMP; """ try: - postgres_hook = PostgresHook(postgres_conn_id="LOCAL") + postgres_hook = PostgresHook(postgres_conn_id="postgres_default") conn = postgres_hook.get_conn() cur = conn.cursor() cur.execute(query)