This is an automated email from the ASF dual-hosted git repository.

vogievetsky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a7a49f64cdc Web console: Fix inactive worker counting (#18806)
a7a49f64cdc is described below

commit a7a49f64cdc616eae0039bf4ffcce6846802c4dd
Author: Vadim Ogievetsky <[email protected]>
AuthorDate: Thu Dec 4 01:35:16 2025 +0000

    Web console: Fix inactive worker counting (#18806)
    
    * inactive if zero across all counters, not just input
    
    * add test
---
 web-console/src/druid-models/stages/stages.spec.ts | 357 ++++++++++++++++++++-
 web-console/src/druid-models/stages/stages.ts      |   6 +-
 2 files changed, 359 insertions(+), 4 deletions(-)

diff --git a/web-console/src/druid-models/stages/stages.spec.ts b/web-console/src/druid-models/stages/stages.spec.ts
index 0cc49486ba6..9b931215930 100644
--- a/web-console/src/druid-models/stages/stages.spec.ts
+++ b/web-console/src/druid-models/stages/stages.spec.ts
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-import { aggregateSortProgressCounters } from './stages';
+import { aggregateSortProgressCounters, Stages } from './stages';
 import { STAGES } from './stages.mock';
 
 describe('aggregateSortProgressCounters', () => {
@@ -69,6 +69,361 @@ describe('Stages', () => {
     });
   });
 
+  describe('#getInactiveWorkerCount', () => {
+    it('returns undefined when no counters exist for stage', () => {
+      // Create a custom Stages instance where stage has no counters
+      const customStages = new Stages(
+        [
+          {
+            stageNumber: 5,
+            definition: {
+              id: 'test-stage-no-counters',
+              input: [
+                {
+                  type: 'external',
+                  inputSource: { type: 'http', uris: [] },
+                  inputFormat: { type: 'json' },
+                  signature: [],
+                },
+              ],
+              processor: { type: 'scan' },
+              signature: [],
+              maxWorkerCount: 1,
+            },
+            phase: 'NEW',
+            workerCount: 1,
+            partitionCount: 1,
+          },
+        ],
+        {},
+      );
+
+      expect(customStages.getInactiveWorkerCount(customStages.stages[0])).toBeUndefined();
+    });
+
+    it('counts workers with zero rows across all channels', () => {
+      // Stage 2 has counters data in the mock
+      const inactiveCount = STAGES.getInactiveWorkerCount(STAGES.stages[2]);
+      expect(inactiveCount).toBe(0);
+    });
+
+    it('identifies inactive workers correctly', () => {
+      // Create a custom Stages instance with workers that have zero rows
+      const customStages = new Stages(
+        [
+          {
+            stageNumber: 0,
+            definition: {
+              id: 'test-stage',
+              input: [
+                {
+                  type: 'external',
+                  inputSource: { type: 'http', uris: [] },
+                  inputFormat: { type: 'json' },
+                  signature: [],
+                },
+              ],
+              processor: { type: 'scan' },
+              signature: [],
+              maxWorkerCount: 3,
+            },
+            phase: 'READING_INPUT',
+            workerCount: 3,
+            partitionCount: 1,
+          },
+        ],
+        {
+          '0': {
+            '0': {
+              input0: {
+                type: 'channel',
+                rows: [100],
+                bytes: [1000],
+              },
+              output: {
+                type: 'channel',
+                rows: [100],
+                bytes: [1000],
+              },
+            },
+            '1': {
+              input0: {
+                type: 'channel',
+                rows: [0],
+                bytes: [0],
+              },
+              output: {
+                type: 'channel',
+                rows: [0],
+                bytes: [0],
+              },
+            },
+            '2': {
+              input0: {
+                type: 'channel',
+                rows: [0],
+                bytes: [0],
+              },
+              output: {
+                type: 'channel',
+                rows: [0],
+                bytes: [0],
+              },
+            },
+          },
+        },
+      );
+
+      const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
+      expect(inactiveCount).toBe(2);
+    });
+
+    it('handles missing channel data correctly', () => {
+      // Create a custom Stages instance where some workers have missing channels
+      const customStages = new Stages(
+        [
+          {
+            stageNumber: 0,
+            definition: {
+              id: 'test-stage',
+              input: [
+                {
+                  type: 'external',
+                  inputSource: { type: 'http', uris: [] },
+                  inputFormat: { type: 'json' },
+                  signature: [],
+                },
+              ],
+              processor: { type: 'scan' },
+              signature: [],
+              maxWorkerCount: 2,
+            },
+            phase: 'READING_INPUT',
+            workerCount: 2,
+            partitionCount: 1,
+          },
+        ],
+        {
+          '0': {
+            '0': {
+              input0: {
+                type: 'channel',
+                rows: [100],
+                bytes: [1000],
+              },
+            },
+            '1': {
+              // Missing input0 channel - should be counted as inactive
+            },
+          },
+        },
+      );
+
+      const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
+      expect(inactiveCount).toBe(1);
+    });
+
+    it('counts all workers as inactive when all have zero rows', () => {
+      const customStages = new Stages(
+        [
+          {
+            stageNumber: 0,
+            definition: {
+              id: 'test-stage',
+              input: [
+                {
+                  type: 'external',
+                  inputSource: { type: 'http', uris: [] },
+                  inputFormat: { type: 'json' },
+                  signature: [],
+                },
+              ],
+              processor: { type: 'scan' },
+              signature: [],
+              maxWorkerCount: 2,
+            },
+            phase: 'READING_INPUT',
+            workerCount: 2,
+            partitionCount: 1,
+          },
+        ],
+        {
+          '0': {
+            '0': {
+              input0: {
+                type: 'channel',
+                rows: [],
+                bytes: [],
+              },
+            },
+            '1': {
+              input0: {
+                type: 'channel',
+                rows: [0],
+                bytes: [0],
+              },
+            },
+          },
+        },
+      );
+
+      const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
+      expect(inactiveCount).toBe(2);
+    });
+
+    it('counts no inactive workers when all have non-zero rows', () => {
+      const customStages = new Stages(
+        [
+          {
+            stageNumber: 0,
+            definition: {
+              id: 'test-stage',
+              input: [
+                {
+                  type: 'external',
+                  inputSource: { type: 'http', uris: [] },
+                  inputFormat: { type: 'json' },
+                  signature: [],
+                },
+              ],
+              processor: { type: 'scan' },
+              signature: [],
+              maxWorkerCount: 3,
+            },
+            phase: 'READING_INPUT',
+            workerCount: 3,
+            partitionCount: 1,
+          },
+        ],
+        {
+          '0': {
+            '0': {
+              input0: {
+                type: 'channel',
+                rows: [100],
+                bytes: [1000],
+              },
+            },
+            '1': {
+              input0: {
+                type: 'channel',
+                rows: [50],
+                bytes: [500],
+              },
+            },
+            '2': {
+              input0: {
+                type: 'channel',
+                rows: [75],
+                bytes: [750],
+              },
+            },
+          },
+        },
+      );
+
+      const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
+      expect(inactiveCount).toBe(0);
+    });
+
+    it('counts worker as active if it has output but no input yet', () => {
+      // Tests the fix: input is reported in batches, so a worker might have output
+      // before input counters are updated. Such workers should be considered active.
+      const customStages = new Stages(
+        [
+          {
+            stageNumber: 0,
+            definition: {
+              id: 'test-stage',
+              input: [
+                {
+                  type: 'external',
+                  inputSource: { type: 'http', uris: [] },
+                  inputFormat: { type: 'json' },
+                  signature: [],
+                },
+              ],
+              processor: { type: 'scan' },
+              signature: [],
+              shuffleSpec: {
+                type: 'targetSize',
+                clusterBy: { columns: [] },
+                targetSize: 3000000,
+              },
+              maxWorkerCount: 3,
+            },
+            phase: 'READING_INPUT',
+            workerCount: 3,
+            partitionCount: 1,
+          },
+        ],
+        {
+          '0': {
+            '0': {
+              input0: {
+                type: 'channel',
+                rows: [100],
+                bytes: [1000],
+              },
+              output: {
+                type: 'channel',
+                rows: [100],
+                bytes: [1000],
+              },
+              shuffle: {
+                type: 'channel',
+                rows: [100],
+                bytes: [1000],
+              },
+            },
+            '1': {
+              // Worker 1 has output and shuffle but input is not reported yet (still zero)
+              // This can happen because input is reported in batches
+              input0: {
+                type: 'channel',
+                rows: [0],
+                bytes: [0],
+              },
+              output: {
+                type: 'channel',
+                rows: [50],
+                bytes: [500],
+              },
+              shuffle: {
+                type: 'channel',
+                rows: [50],
+                bytes: [500],
+              },
+            },
+            '2': {
+              // Worker 2 is truly inactive - zero across all channels
+              input0: {
+                type: 'channel',
+                rows: [0],
+                bytes: [0],
+              },
+              output: {
+                type: 'channel',
+                rows: [0],
+                bytes: [0],
+              },
+              shuffle: {
+                type: 'channel',
+                rows: [0],
+                bytes: [0],
+              },
+            },
+          },
+        },
+      );
+
+      const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
+      // Only worker 2 should be counted as inactive
+      // Worker 1 has output/shuffle data, so it's active even though input is zero
+      expect(inactiveCount).toBe(1);
+    });
+  });
+
   describe('#getByPartitionCountersForStage', () => {
     it('works for input', () => {
       expect(STAGES.getByPartitionCountersForStage(STAGES.stages[2], 'in')).toMatchInlineSnapshot(`
diff --git a/web-console/src/druid-models/stages/stages.ts b/web-console/src/druid-models/stages/stages.ts
index 65818c80467..13131dc344c 100644
--- a/web-console/src/druid-models/stages/stages.ts
+++ b/web-console/src/druid-models/stages/stages.ts
@@ -619,17 +619,17 @@ export class Stages {
 
   getInactiveWorkerCount(stage: StageDefinition): number | undefined {
     const { counters } = this;
-    const { stageNumber, definition } = stage;
+    const { stageNumber } = stage;
     const forStageCounters = counters?.[stageNumber];
     if (!forStageCounters) return;
 
-    const inputChannelCounters = definition.input.map((_, i) => `input${i}` as ChannelCounterName);
+    const channelCounters = this.getChannelCounterNamesForStage(stage);
 
     // Calculate and return the number of workers that have zero count across all inputChannelCounters
     return sum(
       Object.values(forStageCounters).map(stageCounters =>
         Number(
-          inputChannelCounters.every(channel => {
+          channelCounters.every(channel => {
             const c = stageCounters[channel];
             if (!c) return true;
             const totalRows = sum(c.rows || []);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to