This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new caf8ce3e0b0 MSQ: Add CPU and thread usage counters. (#16914)
caf8ce3e0b0 is described below

commit caf8ce3e0b052d67c56620de459f1a27ba6f874f
Author: Gian Merlino <gianmerl...@gmail.com>
AuthorDate: Fri Aug 30 20:02:30 2024 -0700

    MSQ: Add CPU and thread usage counters. (#16914)
    
    * MSQ: Add CPU and thread usage counters.
    
    The main change adds "cpu" and "wall" counters. The "cpu" counter measures
    CPU time (using JvmUtils.getCurrentThreadCpuTime) taken up by processors
    in processing threads. The "wall" counter measures the amount of wall time
    taken up by processors in those same processing threads. Both counters are
    broken down by type of processor.
    
    This patch also includes changes to support adding new counters. Due to an
    oversight in the original design, older deserializers are not 
forwards-compatible;
    they throw errors when encountering an unknown counter type. To manage this,
    the following changes are made:
    
    1) The defaultImpl NilQueryCounterSnapshot is added to 
QueryCounterSnapshot's
       deserialization configuration. This means that any unrecognized counter 
types
       will be read as "nil" by deserializers. Going forward, once all servers 
are
       on the latest code, this is enough to enable easily adding new counters.
    
    2) A new context parameter "includeAllCounters" is added, which defaults to 
"false".
       When this parameter is set "false", only legacy counters are included. 
When set
       to "true", all counters are included. This is currently undocumented. In 
a future
       version, we should set the default to "true", and at that time, include 
a release
       note that people updating from versions prior to Druid 31 should set 
this to
       "false" until their upgrade is complete.
    
    * Style, coverage.
    
    * Fix.
---
 .../apache/druid/msq/counters/CounterNames.java    |   9 ++
 .../druid/msq/counters/CounterSnapshots.java       |   1 +
 .../apache/druid/msq/counters/CounterTracker.java  |  63 ++++++++-
 .../org/apache/druid/msq/counters/CpuCounter.java  | 148 +++++++++++++++++++++
 .../org/apache/druid/msq/counters/CpuCounters.java | 106 +++++++++++++++
 .../CpuTimeAccumulatingFrameProcessor.java         |  92 +++++++++++++
 .../CpuTimeAccumulatingProcessorManager.java       |  74 +++++++++++
 ...rSnapshot.java => NilQueryCounterSnapshot.java} |  30 ++++-
 .../druid/msq/counters/QueryCounterSnapshot.java   |   8 +-
 .../org/apache/druid/msq/exec/ControllerImpl.java  |   1 +
 .../org/apache/druid/msq/exec/RunWorkOrder.java    |  44 ++++--
 .../java/org/apache/druid/msq/exec/WorkerImpl.java |   3 +-
 .../apache/druid/msq/guice/MSQIndexingModule.java  |   6 +
 .../msq/indexing/IndexerControllerContext.java     |   4 +-
 .../druid/msq/indexing/IndexerWorkerContext.java   |   6 +-
 .../druid/msq/indexing/InputChannelsImpl.java      |  26 +++-
 .../druid/msq/util/MultiStageQueryContext.java     |  23 ++++
 .../msq/counters/CountersSnapshotTreeTest.java     |  96 +++++++++++++
 .../apache/druid/msq/counters/CpuCountersTest.java |  86 ++++++++++++
 .../frame/processor/FrameProcessorDecorator.java   |  20 ++-
 .../apache/druid/frame/processor/SuperSorter.java  |   5 +-
 .../druid/frame/processor/SuperSorterTest.java     |   3 +
 22 files changed, 821 insertions(+), 33 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java
index 350dc958e7b..19d5da36efb 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterNames.java
@@ -34,6 +34,7 @@ public class CounterNames
   private static final String INPUT = "input";
   private static final String OUTPUT = "output";
   private static final String SHUFFLE = "shuffle";
+  private static final String CPU = "cpu";
   private static final String SORT_PROGRESS = "sortProgress";
   private static final String SEGMENT_GENERATION_PROGRESS = 
"segmentGenerationProgress";
   private static final String WARNINGS = "warnings";
@@ -68,6 +69,14 @@ public class CounterNames
     return SHUFFLE;
   }
 
