This is an automated email from the ASF dual-hosted git repository.
vogievetsky pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a7a49f64cdc Web console: Fix inactive worker counting (#18806)
a7a49f64cdc is described below
commit a7a49f64cdc616eae0039bf4ffcce6846802c4dd
Author: Vadim Ogievetsky <[email protected]>
AuthorDate: Thu Dec 4 01:35:16 2025 +0000
Web console: Fix inactive worker counting (#18806)
* inactive if zero across all counters, not just input
* add test
---
web-console/src/druid-models/stages/stages.spec.ts | 357 ++++++++++++++++++++-
web-console/src/druid-models/stages/stages.ts | 6 +-
2 files changed, 359 insertions(+), 4 deletions(-)
diff --git a/web-console/src/druid-models/stages/stages.spec.ts b/web-console/src/druid-models/stages/stages.spec.ts
index 0cc49486ba6..9b931215930 100644
--- a/web-console/src/druid-models/stages/stages.spec.ts
+++ b/web-console/src/druid-models/stages/stages.spec.ts
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-import { aggregateSortProgressCounters } from './stages';
+import { aggregateSortProgressCounters, Stages } from './stages';
import { STAGES } from './stages.mock';
describe('aggregateSortProgressCounters', () => {
@@ -69,6 +69,361 @@ describe('Stages', () => {
});
});
+ describe('#getInactiveWorkerCount', () => {
+ it('returns undefined when no counters exist for stage', () => {
+ // Create a custom Stages instance where stage has no counters
+ const customStages = new Stages(
+ [
+ {
+ stageNumber: 5,
+ definition: {
+ id: 'test-stage-no-counters',
+ input: [
+ {
+ type: 'external',
+ inputSource: { type: 'http', uris: [] },
+ inputFormat: { type: 'json' },
+ signature: [],
+ },
+ ],
+ processor: { type: 'scan' },
+ signature: [],
+ maxWorkerCount: 1,
+ },
+ phase: 'NEW',
+ workerCount: 1,
+ partitionCount: 1,
+ },
+ ],
+ {},
+ );
+
+ expect(customStages.getInactiveWorkerCount(customStages.stages[0])).toBeUndefined();
+ });
+
+ it('counts workers with zero rows across all channels', () => {
+ // Stage 2 has counters data in the mock
+ const inactiveCount = STAGES.getInactiveWorkerCount(STAGES.stages[2]);
+ expect(inactiveCount).toBe(0);
+ });
+
+ it('identifies inactive workers correctly', () => {
+ // Create a custom Stages instance with workers that have zero rows
+ const customStages = new Stages(
+ [
+ {
+ stageNumber: 0,
+ definition: {
+ id: 'test-stage',
+ input: [
+ {
+ type: 'external',
+ inputSource: { type: 'http', uris: [] },
+ inputFormat: { type: 'json' },
+ signature: [],
+ },
+ ],
+ processor: { type: 'scan' },
+ signature: [],
+ maxWorkerCount: 3,
+ },
+ phase: 'READING_INPUT',
+ workerCount: 3,
+ partitionCount: 1,
+ },
+ ],
+ {
+ '0': {
+ '0': {
+ input0: {
+ type: 'channel',
+ rows: [100],
+ bytes: [1000],
+ },
+ output: {
+ type: 'channel',
+ rows: [100],
+ bytes: [1000],
+ },
+ },
+ '1': {
+ input0: {
+ type: 'channel',
+ rows: [0],
+ bytes: [0],
+ },
+ output: {
+ type: 'channel',
+ rows: [0],
+ bytes: [0],
+ },
+ },
+ '2': {
+ input0: {
+ type: 'channel',
+ rows: [0],
+ bytes: [0],
+ },
+ output: {
+ type: 'channel',
+ rows: [0],
+ bytes: [0],
+ },
+ },
+ },
+ },
+ );
+
+ const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
+ expect(inactiveCount).toBe(2);
+ });
+
+ it('handles missing channel data correctly', () => {
+ // Create a custom Stages instance where some workers have missing channels
+ const customStages = new Stages(
+ [
+ {
+ stageNumber: 0,
+ definition: {
+ id: 'test-stage',
+ input: [
+ {
+ type: 'external',
+ inputSource: { type: 'http', uris: [] },
+ inputFormat: { type: 'json' },
+ signature: [],
+ },
+ ],
+ processor: { type: 'scan' },
+ signature: [],
+ maxWorkerCount: 2,
+ },
+ phase: 'READING_INPUT',
+ workerCount: 2,
+ partitionCount: 1,
+ },
+ ],
+ {
+ '0': {
+ '0': {
+ input0: {
+ type: 'channel',
+ rows: [100],
+ bytes: [1000],
+ },
+ },
+ '1': {
+ // Missing input0 channel - should be counted as inactive
+ },
+ },
+ },
+ );
+
+ const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
+ expect(inactiveCount).toBe(1);
+ });
+
+ it('counts all workers as inactive when all have zero rows', () => {
+ const customStages = new Stages(
+ [
+ {
+ stageNumber: 0,
+ definition: {
+ id: 'test-stage',
+ input: [
+ {
+ type: 'external',
+ inputSource: { type: 'http', uris: [] },
+ inputFormat: { type: 'json' },
+ signature: [],
+ },
+ ],
+ processor: { type: 'scan' },
+ signature: [],
+ maxWorkerCount: 2,
+ },
+ phase: 'READING_INPUT',
+ workerCount: 2,
+ partitionCount: 1,
+ },
+ ],
+ {
+ '0': {
+ '0': {
+ input0: {
+ type: 'channel',
+ rows: [],
+ bytes: [],
+ },
+ },
+ '1': {
+ input0: {
+ type: 'channel',
+ rows: [0],
+ bytes: [0],
+ },
+ },
+ },
+ },
+ );
+
+ const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
+ expect(inactiveCount).toBe(2);
+ });
+
+ it('counts no inactive workers when all have non-zero rows', () => {
+ const customStages = new Stages(
+ [
+ {
+ stageNumber: 0,
+ definition: {
+ id: 'test-stage',
+ input: [
+ {
+ type: 'external',
+ inputSource: { type: 'http', uris: [] },
+ inputFormat: { type: 'json' },
+ signature: [],
+ },
+ ],
+ processor: { type: 'scan' },
+ signature: [],
+ maxWorkerCount: 3,
+ },
+ phase: 'READING_INPUT',
+ workerCount: 3,
+ partitionCount: 1,
+ },
+ ],
+ {
+ '0': {
+ '0': {
+ input0: {
+ type: 'channel',
+ rows: [100],
+ bytes: [1000],
+ },
+ },
+ '1': {
+ input0: {
+ type: 'channel',
+ rows: [50],
+ bytes: [500],
+ },
+ },
+ '2': {
+ input0: {
+ type: 'channel',
+ rows: [75],
+ bytes: [750],
+ },
+ },
+ },
+ },
+ );
+
+ const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
+ expect(inactiveCount).toBe(0);
+ });
+
+ it('counts worker as active if it has output but no input yet', () => {
+ // Tests the fix: input is reported in batches, so a worker might have output
+ // before input counters are updated. Such workers should be considered active.
+ const customStages = new Stages(
+ [
+ {
+ stageNumber: 0,
+ definition: {
+ id: 'test-stage',
+ input: [
+ {
+ type: 'external',
+ inputSource: { type: 'http', uris: [] },
+ inputFormat: { type: 'json' },
+ signature: [],
+ },
+ ],
+ processor: { type: 'scan' },
+ signature: [],
+ shuffleSpec: {
+ type: 'targetSize',
+ clusterBy: { columns: [] },
+ targetSize: 3000000,
+ },
+ maxWorkerCount: 3,
+ },
+ phase: 'READING_INPUT',
+ workerCount: 3,
+ partitionCount: 1,
+ },
+ ],
+ {
+ '0': {
+ '0': {
+ input0: {
+ type: 'channel',
+ rows: [100],
+ bytes: [1000],
+ },
+ output: {
+ type: 'channel',
+ rows: [100],
+ bytes: [1000],
+ },
+ shuffle: {
+ type: 'channel',
+ rows: [100],
+ bytes: [1000],
+ },
+ },
+ '1': {
+ // Worker 1 has output and shuffle but input is not reported yet (still zero)
+ // This can happen because input is reported in batches
+ input0: {
+ type: 'channel',
+ rows: [0],
+ bytes: [0],
+ },
+ output: {
+ type: 'channel',
+ rows: [50],
+ bytes: [500],
+ },
+ shuffle: {
+ type: 'channel',
+ rows: [50],
+ bytes: [500],
+ },
+ },
+ '2': {
+ // Worker 2 is truly inactive - zero across all channels
+ input0: {
+ type: 'channel',
+ rows: [0],
+ bytes: [0],
+ },
+ output: {
+ type: 'channel',
+ rows: [0],
+ bytes: [0],
+ },
+ shuffle: {
+ type: 'channel',
+ rows: [0],
+ bytes: [0],
+ },
+ },
+ },
+ },
+ );
+
+ const inactiveCount = customStages.getInactiveWorkerCount(customStages.stages[0]);
+ // Only worker 2 should be counted as inactive
+ // Worker 1 has output/shuffle data, so it's active even though input is zero
+ expect(inactiveCount).toBe(1);
+ });
+ });
+
describe('#getByPartitionCountersForStage', () => {
it('works for input', () => {
expect(STAGES.getByPartitionCountersForStage(STAGES.stages[2], 'in')).toMatchInlineSnapshot(`
diff --git a/web-console/src/druid-models/stages/stages.ts b/web-console/src/druid-models/stages/stages.ts
index 65818c80467..13131dc344c 100644
--- a/web-console/src/druid-models/stages/stages.ts
+++ b/web-console/src/druid-models/stages/stages.ts
@@ -619,17 +619,17 @@ export class Stages {
getInactiveWorkerCount(stage: StageDefinition): number | undefined {
const { counters } = this;
- const { stageNumber, definition } = stage;
+ const { stageNumber } = stage;
const forStageCounters = counters?.[stageNumber];
if (!forStageCounters) return;
- const inputChannelCounters = definition.input.map((_, i) => `input${i}` as ChannelCounterName);
+ const channelCounters = this.getChannelCounterNamesForStage(stage);
// Calculate and return the number of workers that have zero count across all inputChannelCounters
return sum(
Object.values(forStageCounters).map(stageCounters =>
Number(
- inputChannelCounters.every(channel => {
+ channelCounters.every(channel => {
const c = stageCounters[channel];
if (!c) return true;
const totalRows = sum(c.rows || []);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]