pierrejeambrun commented on code in PR #45715:
URL: https://github.com/apache/airflow/pull/45715#discussion_r1922453886


##########
airflow/api_fastapi/core_api/datamodels/connections.py:
##########
@@ -90,8 +91,74 @@ class ConnectionBody(BaseModel):
     extra: str | None = Field(default=None)
 
 
-class ConnectionBulkBody(BaseModel):
-    """Connections Serializer for requests body."""
+class ConnectionBulkCreateAction(BaseModel):
+    """Bulk Create Variable serializer for request bodies."""
+
+    action: Literal["create"] = "create"
+    connections: list[ConnectionBody] = Field(..., description="A list of 
connections to be created.")
+    action_if_exists: Literal["skip", "overwrite", "fail"] = "fail"
+
+
+class ConnectionBulkUpdateAction(BaseModel):
+    """Bulk Update Connection serializer for request bodies."""
+
+    action: Literal["update"] = "update"
+    connections: list[ConnectionBody] = Field(..., description="A list of 
connections to be updated.")
+    action_if_not_exists: Literal["skip", "fail"] = "fail"
 
-    connections: list[ConnectionBody]
-    overwrite: bool | None = Field(default=False)
+
+class ConnectionBulkDeleteAction(BaseModel):
+    """Bulk Delete Connection serializer for request bodies."""
+
+    action: Literal["delete"] = "delete"
+    connection_ids: list[str] = Field(..., description="A list of connection 
IDs to be deleted.")
+    action_if_not_exists: Literal["skip", "fail"] = "fail"
+
+
+class ConnectionBulkBody(BaseModel):
+    """Request body for bulk Connection operations (create, update, delete)."""
+
+    actions: list[ConnectionBulkCreateAction | ConnectionBulkUpdateAction | 
ConnectionBulkDeleteAction] = (

Review Comment:
   I think we can add a pydantic discriminator to save time at 
deserialization time. Basically, the `action` field tells us which model 
should be used to deserialize an action.



##########
airflow/api_fastapi/core_api/datamodels/connections.py:
##########
@@ -90,8 +91,74 @@ class ConnectionBody(BaseModel):
     extra: str | None = Field(default=None)
 
 
-class ConnectionBulkBody(BaseModel):
-    """Connections Serializer for requests body."""
+class ConnectionBulkCreateAction(BaseModel):
+    """Bulk Create Variable serializer for request bodies."""
+
+    action: Literal["create"] = "create"

Review Comment:
   Maybe we should switch to an enum for the `action`, to avoid hardcoding 
the `create`, `delete`, ... strings everywhere.



##########
airflow/api_fastapi/core_api/datamodels/connections.py:
##########
@@ -90,8 +91,74 @@ class ConnectionBody(BaseModel):
     extra: str | None = Field(default=None)
 
 
-class ConnectionBulkBody(BaseModel):
-    """Connections Serializer for requests body."""
+class ConnectionBulkCreateAction(BaseModel):
+    """Bulk Create Variable serializer for request bodies."""
+
+    action: Literal["create"] = "create"
+    connections: list[ConnectionBody] = Field(..., description="A list of 
connections to be created.")
+    action_if_exists: Literal["skip", "overwrite", "fail"] = "fail"
+
+
+class ConnectionBulkUpdateAction(BaseModel):
+    """Bulk Update Connection serializer for request bodies."""
+
+    action: Literal["update"] = "update"
+    connections: list[ConnectionBody] = Field(..., description="A list of 
connections to be updated.")
+    action_if_not_exists: Literal["skip", "fail"] = "fail"
 
-    connections: list[ConnectionBody]
-    overwrite: bool | None = Field(default=False)
+
+class ConnectionBulkDeleteAction(BaseModel):
+    """Bulk Delete Connection serializer for request bodies."""
+
+    action: Literal["delete"] = "delete"
+    connection_ids: list[str] = Field(..., description="A list of connection 
IDs to be deleted.")
+    action_if_not_exists: Literal["skip", "fail"] = "fail"
+
+
+class ConnectionBulkBody(BaseModel):
+    """Request body for bulk Connection operations (create, update, delete)."""
+
+    actions: list[ConnectionBulkCreateAction | ConnectionBulkUpdateAction | 
ConnectionBulkDeleteAction] = (
+        Field(..., description="A list of Connection actions to perform.")
+    )
+
+
+class ConnectionBulkActionResponse(BaseModel):
+    """
+    Serializer for individual bulk action responses.
+
+    Represents the outcome of a single bulk operation (create, update, or 
delete).
+    The response includes a list of successful connection_ids and any errors 
encountered during the operation.
+    This structure helps users understand which key actions succeeded and 
which failed.
+    """
+
+    success: list[str] = Field(
+        default_factory=list, description="A list of connection_ids 
representing successful operations."
+    )
+    errors: list[dict[str, Any]] = Field(
+        default_factory=list,
+        description="A list of errors encountered during the operation, each 
containing details about the issue.",
+    )

Review Comment:
   For instance, this can be used almost as-is for all resources (connections, 
variables, etc.); we just need to update the `success` field description.



##########
airflow/api_fastapi/core_api/datamodels/connections.py:
##########
@@ -90,8 +91,74 @@ class ConnectionBody(BaseModel):
     extra: str | None = Field(default=None)
 
 
-class ConnectionBulkBody(BaseModel):
-    """Connections Serializer for requests body."""
+class ConnectionBulkCreateAction(BaseModel):
+    """Bulk Create Variable serializer for request bodies."""
+
+    action: Literal["create"] = "create"
+    connections: list[ConnectionBody] = Field(..., description="A list of 
connections to be created.")
+    action_if_exists: Literal["skip", "overwrite", "fail"] = "fail"
+
+
+class ConnectionBulkUpdateAction(BaseModel):
+    """Bulk Update Connection serializer for request bodies."""
+
+    action: Literal["update"] = "update"
+    connections: list[ConnectionBody] = Field(..., description="A list of 
connections to be updated.")
+    action_if_not_exists: Literal["skip", "fail"] = "fail"
 
-    connections: list[ConnectionBody]
-    overwrite: bool | None = Field(default=False)
+
+class ConnectionBulkDeleteAction(BaseModel):
+    """Bulk Delete Connection serializer for request bodies."""
+
+    action: Literal["delete"] = "delete"
+    connection_ids: list[str] = Field(..., description="A list of connection 
IDs to be deleted.")
+    action_if_not_exists: Literal["skip", "fail"] = "fail"
+
+
+class ConnectionBulkBody(BaseModel):
+    """Request body for bulk Connection operations (create, update, delete)."""
+
+    actions: list[ConnectionBulkCreateAction | ConnectionBulkUpdateAction | 
ConnectionBulkDeleteAction] = (

Review Comment:
   https://docs.pydantic.dev/latest/concepts/unions/



##########
airflow/api_fastapi/core_api/datamodels/connections.py:
##########
@@ -90,8 +91,74 @@ class ConnectionBody(BaseModel):
     extra: str | None = Field(default=None)
 
 
-class ConnectionBulkBody(BaseModel):
-    """Connections Serializer for requests body."""
+class ConnectionBulkCreateAction(BaseModel):
+    """Bulk Create Variable serializer for request bodies."""
+
+    action: Literal["create"] = "create"
+    connections: list[ConnectionBody] = Field(..., description="A list of 
connections to be created.")
+    action_if_exists: Literal["skip", "overwrite", "fail"] = "fail"
+
+
+class ConnectionBulkUpdateAction(BaseModel):

Review Comment:
   In a follow-up PR, we can unify the datamodels. Basically, all the 
`XXXXXXBulkUpdateAction` models for connections, variables, etc. will have 
the same `action` and `action_if_not_exists` fields, so we could share those 
fields and the common code in base datamodels (for later).



##########
tests/api_fastapi/core_api/routes/public/test_connections.py:
##########
@@ -1058,3 +635,304 @@ def test_should_call_db_create_default_connections(self, 
mock_db_create_default_
         response = test_client.post("/public/connections/defaults")
         assert response.status_code == 204
         mock_db_create_default_connections.assert_called_once()
+
+
+class TestBulkConnections(TestConnectionEndpoint):
+    @pytest.mark.parametrize(
+        "actions, expected_results",
+        [
+            # Test successful create
+            (
+                {
+                    "actions": [
+                        {
+                            "action": "create",
+                            "connections": [
+                                {
+                                    "connection_id": "NOT_EXISTING_CONN_ID",
+                                    "conn_type": "NOT_EXISTING_CONN_TYPE",
+                                }
+                            ],
+                            "action_if_exists": "skip",
+                        }
+                    ]
+                },
+                {
+                    "create": {
+                        "success": ["NOT_EXISTING_CONN_ID"],
+                        "errors": [],
+                    }
+                },
+            ),
+            # Test successful create with skip
+            (
+                {
+                    "actions": [
+                        {
+                            "action": "create",
+                            "connections": [
+                                {
+                                    "connection_id": TEST_CONN_ID,
+                                    "conn_type": TEST_CONN_TYPE,
+                                },
+                                {
+                                    "connection_id": "NOT_EXISTING_CONN_ID",
+                                    "conn_type": "NOT_EXISTING_CONN_TYPE",
+                                },
+                            ],
+                            "action_if_exists": "skip",
+                        }
+                    ]
+                },
+                {
+                    "create": {
+                        "success": ["NOT_EXISTING_CONN_ID"],
+                        "errors": [],
+                    }
+                },
+            ),
+            # Test create with overwrite
+            (
+                {
+                    "actions": [
+                        {
+                            "action": "create",
+                            "connections": [
+                                {
+                                    "connection_id": TEST_CONN_ID,
+                                    "conn_type": TEST_CONN_TYPE,
+                                    "description": "new_description",
+                                }
+                            ],
+                            "action_if_exists": "overwrite",
+                        }
+                    ]
+                },
+                {
+                    "create": {
+                        "success": [TEST_CONN_ID],
+                        "errors": [],
+                    }
+                },
+            ),
+            # Test create conflict
+            (
+                {
+                    "actions": [
+                        {
+                            "action": "create",
+                            "connections": [
+                                {
+                                    "connection_id": TEST_CONN_ID,
+                                    "conn_type": TEST_CONN_TYPE,
+                                    "description": TEST_CONN_DESCRIPTION,
+                                    "host": TEST_CONN_HOST,
+                                    "port": TEST_CONN_PORT,
+                                    "login": TEST_CONN_LOGIN,
+                                },
+                            ],
+                            "action_if_exists": "fail",
+                        }
+                    ]
+                },
+                {
+                    "create": {
+                        "success": [],
+                        "errors": [
+                            {
+                                "error": "The connections with these 
connection_ids: {'test_connection_id'} already exist.",
+                                "status_code": 409,
+                            },
+                        ],
+                    }
+                },
+            ),
+            # Test successful update
+            (
+                {
+                    "actions": [
+                        {
+                            "action": "update",
+                            "connections": [
+                                {
+                                    "connection_id": TEST_CONN_ID,
+                                    "conn_type": TEST_CONN_TYPE,
+                                    "description": "new_description",
+                                }
+                            ],
+                            "action_if_not_exists": "skip",
+                        }
+                    ]
+                },
+                {
+                    "update": {
+                        "success": [TEST_CONN_ID],
+                        "errors": [],
+                    }
+                },
+            ),
+            # Test update with skip
+            (
+                {
+                    "actions": [
+                        {
+                            "action": "update",
+                            "connections": [
+                                {
+                                    "connection_id": "NOT_EXISTING_CONN_ID",
+                                    "conn_type": "NOT_EXISTING_CONN_TYPE",
+                                }
+                            ],
+                            "action_if_not_exists": "skip",
+                        }
+                    ]
+                },
+                {
+                    "update": {
+                        "success": [],
+                        "errors": [],
+                    }
+                },
+            ),
+            # Test update with fail
+            (
+                {
+                    "actions": [
+                        {
+                            "action": "update",
+                            "connections": [
+                                {
+                                    "connection_id": "NOT_EXISTING_CONN_ID",
+                                    "conn_type": "NOT_EXISTING_CONN_TYPE",
+                                }
+                            ],
+                            "action_if_not_exists": "fail",
+                        }
+                    ]
+                },
+                {
+                    "update": {
+                        "success": [],
+                        "errors": [
+                            {
+                                "error": "The connections with these 
connection_ids: {'NOT_EXISTING_CONN_ID'} were not found.",
+                                "status_code": 404,
+                            }
+                        ],
+                    }
+                },
+            ),
+            # Test successful delete
+            (
+                {
+                    "actions": [
+                        {
+                            "action": "delete",
+                            "connection_ids": [TEST_CONN_ID],
+                        }
+                    ]
+                },
+                {
+                    "delete": {
+                        "success": [TEST_CONN_ID],
+                        "errors": [],
+                    }
+                },
+            ),
+            # Test delete with skip
+            (
+                {
+                    "actions": [
+                        {
+                            "action": "delete",
+                            "connection_ids": ["NOT_EXISTING_CONN_ID"],
+                            "action_if_not_exists": "skip",
+                        }
+                    ]
+                },
+                {
+                    "delete": {
+                        "success": [],
+                        "errors": [],
+                    }
+                },
+            ),
+            # Test delete not found
+            (
+                {
+                    "actions": [
+                        {
+                            "action": "delete",
+                            "connection_ids": ["NOT_EXISTING_CONN_ID"],
+                            "action_if_not_exists": "fail",
+                        }
+                    ]
+                },
+                {
+                    "delete": {
+                        "success": [],
+                        "errors": [
+                            {
+                                "error": "The connections with these 
connection_ids: {'NOT_EXISTING_CONN_ID'} were not found.",
+                                "status_code": 404,
+                            }
+                        ],
+                    }
+                },
+            ),
+            # Test Create, Update, Delete
+            (
+                {
+                    "actions": [
+                        {
+                            "action": "create",
+                            "connections": [
+                                {
+                                    "connection_id": "NOT_EXISTING_CONN_ID",
+                                    "conn_type": "NOT_EXISTING_CONN_TYPE",
+                                }
+                            ],
+                            "action_if_exists": "skip",
+                        },
+                        {
+                            "action": "update",
+                            "connections": [
+                                {
+                                    "connection_id": TEST_CONN_ID,
+                                    "conn_type": TEST_CONN_TYPE,
+                                    "description": "new_description",
+                                }
+                            ],
+                            "action_if_not_exists": "skip",
+                        },
+                        {
+                            "action": "delete",
+                            "connection_ids": [TEST_CONN_ID],
+                            "action_if_not_exists": "skip",
+                        },
+                    ]
+                },
+                {
+                    "create": {
+                        "success": ["NOT_EXISTING_CONN_ID"],
+                        "errors": [],
+                    },
+                    "update": {
+                        "success": [TEST_CONN_ID],
+                        "errors": [],
+                    },
+                    "delete": {
+                        "success": [TEST_CONN_ID],
+                        "errors": [],
+                    },
+                },
+            ),
+        ],
+    )
+    def test_bulk_connections(self, test_client, actions, expected_results):
+        self.create_connections()
+        response = test_client.patch("/public/connections", json=actions)
+        response_data = response.json()
+        print(response_data)

Review Comment:
   This debug `print` should be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to