pierrejeambrun commented on code in PR #45715: URL: https://github.com/apache/airflow/pull/45715#discussion_r1922453886
########## airflow/api_fastapi/core_api/datamodels/connections.py: ########## @@ -90,8 +91,74 @@ class ConnectionBody(BaseModel): extra: str | None = Field(default=None) -class ConnectionBulkBody(BaseModel): - """Connections Serializer for requests body.""" +class ConnectionBulkCreateAction(BaseModel): + """Bulk Create Variable serializer for request bodies.""" + + action: Literal["create"] = "create" + connections: list[ConnectionBody] = Field(..., description="A list of connections to be created.") + action_if_exists: Literal["skip", "overwrite", "fail"] = "fail" + + +class ConnectionBulkUpdateAction(BaseModel): + """Bulk Update Connection serializer for request bodies.""" + + action: Literal["update"] = "update" + connections: list[ConnectionBody] = Field(..., description="A list of connections to be updated.") + action_if_not_exists: Literal["skip", "fail"] = "fail" - connections: list[ConnectionBody] - overwrite: bool | None = Field(default=False) + +class ConnectionBulkDeleteAction(BaseModel): + """Bulk Delete Connection serializer for request bodies.""" + + action: Literal["delete"] = "delete" + connection_ids: list[str] = Field(..., description="A list of connection IDs to be deleted.") + action_if_not_exists: Literal["skip", "fail"] = "fail" + + +class ConnectionBulkBody(BaseModel): + """Request body for bulk Connection operations (create, update, delete).""" + + actions: list[ConnectionBulkCreateAction | ConnectionBulkUpdateAction | ConnectionBulkDeleteAction] = ( Review Comment: I think we can add a pydantic discriminator to save time at the serialization time. Basically the `action` field tells us what model should be use to deserialize an action. ########## airflow/api_fastapi/core_api/datamodels/connections.py: ########## @@ -90,8 +91,74 @@ class ConnectionBody(BaseModel): extra: str | None = Field(default=None) -class ConnectionBulkBody(BaseModel): - """Connections Serializer for requests body.""" +class ConnectionBulkCreateAction(BaseModel): + """Bulk Create Variable serializer for request bodies.""" + + action: Literal["create"] = "create" Review Comment: Maybe we should switch to an enum for the `action`. To avoid hardcoding everywhere `create` `delete` ... strings. ########## airflow/api_fastapi/core_api/datamodels/connections.py: ########## @@ -90,8 +91,74 @@ class ConnectionBody(BaseModel): extra: str | None = Field(default=None) -class ConnectionBulkBody(BaseModel): - """Connections Serializer for requests body.""" +class ConnectionBulkCreateAction(BaseModel): + """Bulk Create Variable serializer for request bodies.""" + + action: Literal["create"] = "create" + connections: list[ConnectionBody] = Field(..., description="A list of connections to be created.") + action_if_exists: Literal["skip", "overwrite", "fail"] = "fail" + + +class ConnectionBulkUpdateAction(BaseModel): + """Bulk Update Connection serializer for request bodies.""" + + action: Literal["update"] = "update" + connections: list[ConnectionBody] = Field(..., description="A list of connections to be updated.") + action_if_not_exists: Literal["skip", "fail"] = "fail" - connections: list[ConnectionBody] - overwrite: bool | None = Field(default=False) + +class ConnectionBulkDeleteAction(BaseModel): + """Bulk Delete Connection serializer for request bodies.""" + + action: Literal["delete"] = "delete" + connection_ids: list[str] = Field(..., description="A list of connection IDs to be deleted.") + action_if_not_exists: Literal["skip", "fail"] = "fail" + + +class ConnectionBulkBody(BaseModel): + """Request body for bulk Connection operations (create, update, delete).""" + + actions: list[ConnectionBulkCreateAction | ConnectionBulkUpdateAction | ConnectionBulkDeleteAction] = ( + Field(..., description="A list of Connection actions to perform.") + ) + + +class ConnectionBulkActionResponse(BaseModel): + """ + Serializer for individual bulk action responses. + + Represents the outcome of a single bulk operation (create, update, or delete). + The response includes a list of successful connection_ids and any errors encountered during the operation. + This structure helps users understand which key actions succeeded and which failed. + """ + + success: list[str] = Field( + default_factory=list, description="A list of connection_ids representing successful operations." + ) + errors: list[dict[str, Any]] = Field( + default_factory=list, + description="A list of errors encountered during the operation, each containing details about the issue.", + ) Review Comment: For instance this can almost be used for all resources (connection, variables etc..), we just need to update the `suecces` field description. ########## airflow/api_fastapi/core_api/datamodels/connections.py: ########## @@ -90,8 +91,74 @@ class ConnectionBody(BaseModel): extra: str | None = Field(default=None) -class ConnectionBulkBody(BaseModel): - """Connections Serializer for requests body.""" +class ConnectionBulkCreateAction(BaseModel): + """Bulk Create Variable serializer for request bodies.""" + + action: Literal["create"] = "create" + connections: list[ConnectionBody] = Field(..., description="A list of connections to be created.") + action_if_exists: Literal["skip", "overwrite", "fail"] = "fail" + + +class ConnectionBulkUpdateAction(BaseModel): + """Bulk Update Connection serializer for request bodies.""" + + action: Literal["update"] = "update" + connections: list[ConnectionBody] = Field(..., description="A list of connections to be updated.") + action_if_not_exists: Literal["skip", "fail"] = "fail" - connections: list[ConnectionBody] - overwrite: bool | None = Field(default=False) + +class ConnectionBulkDeleteAction(BaseModel): + """Bulk Delete Connection serializer for request bodies.""" + + action: Literal["delete"] = "delete" + connection_ids: list[str] = Field(..., description="A list of connection IDs to be deleted.") + action_if_not_exists: Literal["skip", "fail"] = "fail" + + +class ConnectionBulkBody(BaseModel): + """Request body for bulk Connection operations (create, update, delete).""" + + actions: list[ConnectionBulkCreateAction | ConnectionBulkUpdateAction | ConnectionBulkDeleteAction] = ( Review Comment: https://docs.pydantic.dev/latest/concepts/unions/ ########## airflow/api_fastapi/core_api/datamodels/connections.py: ########## @@ -90,8 +91,74 @@ class ConnectionBody(BaseModel): extra: str | None = Field(default=None) -class ConnectionBulkBody(BaseModel): - """Connections Serializer for requests body.""" +class ConnectionBulkCreateAction(BaseModel): + """Bulk Create Variable serializer for request bodies.""" + + action: Literal["create"] = "create" + connections: list[ConnectionBody] = Field(..., description="A list of connections to be created.") + action_if_exists: Literal["skip", "overwrite", "fail"] = "fail" + + +class ConnectionBulkUpdateAction(BaseModel): Review Comment: In a follow up PR, we can unify the datamodels. Basically, all the `XXXXXXBulkUpdateAction` for connections, variables etc... will have `action` and `action_if_not_exists` that are the same, we could mutualize some fields and code in common base datamodels. (for later). ########## tests/api_fastapi/core_api/routes/public/test_connections.py: ########## @@ -1058,3 +635,304 @@ def test_should_call_db_create_default_connections(self, mock_db_create_default_ response = test_client.post("/public/connections/defaults") assert response.status_code == 204 mock_db_create_default_connections.assert_called_once() + + +class TestBulkConnections(TestConnectionEndpoint): + @pytest.mark.parametrize( + "actions, expected_results", + [ + # Test successful create + ( + { + "actions": [ + { + "action": "create", + "connections": [ + { + "connection_id": "NOT_EXISTING_CONN_ID", + "conn_type": "NOT_EXISTING_CONN_TYPE", + } + ], + "action_if_exists": "skip", + } + ] + }, + { + "create": { + "success": ["NOT_EXISTING_CONN_ID"], + "errors": [], + } + }, + ), + # Test successful create with skip + ( + { + "actions": [ + { + "action": "create", + "connections": [ + { + "connection_id": TEST_CONN_ID, + "conn_type": TEST_CONN_TYPE, + }, + { + "connection_id": "NOT_EXISTING_CONN_ID", + "conn_type": "NOT_EXISTING_CONN_TYPE", + }, + ], + "action_if_exists": "skip", + } + ] + }, + { + "create": { + "success": ["NOT_EXISTING_CONN_ID"], + "errors": [], + } + }, + ), + # Test create with overwrite + ( + { + "actions": [ + { + "action": "create", + "connections": [ + { + "connection_id": TEST_CONN_ID, + "conn_type": TEST_CONN_TYPE, + "description": "new_description", + } + ], + "action_if_exists": "overwrite", + } + ] + }, + { + "create": { + "success": [TEST_CONN_ID], + "errors": [], + } + }, + ), + # Test create conflict + ( + { + "actions": [ + { + "action": "create", + "connections": [ + { + "connection_id": TEST_CONN_ID, + "conn_type": TEST_CONN_TYPE, + "description": TEST_CONN_DESCRIPTION, + "host": TEST_CONN_HOST, + "port": TEST_CONN_PORT, + "login": TEST_CONN_LOGIN, + }, + ], + "action_if_exists": "fail", + } + ] + }, + { + "create": { + "success": [], + "errors": [ + { + "error": "The connections with these connection_ids: {'test_connection_id'} already exist.", + "status_code": 409, + }, + ], + } + }, + ), + # Test successful update + ( + { + "actions": [ + { + "action": "update", + "connections": [ + { + "connection_id": TEST_CONN_ID, + "conn_type": TEST_CONN_TYPE, + "description": "new_description", + } + ], + "action_if_not_exists": "skip", + } + ] + }, + { + "update": { + "success": [TEST_CONN_ID], + "errors": [], + } + }, + ), + # Test update with skip + ( + { + "actions": [ + { + "action": "update", + "connections": [ + { + "connection_id": "NOT_EXISTING_CONN_ID", + "conn_type": "NOT_EXISTING_CONN_TYPE", + } + ], + "action_if_not_exists": "skip", + } + ] + }, + { + "update": { + "success": [], + "errors": [], + } + }, + ), + # Test update with fail + ( + { + "actions": [ + { + "action": "update", + "connections": [ + { + "connection_id": "NOT_EXISTING_CONN_ID", + "conn_type": "NOT_EXISTING_CONN_TYPE", + } + ], + "action_if_not_exists": "fail", + } + ] + }, + { + "update": { + "success": [], + "errors": [ + { + "error": "The connections with these connection_ids: {'NOT_EXISTING_CONN_ID'} were not found.", + "status_code": 404, + } + ], + } + }, + ), + # Test successful delete + ( + { + "actions": [ + { + "action": "delete", + "connection_ids": [TEST_CONN_ID], + } + ] + }, + { + "delete": { + "success": [TEST_CONN_ID], + "errors": [], + } + }, + ), + # Test delete with skip + ( + { + "actions": [ + { + "action": "delete", + "connection_ids": ["NOT_EXISTING_CONN_ID"], + "action_if_not_exists": "skip", + } + ] + }, + { + "delete": { + "success": [], + "errors": [], + } + }, + ), + # Test delete not found + ( + { + "actions": [ + { + "action": "delete", + "connection_ids": ["NOT_EXISTING_CONN_ID"], + "action_if_not_exists": "fail", + } + ] + }, + { + "delete": { + "success": [], + "errors": [ + { + "error": "The connections with these connection_ids: {'NOT_EXISTING_CONN_ID'} were not found.", + "status_code": 404, + } + ], + } + }, + ), + # Test Create, Update, Delete + ( + { + "actions": [ + { + "action": "create", + "connections": [ + { + "connection_id": "NOT_EXISTING_CONN_ID", + "conn_type": "NOT_EXISTING_CONN_TYPE", + } + ], + "action_if_exists": "skip", + }, + { + "action": "update", + "connections": [ + { + "connection_id": TEST_CONN_ID, + "conn_type": TEST_CONN_TYPE, + "description": "new_description", + } + ], + "action_if_not_exists": "skip", + }, + { + "action": "delete", + "connection_ids": [TEST_CONN_ID], + "action_if_not_exists": "skip", + }, + ] + }, + { + "create": { + "success": ["NOT_EXISTING_CONN_ID"], + "errors": [], + }, + "update": { + "success": [TEST_CONN_ID], + "errors": [], + }, + "delete": { + "success": [TEST_CONN_ID], + "errors": [], + }, + }, + ), + ], + ) + def test_bulk_connections(self, test_client, actions, expected_results): + self.create_connections() + response = test_client.patch("/public/connections", json=actions) + response_data = response.json() + print(response_data) Review Comment: to remove -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org