This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b2e7e102250 Add real-time concurrency control for edge workers via UI (#63142)
b2e7e102250 is described below

commit b2e7e102250d47b866b0302e925c0aec57cb675e
Author: Dheeraj Turaga <[email protected]>
AuthorDate: Mon Mar 9 18:18:41 2026 -0500

    Add real-time concurrency control for edge workers via UI (#63142)
    
    * feat(edge3): Add real-time concurrency control for edge workers via UI
    
      Previously, operators had no way to dynamically adjust an edge worker's
      concurrency limit without SSH access and CLI intervention, creating an
      operational bottleneck in distributed task execution environments.
    
      This contribution adds a new REST API endpoint
      (PATCH /worker/{worker_name}/concurrency) and a React UI control to the
      Apache Airflow Edge Provider, enabling administrators to tune worker
      throughput in real time from the Airflow web interface. The change is
      propagated to the worker process on its next heartbeat cycle without
      requiring a restart.
    
    * Add back what prek removed
---
 .../plugins/www/openapi-gen/queries/common.ts      |   1 +
 .../plugins/www/openapi-gen/queries/queries.ts     |   9 +-
 .../www/openapi-gen/requests/schemas.gen.ts        |  15 ++
 .../www/openapi-gen/requests/services.gen.ts       |  28 +++-
 .../plugins/www/openapi-gen/requests/types.gen.ts  |  29 ++++
 .../www/src/components/WorkerConcurrencyButton.tsx | 157 +++++++++++++++++++++
 .../www/src/components/WorkerOperations.tsx        |   2 +
 .../providers/edge3/worker_api/datamodels_ui.py    |   6 +
 .../providers/edge3/worker_api/routes/ui.py        |  25 ++++
 .../edge3/worker_api/v2-edge-generated.yaml        |  47 ++++++
 .../tests/unit/edge3/worker_api/routes/test_ui.py  |  27 ++++
 11 files changed, 343 insertions(+), 3 deletions(-)

diff --git 
a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/queries/common.ts
 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/queries/common.ts
index 4d553521aea..2b90edb3b00 100644
--- 
a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/queries/common.ts
+++ 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/queries/common.ts
@@ -50,3 +50,4 @@ export type UiServiceUpdateWorkerMaintenanceMutationResult = 
Awaited<ReturnType<
 export type UiServiceExitWorkerMaintenanceMutationResult = 
Awaited<ReturnType<typeof UiService.exitWorkerMaintenance>>;
 export type UiServiceDeleteWorkerMutationResult = Awaited<ReturnType<typeof 
UiService.deleteWorker>>;
 export type UiServiceRemoveWorkerQueueMutationResult = 
Awaited<ReturnType<typeof UiService.removeWorkerQueue>>;
+export type UiServiceSetWorkerConcurrencyLimitMutationResult = 
Awaited<ReturnType<typeof UiService.setWorkerConcurrencyLimit>>;
diff --git 
a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/queries/queries.ts
 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/queries/queries.ts
index 782a26bbf91..83cae99c134 100644
--- 
a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/queries/queries.ts
+++ 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/queries/queries.ts
@@ -2,7 +2,7 @@
 
 import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from 
"@tanstack/react-query";
 import { JobsService, LogsService, MonitorService, UiService, WorkerService } 
from "../requests/services.gen";
-import { EdgeWorkerState, MaintenanceRequest, PushLogsBody, TaskInstanceState, 
WorkerQueueUpdateBody, WorkerQueuesBody, WorkerStateBody } from 
"../requests/types.gen";
+import { ConcurrencyRequest, EdgeWorkerState, MaintenanceRequest, 
PushLogsBody, TaskInstanceState, WorkerQueueUpdateBody, WorkerQueuesBody, 
WorkerStateBody } from "../requests/types.gen";
 import * as Common from "./common";
 export const useLogsServiceLogfilePath = <TData = 
Common.LogsServiceLogfilePathDefaultResponse, TError = unknown, TQueryKey 
extends Array<unknown> = unknown[]>({ authorization, dagId, mapIndex, runId, 
taskId, tryNumber }: {
   authorization: string;
@@ -139,3 +139,10 @@ export const useUiServiceRemoveWorkerQueue = <TData = 
Common.UiServiceRemoveWork
   queueName: string;
   workerName: string;
 }, TContext>({ mutationFn: ({ queueName, workerName }) => 
UiService.removeWorkerQueue({ queueName, workerName }) as unknown as 
Promise<TData>, ...options });
+export const useUiServiceSetWorkerConcurrencyLimit = <TData = 
Common.UiServiceSetWorkerConcurrencyLimitMutationResult, TError = unknown, 
TContext = unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+  requestBody: ConcurrencyRequest;
+  workerName: string;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+  requestBody: ConcurrencyRequest;
+  workerName: string;
+}, TContext>({ mutationFn: ({ requestBody, workerName }) => 
UiService.setWorkerConcurrencyLimit({ requestBody, workerName }) as unknown as 
Promise<TData>, ...options });
diff --git 
a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/schemas.gen.ts
 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/schemas.gen.ts
index 11e48528f69..51756edfb44 100644
--- 
a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/schemas.gen.ts
+++ 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/schemas.gen.ts
@@ -257,6 +257,21 @@ export const $JobCollectionResponse = {
     description: 'Job Collection serializer.'
 } as const;
 
+export const $ConcurrencyRequest = {
+    properties: {
+        concurrency: {
+            type: 'integer',
+            exclusiveMinimum: 0,
+            title: 'Concurrency',
+            description: 'New concurrency limit for the worker.'
+        }
+    },
+    type: 'object',
+    required: ['concurrency'],
+    title: 'ConcurrencyRequest',
+    description: 'Request body for worker concurrency update.'
+} as const;
+
 export const $MaintenanceRequest = {
     properties: {
         maintenance_comment: {
diff --git 
a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/services.gen.ts
 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/services.gen.ts
index da00585e044..2dd062ec1ff 100644
--- 
a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/services.gen.ts
+++ 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/services.gen.ts
@@ -3,7 +3,7 @@
 import type { CancelablePromise } from './core/CancelablePromise';
 import { OpenAPI } from './core/OpenAPI';
 import { request as __request } from './core/request';
-import type { FetchData, FetchResponse, StateData, StateResponse, 
LogfilePathData, LogfilePathResponse, PushLogsData, PushLogsResponse, 
RegisterData, RegisterResponse, SetStateData, SetStateResponse, 
UpdateQueuesData, UpdateQueuesResponse, HealthResponse, WorkerData, 
WorkerResponse, JobsResponse, RequestWorkerMaintenanceData, 
RequestWorkerMaintenanceResponse, UpdateWorkerMaintenanceData, 
UpdateWorkerMaintenanceResponse, ExitWorkerMaintenanceData, 
ExitWorkerMaintenanceResponse, RequestWor [...]
+import type { FetchData, FetchResponse, StateData, StateResponse, 
LogfilePathData, LogfilePathResponse, PushLogsData, PushLogsResponse, 
RegisterData, RegisterResponse, SetStateData, SetStateResponse, 
UpdateQueuesData, UpdateQueuesResponse, HealthResponse, WorkerData, 
WorkerResponse, JobsResponse, RequestWorkerMaintenanceData, 
RequestWorkerMaintenanceResponse, UpdateWorkerMaintenanceData, 
UpdateWorkerMaintenanceResponse, ExitWorkerMaintenanceData, 
ExitWorkerMaintenanceResponse, RequestWor [...]
 
 export class JobsService {
     /**
@@ -472,5 +472,29 @@ export class UiService {
             }
         });
     }
-    
+
+    /**
+     * Set Worker Concurrency Limit
+     * Set the concurrency limit for an edge worker.
+     * @param data The data for the request.
+     * @param data.workerName
+     * @param data.requestBody
+     * @returns unknown Successful Response
+     * @throws ApiError
+     */
+    public static setWorkerConcurrencyLimit(data: 
SetWorkerConcurrencyLimitData): 
CancelablePromise<SetWorkerConcurrencyLimitResponse> {
+        return __request(OpenAPI, {
+            method: 'PATCH',
+            url: '/edge_worker/ui/worker/{worker_name}/concurrency',
+            path: {
+                worker_name: data.workerName
+            },
+            body: data.requestBody,
+            mediaType: 'application/json',
+            errors: {
+                422: 'Validation Error'
+            }
+        });
+    }
+
 }
\ No newline at end of file
diff --git 
a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/types.gen.ts
 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/types.gen.ts
index 12e2a71fd64..4ac26c50b1a 100644
--- 
a/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/types.gen.ts
+++ 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/openapi-gen/requests/types.gen.ts
@@ -523,6 +523,20 @@ export type RemoveWorkerQueueData = {
 
 export type RemoveWorkerQueueResponse = unknown;
 
+export type ConcurrencyRequest = {
+    /**
+     * New concurrency limit for the worker.
+     */
+    concurrency: number;
+};
+
+export type SetWorkerConcurrencyLimitData = {
+    workerName: string;
+    requestBody: ConcurrencyRequest;
+};
+
+export type SetWorkerConcurrencyLimitResponse = unknown;
+
 export type $OpenApiTs = {
     '/edge_worker/v1/jobs/fetch/{worker_name}': {
         post: {
@@ -831,4 +845,19 @@ export type $OpenApiTs = {
             };
         };
     };
+    '/edge_worker/ui/worker/{worker_name}/concurrency': {
+        patch: {
+            req: SetWorkerConcurrencyLimitData;
+            res: {
+                /**
+                 * Successful Response
+                 */
+                200: unknown;
+                /**
+                 * Validation Error
+                 */
+                422: HTTPValidationError;
+            };
+        };
+    };
 };
\ No newline at end of file
diff --git 
a/providers/edge3/src/airflow/providers/edge3/plugins/www/src/components/WorkerConcurrencyButton.tsx
 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/components/WorkerConcurrencyButton.tsx
new file mode 100644
index 00000000000..90b8404cb8e
--- /dev/null
+++ 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/components/WorkerConcurrencyButton.tsx
@@ -0,0 +1,157 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import {
+  Button,
+  CloseButton,
+  Dialog,
+  IconButton,
+  Input,
+  Portal,
+  Text,
+  VStack,
+  useDisclosure,
+} from "@chakra-ui/react";
+import { useUiServiceSetWorkerConcurrencyLimit } from "openapi/queries";
+import type { Worker } from "openapi/requests/types.gen";
+import { useState } from "react";
+import { LuSlidersHorizontal } from "react-icons/lu";
+
+interface WorkerConcurrencyButtonProps {
+  onConcurrencyUpdate: (toast: Record<string, string>) => void;
+  worker: Worker;
+}
+
+export const WorkerConcurrencyButton = ({
+  onConcurrencyUpdate,
+  worker,
+}: WorkerConcurrencyButtonProps) => {
+  const { onClose, onOpen, open } = useDisclosure();
+  const workerName = worker.worker_name;
+  const currentConcurrency = worker.sysinfo?.concurrency;
+  const [concurrency, setConcurrency] = useState<string>(
+    currentConcurrency !== undefined ? String(currentConcurrency) : "",
+  );
+
+  const setConcurrencyMutation = useUiServiceSetWorkerConcurrencyLimit({
+    onError: (error: unknown) => {
+      onConcurrencyUpdate({
+        description: `Unable to set concurrency for worker ${workerName}: 
${error}`,
+        title: "Set Concurrency Failed",
+        type: "error",
+      });
+    },
+    onSuccess: () => {
+      onConcurrencyUpdate({
+        description: `Concurrency for worker ${workerName} set to 
${concurrency}.`,
+        title: "Concurrency Updated",
+        type: "success",
+      });
+      onClose();
+    },
+  });
+
+  const handleSetConcurrency = () => {
+    const value = parseInt(concurrency, 10);
+
+    if (!concurrency.trim() || isNaN(value) || value <= 0) {
+      onConcurrencyUpdate({
+        description: "Please enter a valid concurrency value greater than 0.",
+        title: "Invalid Input",
+        type: "error",
+      });
+      return;
+    }
+
+    setConcurrencyMutation.mutate({
+      requestBody: { concurrency: value },
+      workerName,
+    });
+  };
+
+  const handleOpen = () => {
+    setConcurrency(currentConcurrency !== undefined ? 
String(currentConcurrency) : "");
+    onOpen();
+  };
+
+  const concurrencyValue = parseInt(concurrency, 10);
+  const isValid = concurrency.trim() !== "" && !isNaN(concurrencyValue) && 
concurrencyValue > 0;
+
+  return (
+    <>
+      <IconButton
+        aria-label="Set Concurrency"
+        colorPalette="blue"
+        onClick={handleOpen}
+        size="sm"
+        title="Set Concurrency"
+        variant="ghost"
+      >
+        <LuSlidersHorizontal />
+      </IconButton>
+
+      <Dialog.Root onOpenChange={onClose} open={open} size="md">
+        <Portal>
+          <Dialog.Backdrop />
+          <Dialog.Positioner>
+            <Dialog.Content>
+              <Dialog.Header>
+                <Dialog.Title>Set Concurrency for {workerName}</Dialog.Title>
+              </Dialog.Header>
+              <Dialog.Body>
+                <VStack align="stretch" gap={4}>
+                  <Text>Enter the new concurrency limit for this worker:</Text>
+                  <Input
+                    min={1}
+                    onChange={(e) => setConcurrency(e.target.value)}
+                    onKeyDown={(e) => {
+                      if (e.key === "Enter") {
+                        handleSetConcurrency();
+                      }
+                    }}
+                    placeholder="Concurrency limit"
+                    type="number"
+                    value={concurrency}
+                  />
+                </VStack>
+              </Dialog.Body>
+              <Dialog.Footer>
+                <Dialog.ActionTrigger asChild>
+                  <Button variant="outline">Cancel</Button>
+                </Dialog.ActionTrigger>
+                <Button
+                  colorPalette="blue"
+                  disabled={!isValid}
+                  loading={setConcurrencyMutation.isPending}
+                  loadingText="Setting concurrency..."
+                  onClick={handleSetConcurrency}
+                >
+                  <LuSlidersHorizontal />
+                  Set Concurrency
+                </Button>
+              </Dialog.Footer>
+              <Dialog.CloseTrigger asChild>
+                <CloseButton size="sm" />
+              </Dialog.CloseTrigger>
+            </Dialog.Content>
+          </Dialog.Positioner>
+        </Portal>
+      </Dialog.Root>
+    </>
+  );
+};
diff --git 
a/providers/edge3/src/airflow/providers/edge3/plugins/www/src/components/WorkerOperations.tsx
 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/components/WorkerOperations.tsx
index 0d0aa727eea..1b5322ce08b 100644
--- 
a/providers/edge3/src/airflow/providers/edge3/plugins/www/src/components/WorkerOperations.tsx
+++ 
b/providers/edge3/src/airflow/providers/edge3/plugins/www/src/components/WorkerOperations.tsx
@@ -26,6 +26,7 @@ import { MaintenanceEditCommentButton } from 
"./MaintenanceEditCommentButton";
 import { MaintenanceEnterButton } from "./MaintenanceEnterButton";
 import { MaintenanceExitButton } from "./MaintenanceExitButton";
 import { RemoveQueueButton } from "./RemoveQueueButton";
+import { WorkerConcurrencyButton } from "./WorkerConcurrencyButton";
 import { WorkerDeleteButton } from "./WorkerDeleteButton";
 import { WorkerShutdownButton } from "./WorkerShutdownButton";
 
@@ -48,6 +49,7 @@ export const WorkerOperations = ({ onOperations, worker }: 
WorkerOperationsProps
       <Flex justifyContent="end" gap={2}>
         <AddQueueButton onQueueUpdate={onWorkerChange} workerName={workerName} 
/>
         <RemoveQueueButton onQueueUpdate={onWorkerChange} worker={worker} />
+        <WorkerConcurrencyButton onConcurrencyUpdate={onWorkerChange} 
worker={worker} />
         <MaintenanceEnterButton onEnterMaintenance={onWorkerChange} 
workerName={workerName} />
         <WorkerShutdownButton onShutdown={onWorkerChange} 
workerName={workerName} />
       </Flex>
diff --git 
a/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels_ui.py 
b/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels_ui.py
index 3aa2e12e9df..e38671928a6 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels_ui.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/datamodels_ui.py
@@ -77,3 +77,9 @@ class QueueUpdateRequest(BaseModel):
     """Request body for queue operations."""
 
     queue_name: Annotated[str, Field(description="Name of the queue to add or 
remove.")]
+
+
+class ConcurrencyRequest(BaseModel):
+    """Request body for worker concurrency update."""
+
+    concurrency: Annotated[int, Field(description="New concurrency limit for 
the worker.", gt=0)]
diff --git 
a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/ui.py 
b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/ui.py
index f24b1e19729..996a3a26128 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/ui.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/ui.py
@@ -38,8 +38,10 @@ from airflow.providers.edge3.models.edge_worker import (
     remove_worker_queues,
     request_maintenance,
     request_shutdown,
+    set_worker_concurrency,
 )
 from airflow.providers.edge3.worker_api.datamodels_ui import (
+    ConcurrencyRequest,
     Job,
     JobCollectionResponse,
     MaintenanceRequest,
@@ -325,3 +327,26 @@ def remove_worker_queue(
         remove_worker_queues(worker_name, [queue_name], session=session)
     except Exception as e:
         raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(e))
+
+
+@ui_router.patch(
+    "/worker/{worker_name}/concurrency",
+    dependencies=[
+        Depends(requires_access_view(access_view=AccessView.JOBS)),
+    ],
+)
+def set_worker_concurrency_limit(
+    worker_name: str,
+    concurrency_request: ConcurrencyRequest,
+    session: SessionDep,
+) -> None:
+    """Set the concurrency limit for an edge worker."""
+    worker_query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name 
== worker_name)
+    worker = session.scalar(worker_query)
+    if not worker:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, detail=f"Worker 
{worker_name} not found")
+
+    try:
+        set_worker_concurrency(worker_name, concurrency_request.concurrency, 
session=session)
+    except Exception as e:
+        raise HTTPException(status.HTTP_400_BAD_REQUEST, detail=str(e))
diff --git 
a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml 
b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
index a5daeacb86f..2904a2e0d3d 100644
--- 
a/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
+++ 
b/providers/edge3/src/airflow/providers/edge3/worker_api/v2-edge-generated.yaml
@@ -912,6 +912,41 @@ paths:
             application/json:
               schema:
                 $ref: '#/components/schemas/HTTPValidationError'
+  /edge_worker/ui/worker/{worker_name}/concurrency:
+    patch:
+      tags:
+      - UI
+      summary: Set Worker Concurrency Limit
+      description: Set the concurrency limit for an edge worker.
+      operationId: set_worker_concurrency_limit
+      security:
+      - OAuth2PasswordBearer: []
+      - HTTPBearer: []
+      parameters:
+      - name: worker_name
+        in: path
+        required: true
+        schema:
+          type: string
+          title: Worker Name
+      requestBody:
+        required: true
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/ConcurrencyRequest'
+      responses:
+        '200':
+          description: Successful Response
+          content:
+            application/json:
+              schema: {}
+        '422':
+          description: Validation Error
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/HTTPValidationError'
 components:
   schemas:
     BundleInfo:
@@ -929,6 +964,18 @@ components:
       - name
       title: BundleInfo
       description: Schema for telling task which bundle to run with.
+    ConcurrencyRequest:
+      properties:
+        concurrency:
+          type: integer
+          exclusiveMinimum: 0.0
+          title: Concurrency
+          description: New concurrency limit for the worker.
+      type: object
+      required:
+      - concurrency
+      title: ConcurrencyRequest
+      description: Request body for worker concurrency update.
     EdgeJobFetched:
       properties:
         dag_id:
diff --git a/providers/edge3/tests/unit/edge3/worker_api/routes/test_ui.py 
b/providers/edge3/tests/unit/edge3/worker_api/routes/test_ui.py
index f1934b5d0f6..256cdd977d3 100644
--- a/providers/edge3/tests/unit/edge3/worker_api/routes/test_ui.py
+++ b/providers/edge3/tests/unit/edge3/worker_api/routes/test_ui.py
@@ -47,3 +47,30 @@ class TestUiApiRoutes:
         assert worker_response.total_entries == 1
         assert len(worker_response.workers) == 1
         assert worker_response.workers[0].worker_name == "worker1"
+
+    def test_set_worker_concurrency_limit(self, session: Session):
+        from airflow.providers.edge3.worker_api.datamodels_ui import 
ConcurrencyRequest
+        from airflow.providers.edge3.worker_api.routes.ui import 
set_worker_concurrency_limit
+
+        set_worker_concurrency_limit(
+            worker_name="worker1",
+            concurrency_request=ConcurrencyRequest(concurrency=4),
+            session=session,
+        )
+        worker_model = session.get(EdgeWorkerModel, "worker1")
+        assert worker_model is not None
+        assert worker_model.concurrency == 4
+
+    def test_set_worker_concurrency_limit_not_found(self, session: Session):
+        from fastapi import HTTPException
+
+        from airflow.providers.edge3.worker_api.datamodels_ui import 
ConcurrencyRequest
+        from airflow.providers.edge3.worker_api.routes.ui import 
set_worker_concurrency_limit
+
+        with pytest.raises(HTTPException) as exc_info:
+            set_worker_concurrency_limit(
+                worker_name="nonexistent_worker",
+                concurrency_request=ConcurrencyRequest(concurrency=4),
+                session=session,
+            )
+        assert exc_info.value.status_code == 404

Reply via email to