+  /**
+   * Standard name for CPU counters created by {@link CounterTracker#cpu}.
+   */
+  public static String cpu()
+  {
+    return CPU;
+  }
+
   /**
    * Standard name for a sort progress counter created by {@link 
CounterTracker#sortProgress()}.
    */
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshots.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshots.java
index d75eb1ce8cb..b1595e9a0ae 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshots.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterSnapshots.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 
 /**
  * Named counter snapshots. Immutable. Often part of a {@link 
CounterSnapshotsTree}.
+ * Created by {@link CounterTracker#snapshot()}.
  */
 public class CounterSnapshots
 {
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java
index c73ead63c11..599a4df92b0 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CounterTracker.java
@@ -19,7 +19,12 @@
 
 package org.apache.druid.msq.counters;
 
+import org.apache.druid.frame.processor.FrameProcessor;
 import org.apache.druid.frame.processor.SuperSorterProgressTracker;
+import org.apache.druid.frame.processor.manager.ProcessorManager;
+import org.apache.druid.msq.util.MultiStageQueryContext;
+import org.apache.druid.query.QueryContext;
+import org.apache.druid.utils.JvmUtils;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -37,11 +42,55 @@ public class CounterTracker
 {
   private final ConcurrentHashMap<String, QueryCounter> countersMap = new 
ConcurrentHashMap<>();
 
+  /**
+   * See {@link MultiStageQueryContext#getIncludeAllCounters(QueryContext)}.
+   */
+  private final boolean includeAllCounters;
+
+  public CounterTracker(boolean includeAllCounters)
+  {
+    this.includeAllCounters = includeAllCounters;
+  }
+
   public ChannelCounters channel(final String name)
   {
     return counter(name, ChannelCounters::new);
   }
 
+  /**
+   * Returns a {@link CpuCounter} that can be used to accumulate CPU time 
under a particular label.
+   */
+  public CpuCounter cpu(final String name)
+  {
+    return counter(CounterNames.cpu(), CpuCounters::new).forName(name);
+  }
+
+  /**
+   * Decorates a {@link FrameProcessor} such that it accumulates CPU time 
under a particular label.
+   */
+  public <T> FrameProcessor<T> trackCpu(final FrameProcessor<T> processor, 
final String name)
+  {
+    if (JvmUtils.isThreadCpuTimeEnabled()) {
+      final CpuCounter counter = counter(CounterNames.cpu(), 
CpuCounters::new).forName(name);
+      return new CpuTimeAccumulatingFrameProcessor<>(processor, counter);
+    } else {
+      return processor;
+    }
+  }
+
+  /**
+   * Decorates a {@link ProcessorManager} such that it accumulates CPU time 
under a particular label.
+   */
+  public <T, R> ProcessorManager<T, R> trackCpu(final ProcessorManager<T, R> 
processorManager, final String name)
+  {
+    if (JvmUtils.isThreadCpuTimeEnabled()) {
+      final CpuCounter counter = counter(CounterNames.cpu(), 
CpuCounters::new).forName(name);
+      return new CpuTimeAccumulatingProcessorManager<>(processorManager, 
counter);
+    } else {
+      return processorManager;
+    }
+  }
+
   public SuperSorterProgressTracker sortProgress()
   {
     return counter(CounterNames.sortProgress(), 
SuperSorterProgressTrackerCounter::new).tracker();
@@ -69,11 +118,23 @@ public class CounterTracker
 
     for (final Map.Entry<String, QueryCounter> entry : countersMap.entrySet()) 
{
       final QueryCounterSnapshot counterSnapshot = entry.getValue().snapshot();
-      if (counterSnapshot != null) {
+      if (counterSnapshot != null && (includeAllCounters || 
isLegacyCounter(counterSnapshot))) {
         m.put(entry.getKey(), counterSnapshot);
       }
     }
 
     return new CounterSnapshots(m);
   }
+
+  /**
+   * Returns whether a counter is a "legacy counter" that can be snapshotted 
regardless of the value of
+   * {@link MultiStageQueryContext#getIncludeAllCounters(QueryContext)}.
+   */
+  private static boolean isLegacyCounter(final QueryCounterSnapshot 
counterSnapshot)
+  {
+    return counterSnapshot instanceof ChannelCounters.Snapshot
+        || counterSnapshot instanceof 
SuperSorterProgressTrackerCounter.Snapshot
+        || counterSnapshot instanceof WarningCounters.Snapshot
+        || counterSnapshot instanceof 
SegmentGenerationProgressCounter.Snapshot;
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuCounter.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuCounter.java
new file mode 100644
index 00000000000..747d0e5aa3b
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuCounter.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.counters;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.utils.JvmUtils;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class CpuCounter implements QueryCounter
+{
+  private final AtomicLong cpuTime = new AtomicLong();
+  private final AtomicLong wallTime = new AtomicLong();
+
+  public void accumulate(final long cpu, final long wall)
+  {
+    cpuTime.addAndGet(cpu);
+    wallTime.addAndGet(wall);
+  }
+
+  public <E extends Throwable> void run(final Doer<E> doer) throws E
+  {
+    final long startCpu = JvmUtils.getCurrentThreadCpuTime();
+    final long startWall = System.nanoTime();
+
+    try {
+      doer.run();
+    }
+    finally {
+      accumulate(
+          JvmUtils.getCurrentThreadCpuTime() - startCpu,
+          System.nanoTime() - startWall
+      );
+    }
+  }
+
+  public <T, E extends Throwable> T run(final Returner<T, E> returner) throws E
+  {
+    final long startCpu = JvmUtils.getCurrentThreadCpuTime();
+    final long startWall = System.nanoTime();
+
+    try {
+      return returner.run();
+    }
+    finally {
+      accumulate(
+          JvmUtils.getCurrentThreadCpuTime() - startCpu,
+          System.nanoTime() - startWall
+      );
+    }
+  }
+
+  @Override
+  public Snapshot snapshot()
+  {
+    return new Snapshot(cpuTime.get(), wallTime.get());
+  }
+
+  @JsonTypeName("cpu")
+  public static class Snapshot implements QueryCounterSnapshot
+  {
+    private final long cpuTime;
+    private final long wallTime;
+
+    @JsonCreator
+    public Snapshot(
+        @JsonProperty("cpu") long cpuTime,
+        @JsonProperty("wall") long wallTime
+    )
+    {
+      this.cpuTime = cpuTime;
+      this.wallTime = wallTime;
+    }
+
+    @JsonProperty("cpu")
+    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+    public long getCpuTime()
+    {
+      return cpuTime;
+    }
+
+    @JsonProperty("wall")
+    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+    public long getWallTime()
+    {
+      return wallTime;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      Snapshot snapshot = (Snapshot) o;
+      return cpuTime == snapshot.cpuTime && wallTime == snapshot.wallTime;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(cpuTime, wallTime);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "CpuCounter.Snapshot{" +
+             "cpuTime=" + cpuTime +
+             ", wallTime=" + wallTime +
+             '}';
+    }
+  }
+
+  public interface Doer<E extends Throwable>
+  {
+    void run() throws E;
+  }
+
+  public interface Returner<T, E extends Throwable>
+  {
+    T run() throws E;
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuCounters.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuCounters.java
new file mode 100644
index 00000000000..8ab79b302c6
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuCounters.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.counters;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonValue;
+import com.google.common.base.Preconditions;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class CpuCounters implements QueryCounter
+{
+  public static final String LABEL_MAIN = "main";
+  public static final String LABEL_KEY_STATISTICS = "collectKeyStatistics";
+  public static final String LABEL_MERGE_INPUT = "mergeInput";
+  public static final String LABEL_HASH_PARTITION = "hashPartitionOutput";
+  public static final String LABEL_MIX = "mixOutput";
+  public static final String LABEL_SORT = "sortOutput";
+
+  private final ConcurrentHashMap<String, CpuCounter> counters = new 
ConcurrentHashMap<>();
+
+  public CpuCounter forName(final String name)
+  {
+    return counters.computeIfAbsent(name, k -> new CpuCounter());
+  }
+
+  @Nullable
+  @Override
+  public CpuCounters.Snapshot snapshot()
+  {
+    final Map<String, CpuCounter.Snapshot> snapshotMap = new HashMap<>();
+    for (Map.Entry<String, CpuCounter> entry : counters.entrySet()) {
+      snapshotMap.put(entry.getKey(), entry.getValue().snapshot());
+    }
+    return new Snapshot(snapshotMap);
+  }
+
+  @JsonTypeName("cpus")
+  public static class Snapshot implements QueryCounterSnapshot
+  {
+    // String keys, not enum, so deserialization is forwards-compatible
+    private final Map<String, CpuCounter.Snapshot> map;
+
+    @JsonCreator
+    public Snapshot(Map<String, CpuCounter.Snapshot> map)
+    {
+      this.map = Preconditions.checkNotNull(map, "map");
+    }
+
+    @JsonValue
+    public Map<String, CpuCounter.Snapshot> getCountersMap()
+    {
+      return map;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      Snapshot snapshot = (Snapshot) o;
+      return Objects.equals(map, snapshot.map);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(map);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "CpuCounters.Snapshot{" +
+             "map=" + map +
+             '}';
+    }
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuTimeAccumulatingFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuTimeAccumulatingFrameProcessor.java
new file mode 100644
index 00000000000..5cb6866a5c4
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuTimeAccumulatingFrameProcessor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.counters;
+
+import it.unimi.dsi.fastutil.ints.IntSet;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.utils.JvmUtils;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Wrapper around {@link FrameProcessor} that accumulates time taken into a 
{@link CpuCounter}.
+ */
+public class CpuTimeAccumulatingFrameProcessor<T> implements FrameProcessor<T>
+{
+  private final FrameProcessor<T> delegate;
+  private final CpuCounter counter;
+
+  public CpuTimeAccumulatingFrameProcessor(final FrameProcessor<T> delegate, 
final CpuCounter counter)
+  {
+    this.delegate = delegate;
+    this.counter = counter;
+  }
+
+  @Override
+  public List<ReadableFrameChannel> inputChannels()
+  {
+    return delegate.inputChannels();
+  }
+
+  @Override
+  public List<WritableFrameChannel> outputChannels()
+  {
+    return delegate.outputChannels();
+  }
+
+  @Override
+  public ReturnOrAwait<T> runIncrementally(IntSet readableInputs) throws 
InterruptedException, IOException
+  {
+    // Can't use counter.run, because it turns "throws InterruptedException, 
IOException" into "throws Exception".
+    final long startCpu = JvmUtils.getCurrentThreadCpuTime();
+    final long startWall = System.nanoTime();
+
+    try {
+      return delegate.runIncrementally(readableInputs);
+    }
+    finally {
+      counter.accumulate(
+          JvmUtils.getCurrentThreadCpuTime() - startCpu,
+          System.nanoTime() - startWall
+      );
+    }
+  }
+
+  @Override
+  public void cleanup() throws IOException
+  {
+    final long startCpu = JvmUtils.getCurrentThreadCpuTime();
+    final long startWall = System.nanoTime();
+
+    try {
+      delegate.cleanup();
+    }
+    finally {
+      counter.accumulate(
+          JvmUtils.getCurrentThreadCpuTime() - startCpu,
+          System.nanoTime() - startWall
+      );
+    }
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuTimeAccumulatingProcessorManager.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuTimeAccumulatingProcessorManager.java
new file mode 100644
index 00000000000..6e04e1d3c18
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/CpuTimeAccumulatingProcessorManager.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.counters;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.frame.processor.manager.ProcessorAndCallback;
+import org.apache.druid.frame.processor.manager.ProcessorManager;
+
+import java.util.Optional;
+
+/**
+ * Wrapper around {@link ProcessorManager} that accumulates time taken into a 
{@link CpuCounter}.
+ */
+public class CpuTimeAccumulatingProcessorManager<T, R> implements 
ProcessorManager<T, R>
+{
+  private final ProcessorManager<T, R> delegate;
+  private final CpuCounter counter;
+
+  public CpuTimeAccumulatingProcessorManager(ProcessorManager<T, R> delegate, 
CpuCounter counter)
+  {
+    this.delegate = delegate;
+    this.counter = counter;
+  }
+
+  @Override
+  public ListenableFuture<Optional<ProcessorAndCallback<T>>> next()
+  {
+    // Measure time taken by delegate.next()
+    final ListenableFuture<Optional<ProcessorAndCallback<T>>> delegateNext = 
counter.run(delegate::next);
+
+    return FutureUtils.transform(
+        delegateNext,
+
+        // Don't bother measuring time taken by opt.map, it's very quick.
+        opt -> opt.map(
+            pac -> new ProcessorAndCallback<>(
+                new CpuTimeAccumulatingFrameProcessor<>(pac.processor(), 
counter),
+                // Do measure time taken by onComplete(t), though.
+                t -> counter.run(() -> pac.onComplete(t))
+            )
+        )
+    );
+  }
+
+  @Override
+  public R result()
+  {
+    return counter.run(delegate::result);
+  }
+
+  @Override
+  public void close()
+  {
+    counter.run(delegate::close);
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/QueryCounterSnapshot.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/NilQueryCounterSnapshot.java
similarity index 55%
copy from 
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/QueryCounterSnapshot.java
copy to 
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/NilQueryCounterSnapshot.java
index c065f7f8252..eca3f6d6228 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/QueryCounterSnapshot.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/NilQueryCounterSnapshot.java
@@ -19,13 +19,33 @@
 
 package org.apache.druid.msq.counters;
 
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 
 /**
- * Marker interface for the results of {@link QueryCounter#snapshot()}. No 
methods, because the only purpose of these
- * snapshots is to pass things along from worker -> controller -> report.
+ * Represents an unknown counter type. This is the "defaultType" for {@link 
QueryCounterSnapshot}, so it is
+ * substituted at deserialization time if the type is unknown. This can happen 
when running mixed versions, where some
+ * servers support a newer counter type and some don't.
  */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-public interface QueryCounterSnapshot
+@JsonTypeName("nil")
+public class NilQueryCounterSnapshot implements QueryCounterSnapshot
 {
+  private NilQueryCounterSnapshot()
+  {
+    // Singleton
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return 0;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    return o != null && getClass() == o.getClass();
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/QueryCounterSnapshot.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/QueryCounterSnapshot.java
index c065f7f8252..0fe03088ae7 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/QueryCounterSnapshot.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/QueryCounterSnapshot.java
@@ -24,8 +24,14 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 /**
  * Marker interface for the results of {@link QueryCounter#snapshot()}. No 
methods, because the only purpose of these
  * snapshots is to pass things along from worker -> controller -> report.
+ *
+ * To support easy adding of new counters, implementations must use 
forward-compatible deserialization setups.
+ * In particular, implementations should avoid using enums where new values 
may be added in the future.
+ *
+ * The default impl is {@link NilQueryCounterSnapshot}. This means that 
readers will see {@link NilQueryCounterSnapshot}
+ * if they don't understand the particular counter type in play.
  */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = 
NilQueryCounterSnapshot.class)
 public interface QueryCounterSnapshot
 {
 }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 46a24611351..77a0b7d48d6 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -2705,6 +2705,7 @@ public class ControllerImpl implements Controller
             () -> ArenaMemoryAllocator.createOnHeap(5_000_000),
             resultReaderExec,
             cancellationId,
+            null,
             
MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context())
         );
 
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index 27779f53251..e48f1ef098a 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -44,6 +44,7 @@ import 
org.apache.druid.frame.processor.FileOutputChannelFactory;
 import org.apache.druid.frame.processor.FrameChannelHashPartitioner;
 import org.apache.druid.frame.processor.FrameChannelMixer;
 import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.FrameProcessorDecorator;
 import org.apache.druid.frame.processor.FrameProcessorExecutor;
 import org.apache.druid.frame.processor.OutputChannel;
 import org.apache.druid.frame.processor.OutputChannelFactory;
@@ -62,6 +63,7 @@ import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.msq.counters.CounterNames;
 import org.apache.druid.msq.counters.CounterTracker;
+import org.apache.druid.msq.counters.CpuCounters;
 import org.apache.druid.msq.indexing.CountingOutputChannelFactory;
 import org.apache.druid.msq.indexing.InputChannelFactory;
 import org.apache.druid.msq.indexing.InputChannelsImpl;
@@ -243,6 +245,7 @@ public class RunWorkOrder
             () -> 
ArenaMemoryAllocator.createOnHeap(frameContext.memoryParameters().getStandardFrameSize()),
             exec,
             cancellationId,
+            counterTracker,
             removeNullBytes
         );
 
@@ -348,7 +351,7 @@ public class RunWorkOrder
     }
 
     final ListenableFuture<ManagerReturnType> workResultFuture = 
exec.runAllFully(
-        processorManager,
+        counterTracker.trackCpu(processorManager, CpuCounters.LABEL_MAIN),
         maxOutstandingProcessors,
         frameContext.processorBouncer(),
         cancellationId
@@ -641,7 +644,7 @@ public class RunWorkOrder
                 );
 
             return new ResultAndChannels<>(
-                exec.runFully(mixer, cancellationId),
+                exec.runFully(counterTracker.trackCpu(mixer, 
CpuCounters.LABEL_MIX), cancellationId),
                 
OutputChannels.wrap(Collections.singletonList(outputChannel.readOnly()))
             );
           }
@@ -723,6 +726,14 @@ public class RunWorkOrder
                 stageDefinition.getSortKey(),
                 partitionBoundariesFuture,
                 exec,
+                new FrameProcessorDecorator()
+                {
+                  @Override
+                  public <T> FrameProcessor<T> decorate(FrameProcessor<T> 
processor)
+                  {
+                    return counterTracker.trackCpu(processor, 
CpuCounters.LABEL_SORT);
+                  }
+                },
                 outputChannelFactory,
                 makeSuperSorterIntermediateOutputChannelFactory(sorterTmpDir),
                 memoryParameters.getSuperSorterMaxActiveProcessors(),
@@ -770,7 +781,11 @@ public class RunWorkOrder
                 )
             );
 
-            final ListenableFuture<Long> partitionerFuture = 
exec.runFully(partitioner, cancellationId);
+            final ListenableFuture<Long> partitionerFuture =
+                exec.runFully(
+                    counterTracker.trackCpu(partitioner, 
CpuCounters.LABEL_HASH_PARTITION),
+                    cancellationId
+                );
 
             final ResultAndChannels<Long> retVal =
                 new ResultAndChannels<>(partitionerFuture, 
OutputChannels.wrap(outputChannels));
@@ -844,6 +859,14 @@ public class RunWorkOrder
                         stageDefinition.getSortKey(),
                         
Futures.immediateFuture(ClusterByPartitions.oneUniversalPartition()),
                         exec,
+                        new FrameProcessorDecorator()
+                        {
+                          @Override
+                          public <T> FrameProcessor<T> 
decorate(FrameProcessor<T> processor)
+                          {
+                            return counterTracker.trackCpu(processor, 
CpuCounters.LABEL_SORT);
+                          }
+                        },
                         partitionOverrideOutputChannelFactory,
                         
makeSuperSorterIntermediateOutputChannelFactory(sorterTmpDir),
                         1,
@@ -929,13 +952,16 @@ public class RunWorkOrder
 
       final ListenableFuture<ClusterByStatisticsCollector> 
clusterByStatisticsCollectorFuture =
           exec.runAllFully(
-              ProcessorManagers.of(processors)
-                               .withAccumulation(
-                                   
stageDefinition.createResultKeyStatisticsCollector(
-                                       
frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes()
+              counterTracker.trackCpu(
+                  ProcessorManagers.of(processors)
+                                   .withAccumulation(
+                                       
stageDefinition.createResultKeyStatisticsCollector(
+                                           
frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes()
+                                       ),
+                                       ClusterByStatisticsCollector::addAll
                                    ),
-                                   ClusterByStatisticsCollector::addAll
-                               ),
+                  CpuCounters.LABEL_KEY_STATISTICS
+              ),
               // Run all processors simultaneously. They are lightweight and 
this keeps things moving.
               processors.size(),
               Bouncer.unlimited(),
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 90f018d07ae..92664feeabb 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -407,13 +407,14 @@ public class WorkerImpl implements Worker
     kernel.startReading();
 
     final QueryContext queryContext = task != null ? 
QueryContext.of(task.getContext()) : QueryContext.empty();
+    final boolean includeAllCounters = 
MultiStageQueryContext.getIncludeAllCounters(queryContext);
     final RunWorkOrder runWorkOrder = new RunWorkOrder(
         task.getControllerTaskId(),
         workOrder,
         inputChannelFactory,
         stageCounters.computeIfAbsent(
             IntObjectPair.of(workOrder.getWorkerNumber(), 
stageDefinition.getId()),
-            ignored -> new CounterTracker()
+            ignored -> new CounterTracker(includeAllCounters)
         ),
         workerExec,
         cancellationId,
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
index 4e28edc3ac1..341496f7842 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/MSQIndexingModule.java
@@ -27,6 +27,9 @@ import com.google.inject.Binder;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.druid.msq.counters.ChannelCounters;
 import org.apache.druid.msq.counters.CounterSnapshotsSerializer;
+import org.apache.druid.msq.counters.CpuCounter;
+import org.apache.druid.msq.counters.CpuCounters;
+import org.apache.druid.msq.counters.NilQueryCounterSnapshot;
 import org.apache.druid.msq.counters.SegmentGenerationProgressCounter;
 import org.apache.druid.msq.counters.SuperSorterProgressTrackerCounter;
 import org.apache.druid.msq.counters.WarningCounters;
@@ -175,6 +178,9 @@ public class MSQIndexingModule implements DruidModule
         SuperSorterProgressTrackerCounter.Snapshot.class,
         WarningCounters.Snapshot.class,
         SegmentGenerationProgressCounter.Snapshot.class,
+        CpuCounters.Snapshot.class,
+        CpuCounter.Snapshot.class,
+        NilQueryCounterSnapshot.class,
 
         // InputSpec classes
         ExternalInputSpec.class,
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 0fef9d32e6d..1037aa6c2af 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -264,13 +264,15 @@ public class IndexerControllerContext implements 
ControllerContext
     final ImmutableMap.Builder<String, Object> taskContextOverridesBuilder = 
ImmutableMap.builder();
     final long maxParseExceptions = 
MultiStageQueryContext.getMaxParseExceptions(querySpec.getQuery().context());
     final boolean removeNullBytes = 
MultiStageQueryContext.removeNullBytes(querySpec.getQuery().context());
+    final boolean includeAllCounters = 
MultiStageQueryContext.getIncludeAllCounters(querySpec.getQuery().context());
 
     taskContextOverridesBuilder
         .put(MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE, 
queryKernelConfig.isDurableStorage())
         .put(MSQWarnings.CTX_MAX_PARSE_EXCEPTIONS_ALLOWED, maxParseExceptions)
         .put(MultiStageQueryContext.CTX_IS_REINDEX, 
MSQControllerTask.isReplaceInputDataSourceTask(querySpec))
         .put(MultiStageQueryContext.CTX_MAX_CONCURRENT_STAGES, 
queryKernelConfig.getMaxConcurrentStages())
-        .put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes);
+        .put(MultiStageQueryContext.CTX_REMOVE_NULL_BYTES, removeNullBytes)
+        .put(MultiStageQueryContext.CTX_INCLUDE_ALL_COUNTERS, 
includeAllCounters);
 
     // Put the lookup loading info in the task context to facilitate selective 
loading of lookups.
     if (controllerTaskContext.get(LookupLoadingSpec.CTX_LOOKUP_LOADING_MODE) 
!= null) {
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 63358467489..0b3063ef48b 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -80,6 +80,7 @@ public class IndexerWorkerContext implements WorkerContext
   private final ServiceClientFactory clientFactory;
   private final MemoryIntrospector memoryIntrospector;
   private final int maxConcurrentStages;
+  private final boolean includeAllCounters;
 
   @GuardedBy("this")
   private ServiceLocator controllerLocator;
@@ -105,7 +106,10 @@ public class IndexerWorkerContext implements WorkerContext
     this.clientFactory = clientFactory;
     this.memoryIntrospector = memoryIntrospector;
     this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
-    this.maxConcurrentStages = 
MultiStageQueryContext.getMaxConcurrentStages(QueryContext.of(task.getContext()));
+
+    final QueryContext queryContext = QueryContext.of(task.getContext());
+    this.maxConcurrentStages = 
MultiStageQueryContext.getMaxConcurrentStages(queryContext);
+    this.includeAllCounters = 
MultiStageQueryContext.getIncludeAllCounters(queryContext);
   }
 
   public static IndexerWorkerContext createProductionInstance(
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/InputChannelsImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/InputChannelsImpl.java
index 2c6539f5930..c1429625e99 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/InputChannelsImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/InputChannelsImpl.java
@@ -30,6 +30,8 @@ import org.apache.druid.frame.processor.FrameChannelMixer;
 import org.apache.druid.frame.processor.FrameProcessorExecutor;
 import org.apache.druid.frame.read.FrameReader;
 import org.apache.druid.frame.write.FrameWriters;
+import org.apache.druid.msq.counters.CounterTracker;
+import org.apache.druid.msq.counters.CpuCounters;
 import org.apache.druid.msq.input.stage.InputChannels;
 import org.apache.druid.msq.input.stage.ReadablePartition;
 import org.apache.druid.msq.input.stage.ReadablePartitions;
@@ -38,6 +40,7 @@ import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.StageId;
 import org.apache.druid.msq.kernel.StagePartition;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -55,17 +58,23 @@ public class InputChannelsImpl implements InputChannels
   private final InputChannelFactory channelFactory;
   private final Supplier<MemoryAllocator> allocatorMaker;
   private final FrameProcessorExecutor exec;
-  private final String cancellationId;
   private final Map<StagePartition, ReadablePartition> readablePartitionMap;
   private final boolean removeNullBytes;
 
+  @Nullable
+  private final String cancellationId;
+
+  @Nullable
+  private final CounterTracker counterTracker;
+
   public InputChannelsImpl(
       final QueryDefinition queryDefinition,
       final ReadablePartitions readablePartitions,
       final InputChannelFactory channelFactory,
       final Supplier<MemoryAllocator> allocatorMaker,
       final FrameProcessorExecutor exec,
-      final String cancellationId,
+      @Nullable final String cancellationId,
+      @Nullable final CounterTracker counterTracker,
       final boolean removeNullBytes
   )
   {
@@ -75,6 +84,7 @@ public class InputChannelsImpl implements InputChannels
     this.allocatorMaker = allocatorMaker;
     this.exec = exec;
     this.cancellationId = cancellationId;
+    this.counterTracker = counterTracker;
     this.removeNullBytes = removeNullBytes;
 
     for (final ReadablePartition readablePartition : readablePartitions) {
@@ -133,8 +143,6 @@ public class InputChannelsImpl implements InputChannels
           FrameWriters.makeRowBasedFrameWriterFactory(
               new SingleMemoryAllocatorFactory(allocatorMaker.get()),
               stageDefinition.getFrameReader().signature(),
-
-              // No sortColumns, because FrameChannelMerger generates frames 
that are sorted all on its own
               Collections.emptyList(),
               removeNullBytes
           ),
@@ -146,7 +154,10 @@ public class InputChannelsImpl implements InputChannels
       // Discard future, since there is no need to keep it. We aren't 
interested in its return value. If it fails,
       // downstream processors are notified through fail(e) on in-memory 
channels. If we need to cancel it, we use
       // the cancellationId.
-      exec.runFully(merger, cancellationId);
+      exec.runFully(
+          counterTracker == null ? merger : counterTracker.trackCpu(merger, 
CpuCounters.LABEL_MERGE_INPUT),
+          cancellationId
+      );
 
       return queueChannel.readable();
     }
@@ -171,7 +182,10 @@ public class InputChannelsImpl implements InputChannels
       // Discard future, since there is no need to keep it. We aren't 
interested in its return value. If it fails,
       // downstream processors are notified through fail(e) on in-memory 
channels. If we need to cancel it, we use
       // the cancellationId.
-      exec.runFully(muxer, cancellationId);
+      exec.runFully(
+          counterTracker == null ? muxer : counterTracker.trackCpu(muxer, 
CpuCounters.LABEL_MERGE_INPUT),
+          cancellationId
+      );
 
       return queueChannel.readable();
     }
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 9b715f8c8cf..ed6a7c0e7b9 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -30,12 +30,14 @@ import org.apache.druid.error.DruidException;
 import org.apache.druid.indexing.common.TaskLockType;
 import org.apache.druid.indexing.common.task.Tasks;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.msq.counters.NilQueryCounterSnapshot;
 import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
 import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.exec.SegmentSource;
 import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
 import org.apache.druid.msq.indexing.error.MSQWarnings;
 import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
+import org.apache.druid.msq.rpc.ControllerResource;
 import org.apache.druid.msq.sql.MSQMode;
 import org.apache.druid.query.QueryContext;
 import org.apache.druid.query.QueryContexts;
@@ -168,6 +170,19 @@ public class MultiStageQueryContext
   public static final String CTX_ARRAY_INGEST_MODE = "arrayIngestMode";
   public static final ArrayIngestMode DEFAULT_ARRAY_INGEST_MODE = 
ArrayIngestMode.ARRAY;
 
+  /**
+   * Whether new counters (anything other than channel, sortProgress, 
warnings, segmentGenerationProgress) should
+   * be included in reports. This parameter is necessary because prior to 
Druid 31, we lacked
+   * {@link NilQueryCounterSnapshot} as a default counter, which means that 
{@link SqlStatementResourceHelper} and
+   * {@link ControllerResource#httpPostCounters} would throw errors when 
encountering new counter types that they do
+   * not yet recognize. This causes problems during rolling updates.
+   *
+   * Once all servers are on Druid 31 or later, this can safely be flipped to 
"true". At that point, unknown counters
+   * are represented on the deserialization side using {@link 
NilQueryCounterSnapshot}.
+   */
+  public static final String CTX_INCLUDE_ALL_COUNTERS = "includeAllCounters";
+  public static final boolean DEFAULT_INCLUDE_ALL_COUNTERS = false;
+
   public static final String CTX_FORCE_TIME_SORT = 
DimensionsSpec.PARAMETER_FORCE_TIME_SORT;
   private static final boolean DEFAULT_FORCE_TIME_SORT = 
DimensionsSpec.DEFAULT_FORCE_TIME_SORT;
 
@@ -365,6 +380,14 @@ public class MultiStageQueryContext
     return queryContext.getEnum(CTX_ARRAY_INGEST_MODE, ArrayIngestMode.class, 
DEFAULT_ARRAY_INGEST_MODE);
   }
 
+  /**
+   * See {@link #CTX_INCLUDE_ALL_COUNTERS}.
+   */
+  public static boolean getIncludeAllCounters(final QueryContext queryContext)
+  {
+    return queryContext.getBoolean(CTX_INCLUDE_ALL_COUNTERS, 
DEFAULT_INCLUDE_ALL_COUNTERS);
+  }
+
   public static boolean isForceSegmentSortByTime(final QueryContext 
queryContext)
   {
     return queryContext.getBoolean(CTX_FORCE_TIME_SORT, 
DEFAULT_FORCE_TIME_SORT);
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CountersSnapshotTreeTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CountersSnapshotTreeTest.java
index a6b0e830a20..f595ee643dd 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CountersSnapshotTreeTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CountersSnapshotTreeTest.java
@@ -19,13 +19,20 @@
 
 package org.apache.druid.msq.counters;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.msq.guice.MSQIndexingModule;
 import org.apache.druid.segment.TestHelper;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Objects;
+
 public class CountersSnapshotTreeTest
 {
   @Test
@@ -46,4 +53,93 @@ public class CountersSnapshotTreeTest
 
     Assert.assertEquals(snapshotsTree.copyMap(), snapshotsTree2.copyMap());
   }
+
+  @Test
+  public void testSerdeUnknownCounter() throws Exception
+  {
+    final ObjectMapper serializationMapper =
+        TestHelper.makeJsonMapper().registerModules(new 
MSQIndexingModule().getJacksonModules());
+    serializationMapper.registerSubtypes(TestCounterSnapshot.class);
+
+    final ObjectMapper deserializationMapper =
+        TestHelper.makeJsonMapper().registerModules(new 
MSQIndexingModule().getJacksonModules());
+
+    final TestCounter testCounter = new TestCounter(10);
+    final CounterSnapshotsTree snapshotsTree = new CounterSnapshotsTree();
+    snapshotsTree.put(1, 2, new CounterSnapshots(ImmutableMap.of("ctr", 
testCounter.snapshot())));
+
+    final String json = serializationMapper.writeValueAsString(snapshotsTree);
+    final CounterSnapshotsTree snapshotsTree2 = 
serializationMapper.readValue(json, CounterSnapshotsTree.class);
+    final CounterSnapshotsTree snapshotsTree3 = 
deserializationMapper.readValue(json, CounterSnapshotsTree.class);
+
+    Assert.assertEquals(snapshotsTree.copyMap(), snapshotsTree2.copyMap());
+    Assert.assertNotEquals(snapshotsTree.copyMap(), snapshotsTree3.copyMap());
+
+    // Confirm that deserializationMapper reads the TestCounterSnapshot as a 
NilQueryCounterSnapshot.
+    MatcherAssert.assertThat(
+        snapshotsTree3.copyMap().get(1).get(2).getMap().get("ctr"),
+        CoreMatchers.instanceOf(NilQueryCounterSnapshot.class)
+    );
+  }
+
+  private static class TestCounter implements QueryCounter
+  {
+    private final int n;
+
+    public TestCounter(int n)
+    {
+      this.n = n;
+    }
+
+    @Override
+    public QueryCounterSnapshot snapshot()
+    {
+      return new TestCounterSnapshot(n);
+    }
+  }
+
+  @JsonTypeName("test")
+  private static class TestCounterSnapshot implements QueryCounterSnapshot
+  {
+    private final int n;
+
+    @JsonCreator
+    public TestCounterSnapshot(@JsonProperty("n") int n)
+    {
+      this.n = n;
+    }
+
+    @JsonProperty("n")
+    public int getN()
+    {
+      return n;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TestCounterSnapshot that = (TestCounterSnapshot) o;
+      return n == that.n;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hashCode(n);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "TestCounterSnapshot{" +
+             "n=" + n +
+             '}';
+    }
+  }
 }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CpuCountersTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CpuCountersTest.java
new file mode 100644
index 00000000000..d9fee44ec2d
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/counters/CpuCountersTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.counters;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CpuCountersTest
+{
+  @Test
+  public void test_forName_runDoer()
+  {
+    final CpuCounters counters = new CpuCounters();
+    final CpuCounter counter = counters.forName("xyz");
+    counter.run(() -> { /* Nothing in particular */ });
+    final CpuCounters.Snapshot snapshot = counters.snapshot();
+    Assert.assertEquals(ImmutableSet.of("xyz"), 
snapshot.getCountersMap().keySet());
+  }
+
+  @Test
+  public void test_forName_runReturner()
+  {
+    final CpuCounters counters = new CpuCounters();
+    final CpuCounter counter = counters.forName("xyz");
+    Assert.assertEquals("boo", counter.run(() -> "boo"));
+    final CpuCounters.Snapshot snapshot = counters.snapshot();
+    Assert.assertEquals(ImmutableSet.of("xyz"), 
snapshot.getCountersMap().keySet());
+  }
+
+  @Test
+  public void test_forName_accumulate()
+  {
+    final CpuCounters counters = new CpuCounters();
+    final CpuCounter counter = counters.forName("xyz");
+    counter.accumulate(1L, 1L);
+    final CpuCounters.Snapshot snapshot = counters.snapshot();
+    Assert.assertEquals(
+        ImmutableMap.of("xyz", new CpuCounter.Snapshot(1L, 1L)),
+        snapshot.getCountersMap()
+    );
+  }
+
+  @Test
+  public void test_counter_snapshot_equals()
+  {
+    EqualsVerifier.forClass(CpuCounter.Snapshot.class)
+                  .usingGetClass()
+                  .verify();
+  }
+
+  @Test
+  public void test_counters_snapshot_equals()
+  {
+    EqualsVerifier.forClass(CpuCounters.Snapshot.class)
+                  .usingGetClass()
+                  .verify();
+  }
+
+  @Test
+  public void test_nil_equals()
+  {
+    EqualsVerifier.forClass(NilQueryCounterSnapshot.class)
+                  .usingGetClass()
+                  .verify();
+  }
+}
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/QueryCounterSnapshot.java
 
b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorDecorator.java
similarity index 66%
copy from 
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/QueryCounterSnapshot.java
copy to 
processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorDecorator.java
index c065f7f8252..f72765dc19f 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/counters/QueryCounterSnapshot.java
+++ 
b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorDecorator.java
@@ -17,15 +17,21 @@
  * under the License.
  */
 
-package org.apache.druid.msq.counters;
-
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
+package org.apache.druid.frame.processor;
 
 /**
- * Marker interface for the results of {@link QueryCounter#snapshot()}. No 
methods, because the only purpose of these
- * snapshots is to pass things along from worker -> controller -> report.
+ * Passed to {@link SuperSorter} to decorate the processors it launches.
  */
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
-public interface QueryCounterSnapshot
+public interface FrameProcessorDecorator
 {
+  FrameProcessorDecorator NONE = new FrameProcessorDecorator()
+  {
+    @Override
+    public <T> FrameProcessor<T> decorate(FrameProcessor<T> processor)
+    {
+      return processor;
+    }
+  };
+
+  <T> FrameProcessor<T> decorate(FrameProcessor<T> processor);
 }
diff --git 
a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java 
b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java
index 5cb9c0830e4..d2b6934d292 100644
--- a/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java
+++ b/processing/src/main/java/org/apache/druid/frame/processor/SuperSorter.java
@@ -131,6 +131,7 @@ public class SuperSorter
   private final List<KeyColumn> sortKey;
   private final ListenableFuture<ClusterByPartitions> outputPartitionsFuture;
   private final FrameProcessorExecutor exec;
+  private final FrameProcessorDecorator processorDecorator;
   private final OutputChannelFactory outputChannelFactory;
   private final OutputChannelFactory intermediateOutputChannelFactory;
   private final int maxChannelsPerMerger;
@@ -224,6 +225,7 @@ public class SuperSorter
       final List<KeyColumn> sortKey,
       final ListenableFuture<ClusterByPartitions> outputPartitionsFuture,
       final FrameProcessorExecutor exec,
+      final FrameProcessorDecorator processorDecorator,
       final OutputChannelFactory outputChannelFactory,
       final OutputChannelFactory intermediateOutputChannelFactory,
       final int maxActiveProcessors,
@@ -239,6 +241,7 @@ public class SuperSorter
     this.sortKey = sortKey;
     this.outputPartitionsFuture = outputPartitionsFuture;
     this.exec = exec;
+    this.processorDecorator = processorDecorator;
     this.outputChannelFactory = outputChannelFactory;
     this.intermediateOutputChannelFactory = intermediateOutputChannelFactory;
     this.maxChannelsPerMerger = maxChannelsPerMerger;
@@ -743,7 +746,7 @@ public class SuperSorter
   private <T> void runWorker(final FrameProcessor<T> worker, final Consumer<T> 
outConsumer)
   {
     Futures.addCallback(
-        exec.runFully(worker, cancellationId),
+        exec.runFully(processorDecorator.decorate(worker), cancellationId),
         new FutureCallback<T>()
         {
           @Override
diff --git 
a/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java
 
b/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java
index 2690d79c7a5..36644f6d771 100644
--- 
a/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java
+++ 
b/processing/src/test/java/org/apache/druid/frame/processor/SuperSorterTest.java
@@ -126,6 +126,7 @@ public class SuperSorterTest
           Collections.emptyList(),
           outputPartitionsFuture,
           exec,
+          FrameProcessorDecorator.NONE,
           new FileOutputChannelFactory(tempFolder, FRAME_SIZE, null),
           new FileOutputChannelFactory(tempFolder, FRAME_SIZE, null),
           2,
@@ -161,6 +162,7 @@ public class SuperSorterTest
           Collections.emptyList(),
           Futures.immediateFuture(ClusterByPartitions.oneUniversalPartition()),
           exec,
+          FrameProcessorDecorator.NONE,
           new FileOutputChannelFactory(tempFolder, FRAME_SIZE, null),
           new FileOutputChannelFactory(tempFolder, FRAME_SIZE, null),
           2,
@@ -345,6 +347,7 @@ public class SuperSorterTest
           clusterBy.getColumns(),
           clusterByPartitionsFuture,
           exec,
+          FrameProcessorDecorator.NONE,
           new FileOutputChannelFactory(tempFolder, maxBytesPerFrame, null),
           outputChannelFactory,
           maxActiveProcessors,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to